This is an automated email from the ASF dual-hosted git repository. satish pushed a commit to branch release-0.12.2 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 9913de4ebfa2da09aa5352d814c3078e4b493441 Author: Jon Vexler <[email protected]> AuthorDate: Tue Nov 29 08:47:29 2022 -0500 [HUDI-5269] Enhancing spark-sql write tests for some of the core user flows (#7230) Add good test coverage for some of the core user flows w/ spark data source writes. --- .../common/testutils/HoodieTestDataGenerator.java | 181 +++++++-- .../hudi/functional/TestSparkSqlCoreFlow.scala | 441 +++++++++++++++++++++ 2 files changed, 580 insertions(+), 42 deletions(-) diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java index e1aa0c7a01d..5cedea06342 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java @@ -126,6 +126,9 @@ public class HoodieTestDataGenerator implements AutoCloseable { public static final String TRIP_FLATTENED_SCHEMA = TRIP_SCHEMA_PREFIX + FARE_FLATTENED_SCHEMA + TRIP_SCHEMA_SUFFIX; + public static final String TRIP_NESTED_EXAMPLE_SCHEMA = + TRIP_SCHEMA_PREFIX + FARE_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX; + public static final String TRIP_SCHEMA = "{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[" + "{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"}," + "{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}"; @@ -139,6 +142,7 @@ public class HoodieTestDataGenerator implements AutoCloseable { public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA); + public static final Schema NESTED_AVRO_SCHEMA = new Schema.Parser().parse(TRIP_NESTED_EXAMPLE_SCHEMA); public static final TypeDescription ORC_SCHEMA = AvroOrcUtils.createOrcSchema(new 
Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA)); public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS = HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA); @@ -159,13 +163,21 @@ public class HoodieTestDataGenerator implements AutoCloseable { this(seed, DEFAULT_PARTITION_PATHS, new HashMap<>()); } + public HoodieTestDataGenerator(String schema, long seed) { + this(schema, seed, DEFAULT_PARTITION_PATHS, new HashMap<>()); + } + public HoodieTestDataGenerator(long seed, String[] partitionPaths, Map<Integer, KeyPartition> keyPartitionMap) { + this(TRIP_EXAMPLE_SCHEMA, seed, partitionPaths, keyPartitionMap); + } + + public HoodieTestDataGenerator(String schema, long seed, String[] partitionPaths, Map<Integer, KeyPartition> keyPartitionMap) { this.rand = new Random(seed); this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length); this.existingKeysBySchema = new HashMap<>(); - this.existingKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap); + this.existingKeysBySchema.put(schema, keyPartitionMap); this.numKeysBySchema = new HashMap<>(); - this.numKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap.size()); + this.numKeysBySchema.put(schema, keyPartitionMap.size()); logger.info(String.format("Test DataGenerator's seed (%s)", seed)); } @@ -223,6 +235,8 @@ public class HoodieTestDataGenerator implements AutoCloseable { return generatePayloadForTripSchema(key, commitTime); } else if (SHORT_TRIP_SCHEMA.equals(schemaStr)) { return generatePayloadForShortTripSchema(key, commitTime); + } else if (TRIP_NESTED_EXAMPLE_SCHEMA.equals(schemaStr)) { + return generateNestedExampleRandomValue(key, commitTime); } return null; @@ -255,6 +269,11 @@ public class HoodieTestDataGenerator implements AutoCloseable { return generateRandomValue(key, instantTime, isFlattened, 0); } + private RawTripTestPayload generateNestedExampleRandomValue( + HoodieKey key, String instantTime) throws IOException { + return generateNestedExampleRandomValue(key, instantTime, 0); + } + private 
RawTripTestPayload generateRandomValue( HoodieKey key, String instantTime, boolean isFlattened, int ts) throws IOException { GenericRecord rec = generateGenericRecord( @@ -263,6 +282,14 @@ public class HoodieTestDataGenerator implements AutoCloseable { return new RawTripTestPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA); } + private RawTripTestPayload generateNestedExampleRandomValue( + HoodieKey key, String instantTime, int ts) throws IOException { + GenericRecord rec = generateNestedExampleGenericRecord( + key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime, "driver-" + instantTime, ts, + false); + return new RawTripTestPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_NESTED_EXAMPLE_SCHEMA); // must match the schema rec was built with + } + /** * Generates a new avro record with TRIP_SCHEMA, retaining the key if optionally provided. */ @@ -298,10 +325,11 @@ public class HoodieTestDataGenerator implements AutoCloseable { return generateGenericRecord(rowKey, partitionPath, riderName, driverName, timestamp, false, false); } - public GenericRecord generateGenericRecord(String rowKey, String partitionPath, String riderName, String driverName, - long timestamp, boolean isDeleteRecord, - boolean isFlattened) { - GenericRecord rec = new GenericData.Record(isFlattened ?
FLATTENED_AVRO_SCHEMA : AVRO_SCHEMA); + + /** + * Populate rec with values for TRIP_SCHEMA_PREFIX + */ + private void generateTripPrefixValues(GenericRecord rec, String rowKey, String partitionPath, String riderName, String driverName, long timestamp) { rec.put("_row_key", rowKey); rec.put("timestamp", timestamp); rec.put("partition_path", partitionPath); @@ -311,47 +339,108 @@ public class HoodieTestDataGenerator implements AutoCloseable { rec.put("begin_lon", rand.nextDouble()); rec.put("end_lat", rand.nextDouble()); rec.put("end_lon", rand.nextDouble()); - if (isFlattened) { - rec.put("fare", rand.nextDouble() * 100); - rec.put("currency", "USD"); - } else { - rec.put("distance_in_meters", rand.nextInt()); - rec.put("seconds_since_epoch", rand.nextLong()); - rec.put("weight", rand.nextFloat()); - byte[] bytes = "Canada".getBytes(); - rec.put("nation", ByteBuffer.wrap(bytes)); - long randomMillis = genRandomTimeMillis(rand); - Instant instant = Instant.ofEpochMilli(randomMillis); - rec.put("current_date", (int) LocalDateTime.ofInstant(instant, ZoneOffset.UTC).toLocalDate().toEpochDay()); - rec.put("current_ts", randomMillis); - - BigDecimal bigDecimal = new BigDecimal(String.format("%5f", rand.nextFloat())); - Schema decimalSchema = AVRO_SCHEMA.getField("height").schema(); - Conversions.DecimalConversion decimalConversions = new Conversions.DecimalConversion(); - GenericFixed genericFixed = decimalConversions.toFixed(bigDecimal, decimalSchema, LogicalTypes.decimal(10, 6)); - rec.put("height", genericFixed); - - rec.put("city_to_state", Collections.singletonMap("LA", "CA")); - - GenericRecord fareRecord = new GenericData.Record(AVRO_SCHEMA.getField("fare").schema()); - fareRecord.put("amount", rand.nextDouble() * 100); - fareRecord.put("currency", "USD"); - rec.put("fare", fareRecord); - - GenericArray<GenericRecord> tipHistoryArray = new GenericData.Array<>(1, AVRO_SCHEMA.getField("tip_history").schema()); - Schema tipSchema = new 
Schema.Parser().parse(AVRO_SCHEMA.getField("tip_history").schema().toString()).getElementType(); - GenericRecord tipRecord = new GenericData.Record(tipSchema); - tipRecord.put("amount", rand.nextDouble() * 100); - tipRecord.put("currency", "USD"); - tipHistoryArray.add(tipRecord); - rec.put("tip_history", tipHistoryArray); - } + } + + /** + * Populate rec with values for FARE_FLATTENED_SCHEMA + */ + private void generateFareFlattenedValues(GenericRecord rec) { + rec.put("fare", rand.nextDouble() * 100); + rec.put("currency", "USD"); + } + + /** + * Populate rec with values for EXTRA_TYPE_SCHEMA + */ + private void generateExtraSchemaValues(GenericRecord rec) { + rec.put("distance_in_meters", rand.nextInt()); + rec.put("seconds_since_epoch", rand.nextLong()); + rec.put("weight", rand.nextFloat()); + byte[] bytes = "Canada".getBytes(); + rec.put("nation", ByteBuffer.wrap(bytes)); + long randomMillis = genRandomTimeMillis(rand); + Instant instant = Instant.ofEpochMilli(randomMillis); + rec.put("current_date", (int) LocalDateTime.ofInstant(instant, ZoneOffset.UTC).toLocalDate().toEpochDay()); + rec.put("current_ts", randomMillis); + + BigDecimal bigDecimal = new BigDecimal(String.format("%5f", rand.nextFloat())); + Schema decimalSchema = AVRO_SCHEMA.getField("height").schema(); + Conversions.DecimalConversion decimalConversions = new Conversions.DecimalConversion(); + GenericFixed genericFixed = decimalConversions.toFixed(bigDecimal, decimalSchema, LogicalTypes.decimal(10, 6)); + rec.put("height", genericFixed); + } + + /** + * Populate rec with values for MAP_TYPE_SCHEMA + */ + private void generateMapTypeValues(GenericRecord rec) { + rec.put("city_to_state", Collections.singletonMap("LA", "CA")); + } + /** + * Populate rec with values for FARE_NESTED_SCHEMA; the fare sub-record is typed from rec's own schema so this works for both AVRO_SCHEMA and NESTED_AVRO_SCHEMA records + */ + private void generateFareNestedValues(GenericRecord rec) { + GenericRecord fareRecord = new GenericData.Record(rec.getSchema().getField("fare").schema()); + fareRecord.put("amount", rand.nextDouble() * 100); +
fareRecord.put("currency", "USD"); + rec.put("fare", fareRecord); + } + + /** + * Populate rec with values for TIP_NESTED_SCHEMA + */ + private void generateTipNestedValues(GenericRecord rec) { + GenericArray<GenericRecord> tipHistoryArray = new GenericData.Array<>(1, AVRO_SCHEMA.getField("tip_history").schema()); + Schema tipSchema = new Schema.Parser().parse(AVRO_SCHEMA.getField("tip_history").schema().toString()).getElementType(); + GenericRecord tipRecord = new GenericData.Record(tipSchema); + tipRecord.put("amount", rand.nextDouble() * 100); + tipRecord.put("currency", "USD"); + tipHistoryArray.add(tipRecord); + rec.put("tip_history", tipHistoryArray); + } + + /** + * Populate rec with values for TRIP_SCHEMA_SUFFIX + */ + private void generateTripSuffixValues(GenericRecord rec, boolean isDeleteRecord) { if (isDeleteRecord) { rec.put("_hoodie_is_deleted", true); } else { rec.put("_hoodie_is_deleted", false); } + } + + + /** + * Generate record conforming to TRIP_EXAMPLE_SCHEMA or TRIP_FLATTENED_SCHEMA if isFlattened is true + */ + public GenericRecord generateGenericRecord(String rowKey, String partitionPath, String riderName, String driverName, + long timestamp, boolean isDeleteRecord, + boolean isFlattened) { + GenericRecord rec = new GenericData.Record(isFlattened ? 
FLATTENED_AVRO_SCHEMA : AVRO_SCHEMA); + generateTripPrefixValues(rec, rowKey, partitionPath, riderName, driverName, timestamp); + if (isFlattened) { + generateFareFlattenedValues(rec); + } else { + generateExtraSchemaValues(rec); + generateMapTypeValues(rec); + generateFareNestedValues(rec); + generateTipNestedValues(rec); + } + generateTripSuffixValues(rec, isDeleteRecord); + return rec; + } + + /** + * Generate record conforming to TRIP_NESTED_EXAMPLE_SCHEMA + */ + public GenericRecord generateNestedExampleGenericRecord(String rowKey, String partitionPath, String riderName, String driverName, + long timestamp, boolean isDeleteRecord) { + GenericRecord rec = new GenericData.Record(NESTED_AVRO_SCHEMA); + generateTripPrefixValues(rec, rowKey, partitionPath, riderName, driverName, timestamp); + generateFareNestedValues(rec); + generateTripSuffixValues(rec, isDeleteRecord); return rec; } @@ -474,13 +563,17 @@ public class HoodieTestDataGenerator implements AutoCloseable { } /** - * Generates new inserts with nested schema, uniformly across the partition paths above. + * Generates new inserts for TRIP_EXAMPLE_SCHEMA with nested schema, uniformly across the partition paths above. * It also updates the list of existing keys. */ public List<HoodieRecord> generateInserts(String instantTime, Integer n) { return generateInserts(instantTime, n, false); } + public List<HoodieRecord> generateInsertsNestedExample(String instantTime, Integer n) { + return generateInsertsStream(instantTime, n, false, TRIP_NESTED_EXAMPLE_SCHEMA).collect(Collectors.toList()); + } + /** * Generates new inserts, uniformly across the partition paths above. * It also updates the list of existing keys. 
@@ -721,6 +814,10 @@ public class HoodieTestDataGenerator implements AutoCloseable { return generateUniqueUpdatesStream(instantTime, n, TRIP_EXAMPLE_SCHEMA).collect(Collectors.toList()); } + public List<HoodieRecord> generateUniqueUpdatesNestedExample(String instantTime, Integer n) { + return generateUniqueUpdatesStream(instantTime, n, TRIP_NESTED_EXAMPLE_SCHEMA).collect(Collectors.toList()); + } + public List<HoodieRecord> generateUniqueUpdatesAsPerSchema(String instantTime, Integer n, String schemaStr) { return generateUniqueUpdatesStream(instantTime, n, schemaStr).collect(Collectors.toList()); } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala new file mode 100644 index 00000000000..e1b888aca20 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.functional + +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers, HoodieSparkUtils} +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.testutils.HoodieTestDataGenerator +import org.apache.spark.sql +import org.apache.spark.sql.{Dataset, Row} +import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings +import org.scalatest.Inspectors.forAll + +import java.io.File +import scala.collection.JavaConversions._ + + +class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase { + val colsToCompare = "timestamp, _row_key, partition_path, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare.amount, fare.currency, _hoodie_is_deleted" + + //params for core flow tests + val params: List[String] = List( + "COPY_ON_WRITE|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + "COPY_ON_WRITE|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + "COPY_ON_WRITE|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + "COPY_ON_WRITE|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + "COPY_ON_WRITE|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + "COPY_ON_WRITE|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + "COPY_ON_WRITE|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", + "COPY_ON_WRITE|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", + "COPY_ON_WRITE|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", + "MERGE_ON_READ|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + "MERGE_ON_READ|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + 
"MERGE_ON_READ|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + "MERGE_ON_READ|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + "MERGE_ON_READ|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + "MERGE_ON_READ|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + "MERGE_ON_READ|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", + "MERGE_ON_READ|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", + "MERGE_ON_READ|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM" + ) + + //extracts the params and runs each core flow test + forAll (params) { (paramStr: String) => + test(s"Core flow with params: $paramStr") { + val splits = paramStr.split('|') + withTempDir { basePath => + testCoreFlows(basePath, + tableType = splits(0), + isMetadataEnabledOnWrite = splits(1).toBoolean, + isMetadataEnabledOnRead = splits(2).toBoolean, + keyGenClass = splits(3), + indexType = splits(4)) + } + } + } + + def testCoreFlows(basePath: File, tableType: String, isMetadataEnabledOnWrite: Boolean, isMetadataEnabledOnRead: Boolean, keyGenClass: String, indexType: String): Unit = { + //Create table and set up for testing + val tableName = generateTableName + val tableBasePath = basePath.getCanonicalPath + "/" + tableName + val writeOptions = getWriteOptions(tableName, tableType, isMetadataEnabledOnWrite, keyGenClass, indexType) + createTable(tableName, keyGenClass, writeOptions, tableBasePath) + val fs = FSUtils.getFs(tableBasePath, spark.sparkContext.hadoopConfiguration) + val dataGen = new HoodieTestDataGenerator(HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA, 0xDEED) + + //Bulk insert first set of records + val inputDf0 = generateInserts(dataGen, "000", 100) + inputDf0.cache() + insertInto(tableName, inputDf0, "bulk_insert", isMetadataEnabledOnWrite, keyGenClass) + assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, tableBasePath, "000")) + //Verify bulk insert works 
correctly + val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabledOnRead) + snapshotDf1.cache() + assertEquals(100, snapshotDf1.count()) + compareEntireInputDfWithHudiDf(inputDf0, snapshotDf1) + + //Test updated records + val updateDf = generateUniqueUpdates(dataGen, "001", 50) + insertInto(tableName, updateDf, "upsert", isMetadataEnabledOnWrite, keyGenClass) + val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tableBasePath) + val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabledOnRead) + snapshotDf2.cache() + assertEquals(100, snapshotDf2.count()) + compareUpdateDfWithHudiDf(updateDf, snapshotDf2, snapshotDf1) + + val inputDf2 = generateUniqueUpdates(dataGen, "002", 60) + val uniqueKeyCnt2 = inputDf2.select("_row_key").distinct().count() + insertInto(tableName, inputDf2, "upsert", isMetadataEnabledOnWrite, keyGenClass) + val commitInstantTime3 = HoodieDataSourceHelpers.latestCommit(fs, tableBasePath) + assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, tableBasePath, "000").size()) + + val snapshotDf3 = doSnapshotRead(tableName, isMetadataEnabledOnRead) + snapshotDf3.cache() + assertEquals(100, snapshotDf3.count()) + compareUpdateDfWithHudiDf(inputDf2, snapshotDf3, snapshotDf2) // rows not in inputDf2 must still match the pre-upsert snapshot + + // Read Incremental Query, need to use spark-ds because functionality does not exist for spark sql + // we have 3 commits, try pulling the first commit (which is not the latest) + //HUDI-5266 + val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, tableBasePath, "000").get(0) + val hoodieIncViewDf1 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "000") + .option(DataSourceReadOptions.END_INSTANTTIME.key, firstCommit) + .load(tableBasePath) + //val hoodieIncViewDf1 = doIncRead(tableName, isMetadataEnabledOnRead, "000", firstCommit) + assertEquals(100, hoodieIncViewDf1.count()) // 100 initial
inserts must be pulled + var countsPerCommit = hoodieIncViewDf1.groupBy("_hoodie_commit_time").count().collect() + assertEquals(1, countsPerCommit.length) + assertEquals(firstCommit, countsPerCommit(0).get(0).toString) + + val inputDf3 = generateUniqueUpdates(dataGen, "003", 80) + insertInto(tableName, inputDf3, "upsert", isMetadataEnabledOnWrite, keyGenClass) + + //another incremental query with commit2 and commit3 + //HUDI-5266 + val hoodieIncViewDf2 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime2) + .option(DataSourceReadOptions.END_INSTANTTIME.key(), commitInstantTime3) + .load(tableBasePath) + + assertEquals(uniqueKeyCnt2, hoodieIncViewDf2.count()) // 60 records must be pulled + countsPerCommit = hoodieIncViewDf2.groupBy("_hoodie_commit_time").count().collect() + assertEquals(1, countsPerCommit.length) + assertEquals(commitInstantTime3, countsPerCommit(0).get(0).toString) + + + val timeTravelDf = if (HoodieSparkUtils.gteqSpark3_2_1) { + spark.sql(s"select * from $tableName timestamp as of '$commitInstantTime2'") + } else { + //HUDI-5265 + spark.read.format("org.apache.hudi") + .option("as.of.instant", commitInstantTime2) + .load(tableBasePath) + } + assertEquals(100, timeTravelDf.count()) + compareEntireInputDfWithHudiDf(snapshotDf2, timeTravelDf) + + if (tableType.equals("MERGE_ON_READ")) { + val readOptDf = doMORReadOptimizedQuery(isMetadataEnabledOnRead, tableBasePath) + compareEntireInputDfWithHudiDf(inputDf0, readOptDf) + + val snapshotDf4 = doSnapshotRead(tableName, isMetadataEnabledOnRead) + snapshotDf4.cache() + + // trigger compaction and try out Read optimized query. 
+ val inputDf4 = generateUniqueUpdates(dataGen, "004", 40) + doInlineCompact(tableName, inputDf4, "upsert", isMetadataEnabledOnWrite, "3", keyGenClass) + val snapshotDf5 = doSnapshotRead(tableName, isMetadataEnabledOnRead) + snapshotDf5.cache() + compareUpdateDfWithHudiDf(inputDf4, snapshotDf5, snapshotDf4) + + // compaction is expected to have completed. both RO and RT are expected to return same results. + compareROAndRT(isMetadataEnabledOnRead, tableName, tableBasePath) + } + } + + def doSnapshotRead(tableName: String, isMetadataEnabledOnRead: Boolean): sql.DataFrame = { + spark.sql("set hoodie.datasource.query.type=\"snapshot\"") + spark.sql(s"set hoodie.metadata.enable=${String.valueOf(isMetadataEnabledOnRead)}") + spark.sql(s"select * from $tableName") + } + + def doInlineCompact(tableName: String, recDf: sql.DataFrame, writeOp: String, isMetadataEnabledOnWrite: Boolean, numDeltaCommits: String, keyGenClass: String): Unit = { + spark.sql("set hoodie.compact.inline=true") + spark.sql(s"set hoodie.compact.inline.max.delta.commits=$numDeltaCommits") + insertInto(tableName, recDf, writeOp, isMetadataEnabledOnWrite, keyGenClass) + spark.sql("set hoodie.compact.inline=false") + } + + def getWriteOptions(tableName: String, tableType: String, isMetadataEnabledOnWrite: Boolean, + keyGenClass: String, indexType: String): String = { + val typeString = if (tableType.equals("COPY_ON_WRITE")) { + "cow" + } else if (tableType.equals("MERGE_ON_READ")) { + "mor" + } else { + tableType + } + + s""" + |tblproperties ( + | type = '$typeString', + | primaryKey = '_row_key', + | preCombineField = 'timestamp', + | hoodie.bulkinsert.shuffle.parallelism = 4, + | hoodie.database.name = "databaseName", + | hoodie.table.keygenerator.class = '$keyGenClass', + | hoodie.delete.shuffle.parallelism = 2, + | hoodie.index.type = "$indexType", + | hoodie.insert.shuffle.parallelism = 4, + | hoodie.metadata.enable = ${String.valueOf(isMetadataEnabledOnWrite)}, + | hoodie.table.name = 
"$tableName", + | hoodie.upsert.shuffle.parallelism = 4 + | )""".stripMargin + } + + def insertInto(tableName: String, inputDf: sql.DataFrame, writeOp: String, isMetadataEnabledOnWrite: Boolean, keyGenClass: String): Unit = { + inputDf.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon", "fare", + "_hoodie_is_deleted", "partition_path").createOrReplaceTempView("insert_temp_table") + spark.sql(s"set hoodie.metadata.enable=${String.valueOf(isMetadataEnabledOnWrite)}") + spark.sql(s"set hoodie.datasource.write.keygenerator.class=$keyGenClass") + if (writeOp.equals("upsert")) { + spark.sql(s"set hoodie.datasource.write.operation=$writeOp") + spark.sql("set hoodie.sql.bulk.insert.enable=false") + spark.sql("set hoodie.sql.insert.mode=upsert") + spark.sql( + s""" + | merge into $tableName as target + | using insert_temp_table as source + | on target._row_key = source._row_key and + | target.partition_path = source.partition_path + | when matched then update set * + | when not matched then insert * + | """.stripMargin) + } else if (writeOp.equals("bulk_insert")) { + //If HUDI-5257 is resolved, write operation should be bulk_insert, and this function can be more compact due to + //less repeated code + spark.sql("set hoodie.datasource.write.operation=insert") + spark.sql("set hoodie.sql.bulk.insert.enable=true") + spark.sql("set hoodie.sql.insert.mode=non-strict") + spark.sql(s"insert into $tableName select * from insert_temp_table") + } else if (writeOp.equals("insert")) { + spark.sql(s"set hoodie.datasource.write.operation=$writeOp") + spark.sql("set hoodie.sql.bulk.insert.enable=false") + spark.sql("set hoodie.sql.insert.mode=non-strict") + spark.sql(s"insert into $tableName select * from insert_temp_table") + } + } + def createTable(tableName: String, keyGenClass: String, writeOptions: String, tableBasePath: String): Unit = { + //If you have partitioned by (partition_path) with nonpartitioned keygen, the partition_path will 
be empty in the table + val partitionedBy = if (!keyGenClass.equals("org.apache.hudi.keygen.NonpartitionedKeyGenerator")) { + "partitioned by (partition_path)" + } else { + "" + } + + spark.sql( + s""" + | create table $tableName ( + | timestamp long, + | _row_key string, + | rider string, + | driver string, + | begin_lat double, + | begin_lon double, + | end_lat double, + | end_lon double, + | fare STRUCT< + | amount: double, + | currency: string >, + | _hoodie_is_deleted boolean, + | partition_path string + |) using hudi + | $partitionedBy + | $writeOptions + | location '$tableBasePath' + | + """.stripMargin) + } + + def generateInserts(dataGen: HoodieTestDataGenerator, instantTime: String, n: Int): sql.DataFrame = { + val recs = dataGen.generateInsertsNestedExample(instantTime, n) + spark.read.json(spark.sparkContext.parallelize(recordsToStrings(recs), 2)) + } + + def generateUniqueUpdates(dataGen: HoodieTestDataGenerator, instantTime: String, n: Int): sql.DataFrame = { + val recs = dataGen.generateUniqueUpdatesNestedExample(instantTime, n) + spark.read.json(spark.sparkContext.parallelize(recordsToStrings(recs), 2)) + } + + def compareUpdateDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row], beforeDf: Dataset[Row]): Unit = { + dropMetaColumns(hudiDf).createOrReplaceTempView("hudiTbl") + inputDf.createOrReplaceTempView("inputTbl") + beforeDf.createOrReplaceTempView("beforeTbl") + val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " from hudiTbl") + val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " from inputTbl") + val beforeDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " from beforeTbl") + + assertEquals(hudiDfToCompare.intersect(inputDfToCompare).count, inputDfToCompare.count) + assertEquals(hudiDfToCompare.except(inputDfToCompare).except(beforeDfToCompare).count, 0) + } + + def compareEntireInputDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row]): Unit = { + 
dropMetaColumns(hudiDf).createOrReplaceTempView("hudiTbl") + inputDf.createOrReplaceTempView("inputTbl") + val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " from hudiTbl") + val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " from inputTbl") + + assertEquals(hudiDfToCompare.intersect(inputDfToCompare).count, inputDfToCompare.count) + assertEquals(hudiDfToCompare.except(inputDfToCompare).count, 0) + } + + def doMORReadOptimizedQuery(isMetadataEnabledOnRead: Boolean, basePath: String): sql.DataFrame = { + spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) + .option(HoodieMetadataConfig.ENABLE.key(), isMetadataEnabledOnRead) + .load(basePath) + } + + def compareROAndRT(isMetadataEnabledOnRead: Boolean, tableName: String, basePath: String): Unit = { + val roDf = doMORReadOptimizedQuery(isMetadataEnabledOnRead, basePath) + val rtDf = doSnapshotRead(tableName, isMetadataEnabledOnRead) + dropMetaColumns(roDf).createOrReplaceTempView("hudiTbl1") + dropMetaColumns(rtDf).createOrReplaceTempView("hudiTbl2") + + val hudiDf1ToCompare = spark.sqlContext.sql("select " + colsToCompare + " from hudiTbl1") + val hudiDf2ToCompare = spark.sqlContext.sql("select " + colsToCompare + " from hudiTbl2") + + assertEquals(hudiDf1ToCompare.intersect(hudiDf2ToCompare).count, hudiDf1ToCompare.count) + assertEquals(hudiDf1ToCompare.except(hudiDf2ToCompare).count, 0) + } + + def dropMetaColumns(inputDf: sql.DataFrame): sql.DataFrame = { + inputDf.drop(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD, + HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, + HoodieRecord.FILENAME_METADATA_FIELD) + } + + //params for immutable user flow + val paramsForImmutable: List[String] = List( + "COPY_ON_WRITE|insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + 
"COPY_ON_WRITE|insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + "COPY_ON_WRITE|insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + "COPY_ON_WRITE|insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + "COPY_ON_WRITE|insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + "COPY_ON_WRITE|insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + "COPY_ON_WRITE|insert|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", + "COPY_ON_WRITE|insert|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", + "COPY_ON_WRITE|insert|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", + "MERGE_ON_READ|insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + "MERGE_ON_READ|insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + "MERGE_ON_READ|insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + "MERGE_ON_READ|insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + "MERGE_ON_READ|insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + "MERGE_ON_READ|insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + "MERGE_ON_READ|insert|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", + "MERGE_ON_READ|insert|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", + "MERGE_ON_READ|insert|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", + "COPY_ON_WRITE|bulk_insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + "COPY_ON_WRITE|bulk_insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + "COPY_ON_WRITE|bulk_insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + "COPY_ON_WRITE|bulk_insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + "COPY_ON_WRITE|bulk_insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + 
"COPY_ON_WRITE|bulk_insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + "COPY_ON_WRITE|bulk_insert|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", + "COPY_ON_WRITE|bulk_insert|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", + "COPY_ON_WRITE|bulk_insert|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", + "MERGE_ON_READ|bulk_insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + "MERGE_ON_READ|bulk_insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + "MERGE_ON_READ|bulk_insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + "MERGE_ON_READ|bulk_insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + "MERGE_ON_READ|bulk_insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + "MERGE_ON_READ|bulk_insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + "MERGE_ON_READ|bulk_insert|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", + "MERGE_ON_READ|bulk_insert|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", + "MERGE_ON_READ|bulk_insert|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM" + ) + + //extracts the params and runs each immutable user flow test + forAll(paramsForImmutable) { (paramStr: String) => + test(s"Immutable user flow with params: $paramStr") { + val splits = paramStr.split('|') + withTempDir { basePath => + testImmutableUserFlow(basePath, + tableType = splits(0), + operation = splits(1), + isMetadataEnabledOnWrite = splits(2).toBoolean, + isMetadataEnabledOnRead = splits(3).toBoolean, + keyGenClass = splits(4), + indexType = splits(5)) + } + } + } + + def testImmutableUserFlow(basePath: File, tableType: String, operation: String, isMetadataEnabledOnWrite: Boolean, + isMetadataEnabledOnRead: Boolean, keyGenClass: String, indexType: String): Unit = { + val tableName = generateTableName + val 
tableBasePath = basePath.getCanonicalPath + "/" + tableName + val writeOptions = getWriteOptions(tableName, tableType, isMetadataEnabledOnWrite, keyGenClass, indexType) + createTable(tableName, keyGenClass, writeOptions, tableBasePath) + val fs = FSUtils.getFs(tableBasePath, spark.sparkContext.hadoopConfiguration) + + //Insert Operation + val dataGen = new HoodieTestDataGenerator(HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA, 0xDEED) + val inputDf0 = generateInserts(dataGen, "000", 100) + insertInto(tableName, inputDf0, "bulk_insert", isMetadataEnabledOnWrite, keyGenClass) + + assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, tableBasePath, "000")) + + //Snapshot query + val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabledOnRead) + snapshotDf1.cache() + assertEquals(100, snapshotDf1.count()) + compareEntireInputDfWithHudiDf(inputDf0, snapshotDf1) + + val inputDf1 = generateInserts(dataGen, "001", 50) + insertInto(tableName, inputDf1, operation, isMetadataEnabledOnWrite, keyGenClass) + + val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabledOnRead) + snapshotDf2.cache() + assertEquals(150, snapshotDf2.count()) + + compareEntireInputDfWithHudiDf(inputDf1.union(inputDf0), snapshotDf2) + + val inputDf2 = generateInserts(dataGen, "002", 60) + inputDf2.cache() + insertInto(tableName, inputDf2, operation, isMetadataEnabledOnWrite, keyGenClass) + + assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, tableBasePath, "000").size()) + + // Snapshot Query + val snapshotDf3 = doSnapshotRead(tableName, isMetadataEnabledOnRead) + snapshotDf3.cache() + assertEquals(210, snapshotDf3.count()) + compareEntireInputDfWithHudiDf(inputDf1.union(inputDf0).union(inputDf2), snapshotDf3) + } + +}
