This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch partition-bucket-index
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 39d4991166caf0e40b2b1fa32e5f9cb775dd7ad6
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Mar 17 20:23:27 2025 -0700

    [HUDI-9170] Fixing schema projection with file group reader (#12970)

    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../read/HoodieFileGroupReaderSchemaHandler.java   |   7 +-
 .../read/TestHoodieFileGroupRecordBuffer.java      |  81 +++++++++++++
 .../TestHoodieDatasetBulkInsertHelper.java         |   2 +-
 .../apache/hudi/testutils/DataSourceTestUtils.java |  33 +++--
 .../src/test/resources/exampleEvolvedSchema.txt    |   5 +
 .../src/test/resources/exampleSchema.txt           |   5 +
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala |   2 +-
 .../apache/hudi/functional/TestCOWDataSource.scala |  41 +------
 .../apache/hudi/functional/TestMORDataSource.scala |  46 -------
 .../hudi/functional/TestSparkDataSource.scala      | 135 ++++++++++++++++++++-
 10 files changed, 257 insertions(+), 100 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
index 714d43784be..f007f9ef400 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
@@ -150,7 +150,8 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
     return AvroInternalSchemaConverter.pruneAvroSchemaToInternalSchema(requiredSchema, internalSchema);
   }
 
-  private Schema generateRequiredSchema() {
+  @VisibleForTesting
+  Schema generateRequiredSchema() {
     //might need to change this if other queries than mor have mandatory fields
     if (!needsMORMerge) {
       return requestedSchema;
@@ -204,6 +205,10 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
         requiredFields.add(preCombine);
       }
     }
+
+    if (dataSchema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD) != null) {
+      requiredFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD);
+    }
     return requiredFields.toArray(new String[0]);
   }
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupRecordBuffer.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupRecordBuffer.java
index 65683bd889a..ca3a09d96df 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupRecordBuffer.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupRecordBuffer.java
@@ -19,10 +19,25 @@
 
 package org.apache.hudi.common.table.read;
 
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
 import static org.apache.hudi.common.table.read.HoodieBaseFileGroupRecordBuffer.getOrderingValue;
@@ -49,6 +64,72 @@ public class TestHoodieFileGroupRecordBuffer {
     assertEquals(convertedValue, getOrderingValue(readerContext, deleteRecord));
   }
 
+  @ParameterizedTest
+  @CsvSource({
+      "true, true, EVENT_TIME_ORDERING",
+      "true, false, EVENT_TIME_ORDERING",
+      "false, true, EVENT_TIME_ORDERING",
+      "false, false, EVENT_TIME_ORDERING",
+      "true, true, COMMIT_TIME_ORDERING",
+      "true, false, COMMIT_TIME_ORDERING",
+      "false, true, COMMIT_TIME_ORDERING",
+      "false, false, COMMIT_TIME_ORDERING",
+      "true, true, CUSTOM",
+      "true, false, CUSTOM",
+      "false, true, CUSTOM",
+      "false, false, CUSTOM"
+  })
+  public void testSchemaForMandatoryFields(boolean setPrecombine, boolean addHoodieIsDeleted, RecordMergeMode mergeMode) {
+    HoodieReaderContext readerContext = mock(HoodieReaderContext.class);
+    when(readerContext.getHasBootstrapBaseFile()).thenReturn(false);
+    when(readerContext.getHasLogFiles()).thenReturn(true);
+    HoodieRecordMerger recordMerger = mock(HoodieRecordMerger.class);
+    when(readerContext.getRecordMerger()).thenReturn(Option.of(recordMerger));
+    when(recordMerger.isProjectionCompatible()).thenReturn(false);
+
+    String preCombineField = "ts";
+    List<String> dataSchemaFields = new ArrayList<>();
+    dataSchemaFields.addAll(Arrays.asList(
+        HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD, preCombineField,
+        "colA", "colB", "colC", "colD"));
+    if (addHoodieIsDeleted) {
+      dataSchemaFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD);
+    }
+
+    Schema dataSchema = getSchema(dataSchemaFields);
+    Schema requestedSchema = getSchema(Arrays.asList(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD));
+
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+    when(tableConfig.getRecordMergeMode()).thenReturn(mergeMode);
+    when(tableConfig.populateMetaFields()).thenReturn(true);
+    when(tableConfig.getPreCombineField()).thenReturn(setPrecombine ? preCombineField : StringUtils.EMPTY_STRING);
+
+    TypedProperties props = new TypedProperties();
+    HoodieFileGroupReaderSchemaHandler fileGroupReaderSchemaHandler = new HoodieFileGroupReaderSchemaHandler(readerContext,
+        dataSchema, requestedSchema, Option.empty(), tableConfig, props);
+    List<String> expectedFields = new ArrayList();
+    expectedFields.add(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+    expectedFields.add(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
+    if (setPrecombine && mergeMode != RecordMergeMode.COMMIT_TIME_ORDERING) { // commit time ordering does not project ordering field.
+      expectedFields.add(preCombineField);
+    }
+    if (addHoodieIsDeleted) {
+      expectedFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD);
+    }
+    Schema expectedSchema = mergeMode == RecordMergeMode.CUSTOM ? dataSchema : getSchema(expectedFields);
+    Schema actualSchema = fileGroupReaderSchemaHandler.generateRequiredSchema();
+    assertEquals(expectedSchema, actualSchema);
+  }
+
+  private Schema getSchema(List<String> fields) {
+    SchemaBuilder.FieldAssembler<Schema> schemaFieldAssembler = SchemaBuilder.builder().record("test_schema")
+        .namespace("test_namespace").fields();
+    for (String field : fields) {
+      schemaFieldAssembler = schemaFieldAssembler.name(field).type().stringType().noDefault();
+    }
+    return schemaFieldAssembler.endRecord();
+  }
+
   private void mockDeleteRecord(DeleteRecord deleteRecord, Comparable orderingValue) {
     when(deleteRecord.getOrderingValue()).thenReturn(orderingValue);
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
index bb24ee0e52a..21b1a9be870 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
@@ -207,7 +207,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieSparkClientTestBase
         .withPreCombineField("ts").build();
     List<Row> inserts = DataSourceTestUtils.generateRandomRows(10);
     Dataset<Row> toUpdateDataset = sqlContext.createDataFrame(inserts.subList(0, 5), structType);
-    List<Row> updates = DataSourceTestUtils.updateRowsWithHigherTs(toUpdateDataset);
+    List<Row> updates = DataSourceTestUtils.updateRowsWithUpdatedTs(toUpdateDataset);
     List<Row> rows = new ArrayList<>();
     rows.addAll(inserts);
     rows.addAll(updates);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
index 08a82284898..4c371136dbc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
@@ -68,10 +68,11 @@ public class DataSourceTestUtils {
     List<Row> toReturn = new ArrayList<>();
     List<String> partitions = Arrays.asList(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH});
     for (int i = 0; i < count; i++) {
-      Object[] values = new Object[3];
+      Object[] values = new Object[4];
       values[0] = HoodieTestDataGenerator.genPseudoRandomUUID(RANDOM).toString();
       values[1] = partitions.get(RANDOM.nextInt(3));
       values[2] = new Date().getTime();
+      values[3] = false;
       toReturn.add(RowFactory.create(values));
     }
     return toReturn;
@@ -80,10 +81,11 @@ public class DataSourceTestUtils {
   public static List<Row> generateRandomRowsByPartition(int count, String partition) {
     List<Row> toReturn = new ArrayList<>();
     for (int i = 0; i < count; i++) {
-      Object[] values = new Object[3];
+      Object[] values = new Object[4];
       values[0] = HoodieTestDataGenerator.genPseudoRandomUUID(RANDOM).toString();
       values[1] = partition;
       values[2] = new Date().getTime();
+      values[3] = false;
       toReturn.add(RowFactory.create(values));
     }
     return toReturn;
@@ -92,10 +94,11 @@ public class DataSourceTestUtils {
   public static List<Row> generateUpdates(List<Row> records, int count) {
     List<Row> toReturn = new ArrayList<>();
     for (int i = 0; i < count; i++) {
-      Object[] values = new Object[3];
+      Object[] values = new Object[4];
       values[0] = records.get(i).getString(0);
       values[1] = records.get(i).getAs(1);
       values[2] = new Date().getTime();
+      values[3] = false;
       toReturn.add(RowFactory.create(values));
     }
     return toReturn;
@@ -119,24 +122,36 @@ public class DataSourceTestUtils {
     List<Row> toReturn = new ArrayList<>();
     List<String> partitions = Arrays.asList(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH});
     for (int i = 0; i < count; i++) {
-      Object[] values = new Object[4];
+      Object[] values = new Object[5];
       values[0] = UUID.randomUUID().toString();
       values[1] = partitions.get(RANDOM.nextInt(3));
       values[2] = new Date().getTime();
-      values[3] = UUID.randomUUID().toString();
+      values[3] = false;
+      values[4] = UUID.randomUUID().toString();
       toReturn.add(RowFactory.create(values));
     }
     return toReturn;
   }
 
-  public static List<Row> updateRowsWithHigherTs(Dataset<Row> inputDf) {
+  public static List<Row> updateRowsWithUpdatedTs(Dataset<Row> inputDf) {
+    return updateRowsWithUpdatedTs(inputDf, false, false);
+  }
+
+  public static List<Row> updateRowsWithUpdatedTs(Dataset<Row> inputDf, Boolean lowerTs, Boolean updatePartitionPath) {
     List<Row> input = inputDf.collectAsList();
     List<Row> rows = new ArrayList<>();
     for (Row row : input) {
-      Object[] values = new Object[3];
+      Object[] values = new Object[4];
       values[0] = row.getAs("_row_key");
-      values[1] = row.getAs("partition");
-      values[2] = ((Long) row.getAs("ts")) + RANDOM.nextInt(1000);
+      String partition = row.getAs("partition");
+      if (updatePartitionPath) {
+        values[1] = partition.equals(DEFAULT_FIRST_PARTITION_PATH) ? DEFAULT_SECOND_PARTITION_PATH :
+            (partition.equals(DEFAULT_SECOND_PARTITION_PATH) ? DEFAULT_THIRD_PARTITION_PATH : DEFAULT_FIRST_PARTITION_PATH);
+      } else {
+        values[1] = partition;
+      }
+      values[2] = ((Long) row.getAs("ts")) + (lowerTs ? (-1 - RANDOM.nextInt(1000)) : RANDOM.nextInt(1000));
+      values[3] = false;
       rows.add(RowFactory.create(values));
     }
     return rows;
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt b/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt
index 5fcddac6a84..2db0ff3428d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt
@@ -32,6 +32,11 @@
       "name": "ts",
       "type": ["long", "null"]
     },
+    {
+      "name": "_hoodie_is_deleted",
+      "type": ["boolean", "null"],
+      "default" : false
+    },
     {
       "name": "new_field",
       "type": ["string","null"]
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/exampleSchema.txt b/hudi-spark-datasource/hudi-spark/src/test/resources/exampleSchema.txt
index c7c0ff73792..a311dc68543 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/resources/exampleSchema.txt
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/exampleSchema.txt
@@ -31,6 +31,11 @@
     {
       "name": "ts",
       "type": ["long", "null"]
+    },
+    {
+      "name": "_hoodie_is_deleted",
+      "type": ["boolean", "null"],
+      "default" : false
     }
   ]
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index ec9cf372912..4f4854d497c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -89,7 +89,7 @@ class TestHoodieSparkSqlWriter extends HoodieSparkWriterTestBase {
 
     // add some updates so that preCombine kicks in
     val toUpdateDataset = sqlContext.createDataFrame(DataSourceTestUtils.getUniqueRows(inserts, 40), structType)
-    val updates = DataSourceTestUtils.updateRowsWithHigherTs(toUpdateDataset)
+    val updates = DataSourceTestUtils.updateRowsWithUpdatedTs(toUpdateDataset)
     val records = inserts.asScala.union(updates.asScala)
     val recordsSeq = convertRowListToSeq(records.asJava)
     val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 687314940ea..164103e0e77 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -33,15 +33,13 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline, TimelineUtils}
 import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils}
-import org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR
-import org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR
+import org.apache.hudi.common.testutils.HoodieTestUtils.{INSTANT_FILE_NAME_GENERATOR, INSTANT_GENERATOR}
 import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
 import org.apache.hudi.common.util.{ClusteringUtils, Option}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.metrics.HoodieMetricsConfig
 import org.apache.hudi.exception.{HoodieException, SchemaBackwardsCompatibilityException}
 import org.apache.hudi.exception.ExceptionUtil.getRootCause
-import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomKeyGenerator, GlobalDeleteKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
@@ -55,7 +53,7 @@ import org.apache.hadoop.fs.FileSystem
 import org.apache.spark.sql.{DataFrame, DataFrameWriter, Dataset, Encoders, Row, SaveMode, SparkSession, SparkSessionExtensions}
 import org.apache.spark.sql.functions.{col, concat, lit, udf, when}
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
-import org.apache.spark.sql.types.{ArrayType, BooleanType, DataTypes, DateType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.types.{ArrayType, DataTypes, DateType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampType}
 import org.joda.time.DateTime
 import org.joda.time.format.DateTimeFormat
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test}
@@ -1415,41 +1413,6 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     assertEquals(3, snapshotDF1.select("partition").distinct().count())
   }
 
-  @ParameterizedTest
-  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
-  def testHoodieIsDeletedCOW(recordType: HoodieRecordType): Unit = {
-    val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
-
-    val numRecords = 100
-    val numRecordsToDelete = 2
-    val records0 = recordsToStrings(dataGen.generateInserts("000", numRecords)).asScala.toList
-    val df0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
-    df0.write.format("org.apache.hudi")
-      .options(writeOpts)
-      .mode(SaveMode.Overwrite)
-      .save(basePath)
-
-    val snapshotDF0 = spark.read.format("org.apache.hudi")
-      .options(readOpts)
-      .load(basePath)
-    assertEquals(numRecords, snapshotDF0.count())
-
-    val df1 = snapshotDF0.limit(numRecordsToDelete)
-    val dropDf = df1.drop(df1.columns.filter(_.startsWith("_hoodie_")): _*)
-    val df2 = convertColumnsToNullable(
-      dropDf.withColumn("_hoodie_is_deleted", lit(true).cast(BooleanType)),
-      "_hoodie_is_deleted"
-    )
-    df2.write.format("org.apache.hudi")
-      .options(writeOpts)
-      .mode(SaveMode.Append)
-      .save(basePath)
-    val snapshotDF2 = spark.read.format("org.apache.hudi")
-      .options(readOpts)
-      .load(basePath)
-    assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count())
-  }
-
   @ParameterizedTest
   @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
   def testWriteSmallPrecisionDecimalTable(recordType: HoodieRecordType): Unit = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index a4e240c57fc..2bb27a1a7fe 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -31,7 +31,6 @@ import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.util.Option
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
-import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable
 import org.apache.hudi.index.HoodieIndex.IndexType
 import org.apache.hudi.metadata.HoodieTableMetadataUtil.{metadataPartitionExists, PARTITION_NAME_SECONDARY_INDEX_PREFIX}
 import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
@@ -43,7 +42,6 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
-import org.apache.spark.sql.types.BooleanType
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
@@ -1192,50 +1190,6 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
         spark.read.format("hudi").load(basePath).count())
   }
 
-  @ParameterizedTest
-  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
-  def testHoodieIsDeletedMOR(recordType: HoodieRecordType): Unit = {
-    val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
-
-    val numRecords = 100
-    val numRecordsToDelete = 2
-    val schema = HoodieTestDataGenerator.SHORT_TRIP_SCHEMA
-    val records0 = recordsToStrings(dataGen.generateInsertsAsPerSchema("000", numRecords, schema)).asScala.toSeq
-    val inputDF0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
-    inputDF0.write.format("org.apache.hudi")
-      .options(writeOpts)
-      .option("hoodie.compact.inline", "false")
-      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
-      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-      .mode(SaveMode.Overwrite)
-      .save(basePath)
-
-    val snapshotDF0 = spark.read.format("org.apache.hudi")
-      .options(readOpts)
-      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
-      .load(basePath)
-    assertEquals(numRecords, snapshotDF0.count())
-
-    val df1 = snapshotDF0.limit(numRecordsToDelete)
-    val dropDf = df1.drop(df1.columns.filter(_.startsWith("_hoodie_")): _*)
-
-    val df2 = convertColumnsToNullable(
-      dropDf.withColumn("_hoodie_is_deleted", lit(true).cast(BooleanType)),
-      "_hoodie_is_deleted"
-    )
-    df2.write.format("org.apache.hudi")
-      .options(writeOpts)
-      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-      .mode(SaveMode.Append)
-      .save(basePath)
-
-    val snapshotDF2 = spark.read.format("org.apache.hudi")
-      .options(readOpts)
-      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
-      .load(basePath)
-    assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count())
-  }
-
   /**
    * This tests the case that query by with a specified partition condition on hudi table which is
    * different between the value of the partition field and the actual partition path,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
index 8873ada92d3..d891ed7c302 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
@@ -19,19 +19,24 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
-import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.hudi.common.config.{HoodieMetadataConfig, RecordMergeMode}
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
+import org.apache.hudi.functional.CommonOptionUtils.getWriterReaderOpts
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.index.HoodieIndex.IndexType
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator
 import org.apache.hudi.testutils.{DataSourceTestUtils, SparkClientFunctionalTestHarness}
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql._
+import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.types.StructType
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
@@ -294,6 +299,127 @@ class TestSparkDataSource extends SparkClientFunctionalTestHarness {
     inputDf2.unpersist(true)
   }
 
+  @ParameterizedTest
+  @CsvSource(value = Array("COPY_ON_WRITE,8,EVENT_TIME_ORDERING,RECORD_INDEX",
+    "COPY_ON_WRITE,8,COMMIT_TIME_ORDERING,RECORD_INDEX",
+    "COPY_ON_WRITE,8,EVENT_TIME_ORDERING,GLOBAL_SIMPLE",
+    "COPY_ON_WRITE,8,COMMIT_TIME_ORDERING,GLOBAL_SIMPLE",
+    "MERGE_ON_READ,8,EVENT_TIME_ORDERING,RECORD_INDEX",
+    "MERGE_ON_READ,8,COMMIT_TIME_ORDERING,RECORD_INDEX",
+    "MERGE_ON_READ,8,EVENT_TIME_ORDERING,GLOBAL_SIMPLE",
+    "MERGE_ON_READ,8,COMMIT_TIME_ORDERING,GLOBAL_SIMPLE"))
+  def testDeletesWithHoodieIsDeleted(tableType: HoodieTableType, tableVersion: Int, mergeMode: RecordMergeMode, indexType: IndexType): Unit = {
+    var (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO)
+    writeOpts = writeOpts ++ Map("hoodie.write.table.version" -> tableVersion.toString,
+      "hoodie.datasource.write.table.type" -> tableType.name(),
+      "hoodie.datasource.write.precombine.field" -> "ts",
+      "hoodie.write.record.merge.mode" -> mergeMode.name(),
+      "hoodie.index.type" -> indexType.name(),
+      "hoodie.metadata.record.index.enable" -> "true",
+      "hoodie.record.index.update.partition.path" -> "true",
+      "hoodie.parquet.small.file.limit" -> "0")
+
+    writeOpts = writeOpts + (if (indexType == IndexType.RECORD_INDEX) {
+      "hoodie.record.index.update.partition.path" -> "true"
+    } else {
+      "hoodie.simple.index.update.partition.path" -> "true"
+    })
+
+    // generate the inserts
+    val schema = DataSourceTestUtils.getStructTypeExampleSchema
+    val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+    val inserts = DataSourceTestUtils.generateRandomRows(400)
+    val df = spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(inserts)), structType)
+
+    df.write.format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val hudiSnapshotDF1 = spark.read.format("hudi")
+      .options(readOpts)
+      .load(basePath)
+    assertEquals(400, hudiSnapshotDF1.count())
+
+    // ingest batch2 with mix of updates and deletes. some of them are updating same partition, some of them are moving to new partition.
+    // some are having higher ts and some are having lower ts.
+    ingestNewBatch(tableType, 200, structType, inserts.subList(0, 200), writeOpts)
+
+    val expectedRecordCount2 = if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING) 350 else 300;
+    val hudiSnapshotDF2 = spark.read.format("hudi")
+      .options(readOpts)
+      .load(basePath)
+    assertEquals(expectedRecordCount2, hudiSnapshotDF2.count())
+
+    // querying subset of column. even if not including _hoodie_is_deleted, snapshot read should return right data.
+    assertEquals(expectedRecordCount2, spark.read.format("hudi")
+      .options(readOpts).load(basePath).select("_hoodie_record_key", "_hoodie_partition_path").count())
+
+    // ingest batch3 with mix of updates and deletes. some of them are updating same partition, some of them are moving to new partition.
+    // some are having higher ts and some are having lower ts.
+    ingestNewBatch(tableType, 200, structType, inserts.subList(200, 400), writeOpts)
+
+    val expectedRecordCount3 = if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING) 300 else 200;
+    val hudiSnapshotDF3 = spark.read.format("hudi")
+      .options(readOpts)
+      .load(basePath)
+    assertEquals(expectedRecordCount3, hudiSnapshotDF3.count())
+
+    // querying subset of column. even if not including _hoodie_is_deleted, snapshot read should return right data.
+    assertEquals(expectedRecordCount3, spark.read.format("hudi")
+      .options(readOpts).load(basePath).select("_hoodie_record_key", "_hoodie_partition_path").count())
+  }
+
+  def ingestNewBatch(tableType: HoodieTableType, recordsToUpdate: Integer, structType: StructType, inserts: java.util.List[Row],
+                     writeOpts: Map[String, String]): Unit = {
+    val toUpdate = sqlContext.createDataFrame(DataSourceTestUtils.getUniqueRows(inserts, recordsToUpdate), structType).collectAsList()
+
+    val updateToSamePartitionHigherTs = sqlContext.createDataFrame(toUpdate.subList(0, recordsToUpdate / 4), structType)
+    val rowsToUpdate1 = DataSourceTestUtils.updateRowsWithUpdatedTs(updateToSamePartitionHigherTs)
+    val updates1 = rowsToUpdate1.subList(0, recordsToUpdate / 8)
+    val updateDf1 = spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(updates1)), structType)
+    val deletes1 = rowsToUpdate1.subList(recordsToUpdate / 8, recordsToUpdate / 4)
+    val deleteDf1 = spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(deletes1)), structType)
+    val batch1 = deleteDf1.withColumn("_hoodie_is_deleted", lit(true)).union(updateDf1)
+    batch1.cache()
+
+    val updateToDiffPartitionHigherTs = sqlContext.createDataFrame(toUpdate.subList(recordsToUpdate / 4, recordsToUpdate / 2), structType)
+    val rowsToUpdate2 = DataSourceTestUtils.updateRowsWithUpdatedTs(updateToDiffPartitionHigherTs, false, true)
+    val updates2 = rowsToUpdate2.subList(0, recordsToUpdate / 8)
+    val updateDf2 = spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(updates2)), structType)
+    val deletes2 = rowsToUpdate2.subList(recordsToUpdate / 8, recordsToUpdate / 4)
+    val deleteDf2 = spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(deletes2)), structType)
+    val batch2 = deleteDf2.withColumn("_hoodie_is_deleted", lit(true)).union(updateDf2)
+    batch2.cache()
+
+    val updateToSamePartitionLowerTs = sqlContext.createDataFrame(toUpdate.subList(recordsToUpdate / 2, recordsToUpdate * 3 / 4), structType)
+    val rowsToUpdate3 = DataSourceTestUtils.updateRowsWithUpdatedTs(updateToSamePartitionLowerTs, true, false)
+    val updates3 = rowsToUpdate3.subList(0, recordsToUpdate / 8)
+    val updateDf3 = spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(updates3)), structType)
+    val deletes3 = rowsToUpdate3.subList(recordsToUpdate / 8, recordsToUpdate / 4)
+    val deleteDf3 = spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(deletes3)), structType)
+    val batch3 = deleteDf3.withColumn("_hoodie_is_deleted", lit(true)).union(updateDf3)
+    batch3.cache()
+
+    val updateToDiffPartitionLowerTs = sqlContext.createDataFrame(toUpdate.subList(recordsToUpdate * 3 / 4, recordsToUpdate), structType)
+    val rowsToUpdate4 = DataSourceTestUtils.updateRowsWithUpdatedTs(updateToDiffPartitionLowerTs, true, true)
+    val updates4 = rowsToUpdate4.subList(0, recordsToUpdate / 8)
+    val updateDf4 = spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(updates4)), structType)
+    val deletes4 = rowsToUpdate4.subList(recordsToUpdate / 8, recordsToUpdate / 4)
+    val deleteDf4 = spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(deletes4)), structType)
+    val batch4 = deleteDf4.withColumn("_hoodie_is_deleted", lit(true)).union(updateDf4)
+    batch4.cache()
+
+    batch1.union(batch2).union(batch3).union(batch4).write.format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
+      .mode(SaveMode.Append)
+      .save(basePath)
+  }
+
   def compareUpdateDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row], beforeRows: List[Row], colsToCompare: String): Unit = {
     val hudiWithoutMetaDf = hudiDf.drop(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD)
     hudiWithoutMetaDf.registerTempTable("hudiTbl")
@@ -354,4 +480,7 @@ class TestSparkDataSource extends SparkClientFunctionalTestHarness {
     assertEquals(hudiDf1ToCompare.count, hudiDf1ToCompare.intersect(hudiDf2ToCompare).count)
     assertEquals(0, hudiDf1ToCompare.except(hudiDf2ToCompare).count)
   }
+
+  def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] =
+    asScalaIteratorConverter(inputList.iterator).asScala.toSeq
 }
