This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bf2ca54fe0 [HUDI-5269] Enhancing spark-sql write tests for some of the
core user flows (#7230)
bf2ca54fe0 is described below
commit bf2ca54fe0c765388b06dd65990b3b53ab60e7a3
Author: Jon Vexler <[email protected]>
AuthorDate: Tue Nov 29 08:47:29 2022 -0500
[HUDI-5269] Enhancing spark-sql write tests for some of the core user flows
(#7230)
Add good test coverage for some of the core user flows w/ spark data source
writes.
---
.../common/testutils/HoodieTestDataGenerator.java | 181 +++++++--
.../hudi/functional/TestSparkSqlCoreFlow.scala | 441 +++++++++++++++++++++
2 files changed, 580 insertions(+), 42 deletions(-)
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index e1aa0c7a01..5cedea0634 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -126,6 +126,9 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
public static final String TRIP_FLATTENED_SCHEMA =
TRIP_SCHEMA_PREFIX + FARE_FLATTENED_SCHEMA + TRIP_SCHEMA_SUFFIX;
+ public static final String TRIP_NESTED_EXAMPLE_SCHEMA =
+ TRIP_SCHEMA_PREFIX + FARE_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
+
public static final String TRIP_SCHEMA =
"{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":["
+
"{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},"
+
"{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\":
\"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
@@ -139,6 +142,7 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
public static final Schema AVRO_SCHEMA = new
Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
+ public static final Schema NESTED_AVRO_SCHEMA = new
Schema.Parser().parse(TRIP_NESTED_EXAMPLE_SCHEMA);
public static final TypeDescription ORC_SCHEMA =
AvroOrcUtils.createOrcSchema(new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA));
public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS =
HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA);
@@ -159,13 +163,21 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
this(seed, DEFAULT_PARTITION_PATHS, new HashMap<>());
}
+ public HoodieTestDataGenerator(String schema, long seed) {
+ this(schema, seed, DEFAULT_PARTITION_PATHS, new HashMap<>());
+ }
+
public HoodieTestDataGenerator(long seed, String[] partitionPaths,
Map<Integer, KeyPartition> keyPartitionMap) {
+ this(TRIP_EXAMPLE_SCHEMA, seed, partitionPaths, keyPartitionMap);
+ }
+
+ public HoodieTestDataGenerator(String schema, long seed, String[]
partitionPaths, Map<Integer, KeyPartition> keyPartitionMap) {
this.rand = new Random(seed);
this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length);
this.existingKeysBySchema = new HashMap<>();
- this.existingKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap);
+ this.existingKeysBySchema.put(schema, keyPartitionMap);
this.numKeysBySchema = new HashMap<>();
- this.numKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap.size());
+ this.numKeysBySchema.put(schema, keyPartitionMap.size());
logger.info(String.format("Test DataGenerator's seed (%s)", seed));
}
@@ -223,6 +235,8 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
return generatePayloadForTripSchema(key, commitTime);
} else if (SHORT_TRIP_SCHEMA.equals(schemaStr)) {
return generatePayloadForShortTripSchema(key, commitTime);
+ } else if (TRIP_NESTED_EXAMPLE_SCHEMA.equals(schemaStr)) {
+ return generateNestedExampleRandomValue(key, commitTime);
}
return null;
@@ -255,6 +269,11 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
return generateRandomValue(key, instantTime, isFlattened, 0);
}
+ private RawTripTestPayload generateNestedExampleRandomValue(
+ HoodieKey key, String instantTime) throws IOException {
+ return generateNestedExampleRandomValue(key, instantTime, 0);
+ }
+
private RawTripTestPayload generateRandomValue(
HoodieKey key, String instantTime, boolean isFlattened, int ts) throws
IOException {
GenericRecord rec = generateGenericRecord(
@@ -263,6 +282,14 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
return new RawTripTestPayload(rec.toString(), key.getRecordKey(),
key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
}
+ private RawTripTestPayload generateNestedExampleRandomValue(
+ HoodieKey key, String instantTime, int ts) throws IOException {
+ GenericRecord rec = generateNestedExampleGenericRecord(
+ key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime,
"driver-" + instantTime, ts,
+ false);
+ return new RawTripTestPayload(rec.toString(), key.getRecordKey(),
key.getPartitionPath(), TRIP_NESTED_EXAMPLE_SCHEMA);
+ }
+
/**
* Generates a new avro record with TRIP_SCHEMA, retaining the key if
optionally provided.
*/
@@ -298,10 +325,11 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
return generateGenericRecord(rowKey, partitionPath, riderName, driverName,
timestamp, false, false);
}
- public GenericRecord generateGenericRecord(String rowKey, String
partitionPath, String riderName, String driverName,
- long timestamp, boolean
isDeleteRecord,
- boolean isFlattened) {
- GenericRecord rec = new GenericData.Record(isFlattened ?
FLATTENED_AVRO_SCHEMA : AVRO_SCHEMA);
+
+ /**
+ * Populate rec with values for TRIP_SCHEMA_PREFIX
+ */
+ private void generateTripPrefixValues(GenericRecord rec, String rowKey,
String partitionPath, String riderName, String driverName, long timestamp) {
rec.put("_row_key", rowKey);
rec.put("timestamp", timestamp);
rec.put("partition_path", partitionPath);
@@ -311,47 +339,108 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
rec.put("begin_lon", rand.nextDouble());
rec.put("end_lat", rand.nextDouble());
rec.put("end_lon", rand.nextDouble());
- if (isFlattened) {
- rec.put("fare", rand.nextDouble() * 100);
- rec.put("currency", "USD");
- } else {
- rec.put("distance_in_meters", rand.nextInt());
- rec.put("seconds_since_epoch", rand.nextLong());
- rec.put("weight", rand.nextFloat());
- byte[] bytes = "Canada".getBytes();
- rec.put("nation", ByteBuffer.wrap(bytes));
- long randomMillis = genRandomTimeMillis(rand);
- Instant instant = Instant.ofEpochMilli(randomMillis);
- rec.put("current_date", (int) LocalDateTime.ofInstant(instant,
ZoneOffset.UTC).toLocalDate().toEpochDay());
- rec.put("current_ts", randomMillis);
-
- BigDecimal bigDecimal = new BigDecimal(String.format("%5f",
rand.nextFloat()));
- Schema decimalSchema = AVRO_SCHEMA.getField("height").schema();
- Conversions.DecimalConversion decimalConversions = new
Conversions.DecimalConversion();
- GenericFixed genericFixed = decimalConversions.toFixed(bigDecimal,
decimalSchema, LogicalTypes.decimal(10, 6));
- rec.put("height", genericFixed);
-
- rec.put("city_to_state", Collections.singletonMap("LA", "CA"));
-
- GenericRecord fareRecord = new
GenericData.Record(AVRO_SCHEMA.getField("fare").schema());
- fareRecord.put("amount", rand.nextDouble() * 100);
- fareRecord.put("currency", "USD");
- rec.put("fare", fareRecord);
-
- GenericArray<GenericRecord> tipHistoryArray = new GenericData.Array<>(1,
AVRO_SCHEMA.getField("tip_history").schema());
- Schema tipSchema = new
Schema.Parser().parse(AVRO_SCHEMA.getField("tip_history").schema().toString()).getElementType();
- GenericRecord tipRecord = new GenericData.Record(tipSchema);
- tipRecord.put("amount", rand.nextDouble() * 100);
- tipRecord.put("currency", "USD");
- tipHistoryArray.add(tipRecord);
- rec.put("tip_history", tipHistoryArray);
- }
+ }
+
+ /**
+ * Populate rec with values for FARE_FLATTENED_SCHEMA
+ */
+ private void generateFareFlattenedValues(GenericRecord rec) {
+ rec.put("fare", rand.nextDouble() * 100);
+ rec.put("currency", "USD");
+ }
+
+ /**
+ * Populate rec with values for EXTRA_TYPE_SCHEMA
+ */
+ private void generateExtraSchemaValues(GenericRecord rec) {
+ rec.put("distance_in_meters", rand.nextInt());
+ rec.put("seconds_since_epoch", rand.nextLong());
+ rec.put("weight", rand.nextFloat());
+ byte[] bytes = "Canada".getBytes();
+ rec.put("nation", ByteBuffer.wrap(bytes));
+ long randomMillis = genRandomTimeMillis(rand);
+ Instant instant = Instant.ofEpochMilli(randomMillis);
+ rec.put("current_date", (int) LocalDateTime.ofInstant(instant,
ZoneOffset.UTC).toLocalDate().toEpochDay());
+ rec.put("current_ts", randomMillis);
+
+ BigDecimal bigDecimal = new BigDecimal(String.format("%5f",
rand.nextFloat()));
+ Schema decimalSchema = AVRO_SCHEMA.getField("height").schema();
+ Conversions.DecimalConversion decimalConversions = new
Conversions.DecimalConversion();
+ GenericFixed genericFixed = decimalConversions.toFixed(bigDecimal,
decimalSchema, LogicalTypes.decimal(10, 6));
+ rec.put("height", genericFixed);
+ }
+
+ /**
+ * Populate rec with values for MAP_TYPE_SCHEMA
+ */
+ private void generateMapTypeValues(GenericRecord rec) {
+ rec.put("city_to_state", Collections.singletonMap("LA", "CA"));
+ }
+ /**
+ * Populate rec with values for FARE_NESTED_SCHEMA
+ */
+ private void generateFareNestedValues(GenericRecord rec) {
+ GenericRecord fareRecord = new
GenericData.Record(AVRO_SCHEMA.getField("fare").schema());
+ fareRecord.put("amount", rand.nextDouble() * 100);
+ fareRecord.put("currency", "USD");
+ rec.put("fare", fareRecord);
+ }
+
+ /**
+ * Populate rec with values for TIP_NESTED_SCHEMA
+ */
+ private void generateTipNestedValues(GenericRecord rec) {
+ GenericArray<GenericRecord> tipHistoryArray = new GenericData.Array<>(1,
AVRO_SCHEMA.getField("tip_history").schema());
+ Schema tipSchema = new
Schema.Parser().parse(AVRO_SCHEMA.getField("tip_history").schema().toString()).getElementType();
+ GenericRecord tipRecord = new GenericData.Record(tipSchema);
+ tipRecord.put("amount", rand.nextDouble() * 100);
+ tipRecord.put("currency", "USD");
+ tipHistoryArray.add(tipRecord);
+ rec.put("tip_history", tipHistoryArray);
+ }
+
+ /**
+ * Populate rec with values for TRIP_SCHEMA_SUFFIX
+ */
+ private void generateTripSuffixValues(GenericRecord rec, boolean
isDeleteRecord) {
if (isDeleteRecord) {
rec.put("_hoodie_is_deleted", true);
} else {
rec.put("_hoodie_is_deleted", false);
}
+ }
+
+
+ /**
+ * Generate record conforming to TRIP_EXAMPLE_SCHEMA or
TRIP_FLATTENED_SCHEMA if isFlattened is true
+ */
+ public GenericRecord generateGenericRecord(String rowKey, String
partitionPath, String riderName, String driverName,
+ long timestamp, boolean
isDeleteRecord,
+ boolean isFlattened) {
+ GenericRecord rec = new GenericData.Record(isFlattened ?
FLATTENED_AVRO_SCHEMA : AVRO_SCHEMA);
+ generateTripPrefixValues(rec, rowKey, partitionPath, riderName,
driverName, timestamp);
+ if (isFlattened) {
+ generateFareFlattenedValues(rec);
+ } else {
+ generateExtraSchemaValues(rec);
+ generateMapTypeValues(rec);
+ generateFareNestedValues(rec);
+ generateTipNestedValues(rec);
+ }
+ generateTripSuffixValues(rec, isDeleteRecord);
+ return rec;
+ }
+
+ /**
+ * Generate record conforming to TRIP_NESTED_EXAMPLE_SCHEMA
+ */
+ public GenericRecord generateNestedExampleGenericRecord(String rowKey,
String partitionPath, String riderName, String driverName,
+ long timestamp,
boolean isDeleteRecord) {
+ GenericRecord rec = new GenericData.Record(NESTED_AVRO_SCHEMA);
+ generateTripPrefixValues(rec, rowKey, partitionPath, riderName,
driverName, timestamp);
+ generateFareNestedValues(rec);
+ generateTripSuffixValues(rec, isDeleteRecord);
return rec;
}
@@ -474,13 +563,17 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
}
/**
- * Generates new inserts with nested schema, uniformly across the partition
paths above.
+ * Generates new inserts conforming to TRIP_EXAMPLE_SCHEMA (nested fare
schema), uniformly across the partition paths above.
* It also updates the list of existing keys.
*/
public List<HoodieRecord> generateInserts(String instantTime, Integer n) {
return generateInserts(instantTime, n, false);
}
+ public List<HoodieRecord> generateInsertsNestedExample(String instantTime,
Integer n) {
+ return generateInsertsStream(instantTime, n, false,
TRIP_NESTED_EXAMPLE_SCHEMA).collect(Collectors.toList());
+ }
+
/**
* Generates new inserts, uniformly across the partition paths above.
* It also updates the list of existing keys.
@@ -721,6 +814,10 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
return generateUniqueUpdatesStream(instantTime, n,
TRIP_EXAMPLE_SCHEMA).collect(Collectors.toList());
}
+ public List<HoodieRecord> generateUniqueUpdatesNestedExample(String
instantTime, Integer n) {
+ return generateUniqueUpdatesStream(instantTime, n,
TRIP_NESTED_EXAMPLE_SCHEMA).collect(Collectors.toList());
+ }
+
public List<HoodieRecord> generateUniqueUpdatesAsPerSchema(String
instantTime, Integer n, String schemaStr) {
return generateUniqueUpdatesStream(instantTime, n,
schemaStr).collect(Collectors.toList());
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
new file mode 100644
index 0000000000..e1b888aca2
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers,
HoodieSparkUtils}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator
+import org.apache.spark.sql
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.scalatest.Inspectors.forAll
+
+import java.io.File
+import scala.collection.JavaConversions._
+
+
+class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
+ val colsToCompare = "timestamp, _row_key, partition_path, rider, driver,
begin_lat, begin_lon, end_lat, end_lon, fare.amount, fare.currency,
_hoodie_is_deleted"
+
+ //params for core flow tests
+ val params: List[String] = List(
+
"COPY_ON_WRITE|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"COPY_ON_WRITE|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"COPY_ON_WRITE|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"COPY_ON_WRITE|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"COPY_ON_WRITE|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"COPY_ON_WRITE|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"COPY_ON_WRITE|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"COPY_ON_WRITE|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"COPY_ON_WRITE|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"MERGE_ON_READ|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"MERGE_ON_READ|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"MERGE_ON_READ|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"MERGE_ON_READ|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"MERGE_ON_READ|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"MERGE_ON_READ|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"MERGE_ON_READ|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"MERGE_ON_READ|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"MERGE_ON_READ|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM"
+ )
+
+ //extracts the params and runs each core flow test
+ forAll (params) { (paramStr: String) =>
+ test(s"Core flow with params: $paramStr") {
+ val splits = paramStr.split('|')
+ withTempDir { basePath =>
+ testCoreFlows(basePath,
+ tableType = splits(0),
+ isMetadataEnabledOnWrite = splits(1).toBoolean,
+ isMetadataEnabledOnRead = splits(2).toBoolean,
+ keyGenClass = splits(3),
+ indexType = splits(4))
+ }
+ }
+ }
+
+ def testCoreFlows(basePath: File, tableType: String,
isMetadataEnabledOnWrite: Boolean, isMetadataEnabledOnRead: Boolean,
keyGenClass: String, indexType: String): Unit = {
+ //Create table and set up for testing
+ val tableName = generateTableName
+ val tableBasePath = basePath.getCanonicalPath + "/" + tableName
+ val writeOptions = getWriteOptions(tableName, tableType,
isMetadataEnabledOnWrite, keyGenClass, indexType)
+ createTable(tableName, keyGenClass, writeOptions, tableBasePath)
+ val fs = FSUtils.getFs(tableBasePath,
spark.sparkContext.hadoopConfiguration)
+ val dataGen = new
HoodieTestDataGenerator(HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA,
0xDEED)
+
+ //Bulk insert first set of records
+ val inputDf0 = generateInserts(dataGen, "000", 100)
+ inputDf0.cache()
+ insertInto(tableName, inputDf0, "bulk_insert", isMetadataEnabledOnWrite,
keyGenClass)
+ assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, tableBasePath, "000"))
+ //Verify bulk insert works correctly
+ val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabledOnRead)
+ snapshotDf1.cache()
+ assertEquals(100, snapshotDf1.count())
+ compareEntireInputDfWithHudiDf(inputDf0, snapshotDf1)
+
+ //Test updated records
+ val updateDf = generateUniqueUpdates(dataGen, "001", 50)
+ insertInto(tableName, updateDf, "upsert", isMetadataEnabledOnWrite,
keyGenClass)
+ val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs,
tableBasePath)
+ val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabledOnRead)
+ snapshotDf2.cache()
+ assertEquals(100, snapshotDf2.count())
+ compareUpdateDfWithHudiDf(updateDf, snapshotDf2, snapshotDf1)
+
+ val inputDf2 = generateUniqueUpdates(dataGen, "002", 60)
+ val uniqueKeyCnt2 = inputDf2.select("_row_key").distinct().count()
+ insertInto(tableName, inputDf2, "upsert", isMetadataEnabledOnWrite,
keyGenClass)
+ val commitInstantTime3 = HoodieDataSourceHelpers.latestCommit(fs,
tableBasePath)
+ assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs,
tableBasePath, "000").size())
+
+ val snapshotDf3 = doSnapshotRead(tableName, isMetadataEnabledOnRead)
+ snapshotDf3.cache()
+ assertEquals(100, snapshotDf3.count())
+ compareUpdateDfWithHudiDf(inputDf2, snapshotDf3, snapshotDf2)
+
+ // Read Incremental Query, need to use spark-ds because functionality does
not exist for spark sql
+ // we have 2 commits, try pulling the first commit (which is not the
latest)
+ //HUDI-5266
+ val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs,
tableBasePath, "000").get(0)
+ val hoodieIncViewDf1 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "000")
+ .option(DataSourceReadOptions.END_INSTANTTIME.key, firstCommit)
+ .load(tableBasePath)
+ //val hoodieIncViewDf1 = doIncRead(tableName, isMetadataEnabledOnRead,
"000", firstCommit)
+ assertEquals(100, hoodieIncViewDf1.count()) // 100 initial inserts must be
pulled
+ var countsPerCommit =
hoodieIncViewDf1.groupBy("_hoodie_commit_time").count().collect()
+ assertEquals(1, countsPerCommit.length)
+ assertEquals(firstCommit, countsPerCommit(0).get(0).toString)
+
+ val inputDf3 = generateUniqueUpdates(dataGen, "003", 80)
+ insertInto(tableName, inputDf3, "upsert", isMetadataEnabledOnWrite,
keyGenClass)
+
+ //another incremental query with commit2 and commit3
+ //HUDI-5266
+ val hoodieIncViewDf2 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime2)
+ .option(DataSourceReadOptions.END_INSTANTTIME.key(), commitInstantTime3)
+ .load(tableBasePath)
+
+ assertEquals(uniqueKeyCnt2, hoodieIncViewDf2.count()) // 60 records must
be pulled
+ countsPerCommit =
hoodieIncViewDf2.groupBy("_hoodie_commit_time").count().collect()
+ assertEquals(1, countsPerCommit.length)
+ assertEquals(commitInstantTime3, countsPerCommit(0).get(0).toString)
+
+
+ val timeTravelDf = if (HoodieSparkUtils.gteqSpark3_2_1) {
+ spark.sql(s"select * from $tableName timestamp as of
'$commitInstantTime2'")
+ } else {
+ //HUDI-5265
+ spark.read.format("org.apache.hudi")
+ .option("as.of.instant", commitInstantTime2)
+ .load(tableBasePath)
+ }
+ assertEquals(100, timeTravelDf.count())
+ compareEntireInputDfWithHudiDf(snapshotDf2, timeTravelDf)
+
+ if (tableType.equals("MERGE_ON_READ")) {
+ val readOptDf = doMORReadOptimizedQuery(isMetadataEnabledOnRead,
tableBasePath)
+ compareEntireInputDfWithHudiDf(inputDf0, readOptDf)
+
+ val snapshotDf4 = doSnapshotRead(tableName, isMetadataEnabledOnRead)
+ snapshotDf4.cache()
+
+ // trigger compaction and try out Read optimized query.
+ val inputDf4 = generateUniqueUpdates(dataGen, "004", 40)
+ doInlineCompact(tableName, inputDf4, "upsert", isMetadataEnabledOnWrite,
"3", keyGenClass)
+ val snapshotDf5 = doSnapshotRead(tableName, isMetadataEnabledOnRead)
+ snapshotDf5.cache()
+ compareUpdateDfWithHudiDf(inputDf4, snapshotDf5, snapshotDf4)
+
+ // compaction is expected to have completed. both RO and RT are expected
to return same results.
+ compareROAndRT(isMetadataEnabledOnRead, tableName, tableBasePath)
+ }
+ }
+
+ def doSnapshotRead(tableName: String, isMetadataEnabledOnRead: Boolean):
sql.DataFrame = {
+ spark.sql("set hoodie.datasource.query.type=\"snapshot\"")
+ spark.sql(s"set
hoodie.metadata.enable=${String.valueOf(isMetadataEnabledOnRead)}")
+ spark.sql(s"select * from $tableName")
+ }
+
+ def doInlineCompact(tableName: String, recDf: sql.DataFrame, writeOp:
String, isMetadataEnabledOnWrite: Boolean, numDeltaCommits: String,
keyGenClass: String): Unit = {
+ spark.sql("set hoodie.compact.inline=true")
+ spark.sql(s"set hoodie.compact.inline.max.delta.commits=$numDeltaCommits")
+ insertInto(tableName, recDf, writeOp, isMetadataEnabledOnWrite,
keyGenClass)
+ spark.sql("set hoodie.compact.inline=false")
+ }
+
+ def getWriteOptions(tableName: String, tableType: String,
isMetadataEnabledOnWrite: Boolean,
+ keyGenClass: String, indexType: String): String = {
+ val typeString = if (tableType.equals("COPY_ON_WRITE")) {
+ "cow"
+ } else if (tableType.equals("MERGE_ON_READ")) {
+ "mor"
+ } else {
+ tableType
+ }
+
+ s"""
+ |tblproperties (
+ | type = '$typeString',
+ | primaryKey = '_row_key',
+ | preCombineField = 'timestamp',
+ | hoodie.bulkinsert.shuffle.parallelism = 4,
+ | hoodie.database.name = "databaseName",
+ | hoodie.table.keygenerator.class = '$keyGenClass',
+ | hoodie.delete.shuffle.parallelism = 2,
+ | hoodie.index.type = "$indexType",
+ | hoodie.insert.shuffle.parallelism = 4,
+ | hoodie.metadata.enable = ${String.valueOf(isMetadataEnabledOnWrite)},
+ | hoodie.table.name = "$tableName",
+ | hoodie.upsert.shuffle.parallelism = 4
+ | )""".stripMargin
+ }
+
+ def insertInto(tableName: String, inputDf: sql.DataFrame, writeOp: String,
isMetadataEnabledOnWrite: Boolean, keyGenClass: String): Unit = {
+ inputDf.select("timestamp", "_row_key", "rider", "driver", "begin_lat",
"begin_lon", "end_lat", "end_lon", "fare",
+ "_hoodie_is_deleted",
"partition_path").createOrReplaceTempView("insert_temp_table")
+ spark.sql(s"set
hoodie.metadata.enable=${String.valueOf(isMetadataEnabledOnWrite)}")
+ spark.sql(s"set hoodie.datasource.write.keygenerator.class=$keyGenClass")
+ if (writeOp.equals("upsert")) {
+ spark.sql(s"set hoodie.datasource.write.operation=$writeOp")
+ spark.sql("set hoodie.sql.bulk.insert.enable=false")
+ spark.sql("set hoodie.sql.insert.mode=upsert")
+ spark.sql(
+ s"""
+ | merge into $tableName as target
+ | using insert_temp_table as source
+ | on target._row_key = source._row_key and
+ | target.partition_path = source.partition_path
+ | when matched then update set *
+ | when not matched then insert *
+ | """.stripMargin)
+ } else if (writeOp.equals("bulk_insert")) {
+ //If HUDI-5257 is resolved, write operation should be bulk_insert, and
this function can be more compact due to
+ //less repeated code
+ spark.sql("set hoodie.datasource.write.operation=insert")
+ spark.sql("set hoodie.sql.bulk.insert.enable=true")
+ spark.sql("set hoodie.sql.insert.mode=non-strict")
+ spark.sql(s"insert into $tableName select * from insert_temp_table")
+ } else if (writeOp.equals("insert")) {
+ spark.sql(s"set hoodie.datasource.write.operation=$writeOp")
+ spark.sql("set hoodie.sql.bulk.insert.enable=false")
+ spark.sql("set hoodie.sql.insert.mode=non-strict")
+ spark.sql(s"insert into $tableName select * from insert_temp_table")
+ }
+ }
+ def createTable(tableName: String, keyGenClass: String, writeOptions:
String, tableBasePath: String): Unit = {
+ //If you have partitioned by (partition_path) with nonpartitioned keygen,
the partition_path will be empty in the table
+ val partitionedBy = if
(!keyGenClass.equals("org.apache.hudi.keygen.NonpartitionedKeyGenerator")) {
+ "partitioned by (partition_path)"
+ } else {
+ ""
+ }
+
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | timestamp long,
+ | _row_key string,
+ | rider string,
+ | driver string,
+ | begin_lat double,
+ | begin_lon double,
+ | end_lat double,
+ | end_lon double,
+ | fare STRUCT<
+ | amount: double,
+ | currency: string >,
+ | _hoodie_is_deleted boolean,
+ | partition_path string
+ |) using hudi
+ | $partitionedBy
+ | $writeOptions
+ | location '$tableBasePath'
+ |
+ """.stripMargin)
+ }
+
+ def generateInserts(dataGen: HoodieTestDataGenerator, instantTime: String,
n: Int): sql.DataFrame = {
+ val recs = dataGen.generateInsertsNestedExample(instantTime, n)
+ spark.read.json(spark.sparkContext.parallelize(recordsToStrings(recs), 2))
+ }
+
+ def generateUniqueUpdates(dataGen: HoodieTestDataGenerator, instantTime:
String, n: Int): sql.DataFrame = {
+ val recs = dataGen.generateUniqueUpdatesNestedExample(instantTime, n)
+ spark.read.json(spark.sparkContext.parallelize(recordsToStrings(recs), 2))
+ }
+
+ def compareUpdateDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row],
beforeDf: Dataset[Row]): Unit = {
+ dropMetaColumns(hudiDf).createOrReplaceTempView("hudiTbl")
+ inputDf.createOrReplaceTempView("inputTbl")
+ beforeDf.createOrReplaceTempView("beforeTbl")
+ val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from hudiTbl")
+ val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from inputTbl")
+ val beforeDfToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from beforeTbl")
+
+ assertEquals(hudiDfToCompare.intersect(inputDfToCompare).count,
inputDfToCompare.count)
+
assertEquals(hudiDfToCompare.except(inputDfToCompare).except(beforeDfToCompare).count,
0)
+ }
+
+ def compareEntireInputDfWithHudiDf(inputDf: Dataset[Row], hudiDf:
Dataset[Row]): Unit = {
+ dropMetaColumns(hudiDf).createOrReplaceTempView("hudiTbl")
+ inputDf.createOrReplaceTempView("inputTbl")
+ val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from hudiTbl")
+ val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from inputTbl")
+
+ assertEquals(hudiDfToCompare.intersect(inputDfToCompare).count,
inputDfToCompare.count)
+ assertEquals(hudiDfToCompare.except(inputDfToCompare).count, 0)
+ }
+
+ def doMORReadOptimizedQuery(isMetadataEnabledOnRead: Boolean, basePath:
String): sql.DataFrame = {
+ spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+ .option(HoodieMetadataConfig.ENABLE.key(), isMetadataEnabledOnRead)
+ .load(basePath)
+ }
+
+ def compareROAndRT(isMetadataEnabledOnRead: Boolean, tableName: String,
basePath: String): Unit = {
+ val roDf = doMORReadOptimizedQuery(isMetadataEnabledOnRead, basePath)
+ val rtDf = doSnapshotRead(tableName, isMetadataEnabledOnRead)
+ dropMetaColumns(roDf).createOrReplaceTempView("hudiTbl1")
+ dropMetaColumns(rtDf).createOrReplaceTempView("hudiTbl2")
+
+ val hudiDf1ToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from hudiTbl1")
+ val hudiDf2ToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from hudiTbl2")
+
+ assertEquals(hudiDf1ToCompare.intersect(hudiDf2ToCompare).count,
hudiDf1ToCompare.count)
+ assertEquals(hudiDf1ToCompare.except(hudiDf2ToCompare).count, 0)
+ }
+
+ def dropMetaColumns(inputDf: sql.DataFrame): sql.DataFrame = {
+ inputDf.drop(HoodieRecord.RECORD_KEY_METADATA_FIELD,
HoodieRecord.PARTITION_PATH_METADATA_FIELD,
+ HoodieRecord.COMMIT_SEQNO_METADATA_FIELD,
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+ HoodieRecord.FILENAME_METADATA_FIELD)
+ }
+
+ //params for immutable user flow
+ val paramsForImmutable: List[String] = List(
+
"COPY_ON_WRITE|insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"COPY_ON_WRITE|insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"COPY_ON_WRITE|insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"COPY_ON_WRITE|insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"COPY_ON_WRITE|insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"COPY_ON_WRITE|insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"COPY_ON_WRITE|insert|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"COPY_ON_WRITE|insert|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"COPY_ON_WRITE|insert|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"MERGE_ON_READ|insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"MERGE_ON_READ|insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"MERGE_ON_READ|insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"MERGE_ON_READ|insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"MERGE_ON_READ|insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"MERGE_ON_READ|insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"MERGE_ON_READ|insert|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"MERGE_ON_READ|insert|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"MERGE_ON_READ|insert|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"COPY_ON_WRITE|bulk_insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"COPY_ON_WRITE|bulk_insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"COPY_ON_WRITE|bulk_insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"COPY_ON_WRITE|bulk_insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"COPY_ON_WRITE|bulk_insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"COPY_ON_WRITE|bulk_insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"COPY_ON_WRITE|bulk_insert|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"COPY_ON_WRITE|bulk_insert|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"COPY_ON_WRITE|bulk_insert|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"MERGE_ON_READ|bulk_insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"MERGE_ON_READ|bulk_insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"MERGE_ON_READ|bulk_insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"MERGE_ON_READ|bulk_insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"MERGE_ON_READ|bulk_insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"MERGE_ON_READ|bulk_insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"MERGE_ON_READ|bulk_insert|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"MERGE_ON_READ|bulk_insert|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"MERGE_ON_READ|bulk_insert|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM"
+ )
+
+ //extracts the params and runs each immutable user flow test
+ forAll(paramsForImmutable) { (paramStr: String) =>
+ test(s"Immutable user flow with params: $paramStr") {
+ val splits = paramStr.split('|')
+ withTempDir { basePath =>
+ testImmutableUserFlow(basePath,
+ tableType = splits(0),
+ operation = splits(1),
+ isMetadataEnabledOnWrite = splits(2).toBoolean,
+ isMetadataEnabledOnRead = splits(3).toBoolean,
+ keyGenClass = splits(4),
+ indexType = splits(5))
+ }
+ }
+ }
+
+ def testImmutableUserFlow(basePath: File, tableType: String, operation:
String, isMetadataEnabledOnWrite: Boolean,
+ isMetadataEnabledOnRead: Boolean, keyGenClass:
String, indexType: String): Unit = {
+ val tableName = generateTableName
+ val tableBasePath = basePath.getCanonicalPath + "/" + tableName
+ val writeOptions = getWriteOptions(tableName, tableType,
isMetadataEnabledOnWrite, keyGenClass, indexType)
+ createTable(tableName, keyGenClass, writeOptions, tableBasePath)
+ val fs = FSUtils.getFs(tableBasePath,
spark.sparkContext.hadoopConfiguration)
+
+ //Insert Operation
+ val dataGen = new
HoodieTestDataGenerator(HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA,
0xDEED)
+ val inputDf0 = generateInserts(dataGen, "000", 100)
+ insertInto(tableName, inputDf0, "bulk_insert", isMetadataEnabledOnWrite,
keyGenClass)
+
+ assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, tableBasePath, "000"))
+
+ //Snapshot query
+ val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabledOnRead)
+ snapshotDf1.cache()
+ assertEquals(100, snapshotDf1.count())
+ compareEntireInputDfWithHudiDf(inputDf0, snapshotDf1)
+
+ val inputDf1 = generateInserts(dataGen, "001", 50)
+ insertInto(tableName, inputDf1, operation, isMetadataEnabledOnWrite,
keyGenClass)
+
+ val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabledOnRead)
+ snapshotDf2.cache()
+ assertEquals(150, snapshotDf2.count())
+
+ compareEntireInputDfWithHudiDf(inputDf1.union(inputDf0), snapshotDf2)
+
+ val inputDf2 = generateInserts(dataGen, "002", 60)
+ inputDf2.cache()
+ insertInto(tableName, inputDf2, operation, isMetadataEnabledOnWrite,
keyGenClass)
+
+ assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs,
tableBasePath, "000").size())
+
+ // Snapshot Query
+ val snapshotDf3 = doSnapshotRead(tableName, isMetadataEnabledOnRead)
+ snapshotDf3.cache()
+ assertEquals(210, snapshotDf3.count())
+ compareEntireInputDfWithHudiDf(inputDf1.union(inputDf0).union(inputDf2),
snapshotDf3)
+ }
+
+}