This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1f43b231763 [HUDI-9170] Fixing schema projection with file group reader (#12970)
1f43b231763 is described below
commit 1f43b231763a978bef8d340a654e9f6287241ec9
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Mar 17 20:23:27 2025 -0700
[HUDI-9170] Fixing schema projection with file group reader (#12970)
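In short: when the file group reader has to merge log records (MOR snapshot reads), the projected "required" schema now also includes the soft-delete marker column "_hoodie_is_deleted" whenever the table's data schema contains it, so queries that select only a subset of columns still honor soft deletes. A minimal sketch of that rule follows; it is illustrative only (the class and method names below are made up for this note, the actual change is the HoodieFileGroupReaderSchemaHandler hunk in the diff), and it assumes plain Avro Schema lookups:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.avro.Schema;

    class RequiredFieldsSketch {
      // Same literal value as HoodieRecord.HOODIE_IS_DELETED_FIELD.
      static final String HOODIE_IS_DELETED_FIELD = "_hoodie_is_deleted";

      // dataSchema: full table schema; mandatoryFields: key/partition/ordering fields already collected.
      static String[] addDeleteMarkerIfPresent(Schema dataSchema, List<String> mandatoryFields) {
        List<String> requiredFields = new ArrayList<>(mandatoryFields);
        // If the table schema carries the soft-delete flag, always project it so merging
        // can filter out records flagged as deleted, even for narrow column selections.
        if (dataSchema.getField(HOODIE_IS_DELETED_FIELD) != null) {
          requiredFields.add(HOODIE_IS_DELETED_FIELD);
        }
        return requiredFields.toArray(new String[0]);
      }
    }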
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../read/HoodieFileGroupReaderSchemaHandler.java | 7 +-
.../read/TestHoodieFileGroupRecordBuffer.java | 81 +++++++++++++
.../TestHoodieDatasetBulkInsertHelper.java | 2 +-
.../apache/hudi/testutils/DataSourceTestUtils.java | 33 +++--
.../src/test/resources/exampleEvolvedSchema.txt | 5 +
.../src/test/resources/exampleSchema.txt | 5 +
.../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 2 +-
.../apache/hudi/functional/TestCOWDataSource.scala | 41 +------
.../apache/hudi/functional/TestMORDataSource.scala | 46 -------
.../hudi/functional/TestSparkDataSource.scala | 135 ++++++++++++++++++++-
10 files changed, 257 insertions(+), 100 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
index 714d43784be..f007f9ef400 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
@@ -150,7 +150,8 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
return AvroInternalSchemaConverter.pruneAvroSchemaToInternalSchema(requiredSchema, internalSchema);
}
- private Schema generateRequiredSchema() {
+ @VisibleForTesting
+ Schema generateRequiredSchema() {
//might need to change this if other queries than mor have mandatory fields
if (!needsMORMerge) {
return requestedSchema;
@@ -204,6 +205,10 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
requiredFields.add(preCombine);
}
}
+
+ if (dataSchema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD) != null) {
+ requiredFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD);
+ }
return requiredFields.toArray(new String[0]);
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupRecordBuffer.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupRecordBuffer.java
index 65683bd889a..ca3a09d96df 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupRecordBuffer.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupRecordBuffer.java
@@ -19,10 +19,25 @@
package org.apache.hudi.common.table.read;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
import static org.apache.hudi.common.table.read.HoodieBaseFileGroupRecordBuffer.getOrderingValue;
@@ -49,6 +64,72 @@ public class TestHoodieFileGroupRecordBuffer {
assertEquals(convertedValue, getOrderingValue(readerContext, deleteRecord));
}
+ @ParameterizedTest
+ @CsvSource({
+ "true, true, EVENT_TIME_ORDERING",
+ "true, false, EVENT_TIME_ORDERING",
+ "false, true, EVENT_TIME_ORDERING",
+ "false, false, EVENT_TIME_ORDERING",
+ "true, true, COMMIT_TIME_ORDERING",
+ "true, false, COMMIT_TIME_ORDERING",
+ "false, true, COMMIT_TIME_ORDERING",
+ "false, false, COMMIT_TIME_ORDERING",
+ "true, true, CUSTOM",
+ "true, false, CUSTOM",
+ "false, true, CUSTOM",
+ "false, false, CUSTOM"
+ })
+ public void testSchemaForMandatoryFields(boolean setPrecombine, boolean addHoodieIsDeleted, RecordMergeMode mergeMode) {
+ HoodieReaderContext readerContext = mock(HoodieReaderContext.class);
+ when(readerContext.getHasBootstrapBaseFile()).thenReturn(false);
+ when(readerContext.getHasLogFiles()).thenReturn(true);
+ HoodieRecordMerger recordMerger = mock(HoodieRecordMerger.class);
+ when(readerContext.getRecordMerger()).thenReturn(Option.of(recordMerger));
+ when(recordMerger.isProjectionCompatible()).thenReturn(false);
+
+ String preCombineField = "ts";
+ List<String> dataSchemaFields = new ArrayList<>();
+ dataSchemaFields.addAll(Arrays.asList(
+ HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD, preCombineField,
+ "colA", "colB", "colC", "colD"));
+ if (addHoodieIsDeleted) {
+ dataSchemaFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD);
+ }
+
+ Schema dataSchema = getSchema(dataSchemaFields);
+ Schema requestedSchema = getSchema(Arrays.asList(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD));
+
+ HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+ when(tableConfig.getRecordMergeMode()).thenReturn(mergeMode);
+ when(tableConfig.populateMetaFields()).thenReturn(true);
+ when(tableConfig.getPreCombineField()).thenReturn(setPrecombine ? preCombineField : StringUtils.EMPTY_STRING);
+
+ TypedProperties props = new TypedProperties();
+ HoodieFileGroupReaderSchemaHandler fileGroupReaderSchemaHandler = new HoodieFileGroupReaderSchemaHandler(readerContext,
+ dataSchema, requestedSchema, Option.empty(), tableConfig, props);
+ List<String> expectedFields = new ArrayList();
+ expectedFields.add(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ expectedFields.add(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
+ if (setPrecombine && mergeMode != RecordMergeMode.COMMIT_TIME_ORDERING) { // commit time ordering does not project ordering field.
+ expectedFields.add(preCombineField);
+ }
+ if (addHoodieIsDeleted) {
+ expectedFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD);
+ }
+ Schema expectedSchema = mergeMode == RecordMergeMode.CUSTOM ? dataSchema : getSchema(expectedFields);
+ Schema actualSchema = fileGroupReaderSchemaHandler.generateRequiredSchema();
+ assertEquals(expectedSchema, actualSchema);
+ }
+
+ private Schema getSchema(List<String> fields) {
+ SchemaBuilder.FieldAssembler<Schema> schemaFieldAssembler = SchemaBuilder.builder().record("test_schema")
+ .namespace("test_namespace").fields();
+ for (String field : fields) {
+ schemaFieldAssembler = schemaFieldAssembler.name(field).type().stringType().noDefault();
+ }
+ return schemaFieldAssembler.endRecord();
+ }
+
private void mockDeleteRecord(DeleteRecord deleteRecord, Comparable orderingValue) {
when(deleteRecord.getOrderingValue()).thenReturn(orderingValue);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
index bb24ee0e52a..21b1a9be870 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
@@ -207,7 +207,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieSparkClientTestBase
.withPreCombineField("ts").build();
List<Row> inserts = DataSourceTestUtils.generateRandomRows(10);
Dataset<Row> toUpdateDataset = sqlContext.createDataFrame(inserts.subList(0, 5), structType);
- List<Row> updates = DataSourceTestUtils.updateRowsWithHigherTs(toUpdateDataset);
+ List<Row> updates = DataSourceTestUtils.updateRowsWithUpdatedTs(toUpdateDataset);
List<Row> rows = new ArrayList<>();
rows.addAll(inserts);
rows.addAll(updates);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
index 08a82284898..4c371136dbc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
@@ -68,10 +68,11 @@ public class DataSourceTestUtils {
List<Row> toReturn = new ArrayList<>();
List<String> partitions = Arrays.asList(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH});
for (int i = 0; i < count; i++) {
- Object[] values = new Object[3];
+ Object[] values = new Object[4];
values[0] = HoodieTestDataGenerator.genPseudoRandomUUID(RANDOM).toString();
values[1] = partitions.get(RANDOM.nextInt(3));
values[2] = new Date().getTime();
+ values[3] = false;
toReturn.add(RowFactory.create(values));
}
return toReturn;
@@ -80,10 +81,11 @@ public class DataSourceTestUtils {
public static List<Row> generateRandomRowsByPartition(int count, String partition) {
List<Row> toReturn = new ArrayList<>();
for (int i = 0; i < count; i++) {
- Object[] values = new Object[3];
+ Object[] values = new Object[4];
values[0] = HoodieTestDataGenerator.genPseudoRandomUUID(RANDOM).toString();
values[1] = partition;
values[2] = new Date().getTime();
+ values[3] = false;
toReturn.add(RowFactory.create(values));
}
return toReturn;
@@ -92,10 +94,11 @@ public class DataSourceTestUtils {
public static List<Row> generateUpdates(List<Row> records, int count) {
List<Row> toReturn = new ArrayList<>();
for (int i = 0; i < count; i++) {
- Object[] values = new Object[3];
+ Object[] values = new Object[4];
values[0] = records.get(i).getString(0);
values[1] = records.get(i).getAs(1);
values[2] = new Date().getTime();
+ values[3] = false;
toReturn.add(RowFactory.create(values));
}
return toReturn;
@@ -119,24 +122,36 @@ public class DataSourceTestUtils {
List<Row> toReturn = new ArrayList<>();
List<String> partitions = Arrays.asList(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH});
for (int i = 0; i < count; i++) {
- Object[] values = new Object[4];
+ Object[] values = new Object[5];
values[0] = UUID.randomUUID().toString();
values[1] = partitions.get(RANDOM.nextInt(3));
values[2] = new Date().getTime();
- values[3] = UUID.randomUUID().toString();
+ values[3] = false;
+ values[4] = UUID.randomUUID().toString();
toReturn.add(RowFactory.create(values));
}
return toReturn;
}
- public static List<Row> updateRowsWithHigherTs(Dataset<Row> inputDf) {
+ public static List<Row> updateRowsWithUpdatedTs(Dataset<Row> inputDf) {
+ return updateRowsWithUpdatedTs(inputDf, false, false);
+ }
+
+ public static List<Row> updateRowsWithUpdatedTs(Dataset<Row> inputDf, Boolean lowerTs, Boolean updatePartitionPath) {
List<Row> input = inputDf.collectAsList();
List<Row> rows = new ArrayList<>();
for (Row row : input) {
- Object[] values = new Object[3];
+ Object[] values = new Object[4];
values[0] = row.getAs("_row_key");
- values[1] = row.getAs("partition");
- values[2] = ((Long) row.getAs("ts")) + RANDOM.nextInt(1000);
+ String partition = row.getAs("partition");
+ if (updatePartitionPath) {
+ values[1] = partition.equals(DEFAULT_FIRST_PARTITION_PATH) ? DEFAULT_SECOND_PARTITION_PATH :
+ (partition.equals(DEFAULT_SECOND_PARTITION_PATH) ? DEFAULT_THIRD_PARTITION_PATH : DEFAULT_FIRST_PARTITION_PATH);
+ } else {
+ values[1] = partition;
+ }
+ values[2] = ((Long) row.getAs("ts")) + (lowerTs ? (-1 - RANDOM.nextInt(1000)) : RANDOM.nextInt(1000));
+ values[3] = false;
rows.add(RowFactory.create(values));
}
return rows;
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt b/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt
index 5fcddac6a84..2db0ff3428d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt
@@ -32,6 +32,11 @@
"name": "ts",
"type": ["long", "null"]
},
+ {
+ "name": "_hoodie_is_deleted",
+ "type": ["boolean", "null"],
+ "default" : false
+ },
{
"name": "new_field",
"type": ["string","null"]
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/exampleSchema.txt b/hudi-spark-datasource/hudi-spark/src/test/resources/exampleSchema.txt
index c7c0ff73792..a311dc68543 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/resources/exampleSchema.txt
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/exampleSchema.txt
@@ -31,6 +31,11 @@
{
"name": "ts",
"type": ["long", "null"]
+ },
+ {
+ "name": "_hoodie_is_deleted",
+ "type": ["boolean", "null"],
+ "default" : false
}
]
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index ec9cf372912..4f4854d497c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -89,7 +89,7 @@ class TestHoodieSparkSqlWriter extends HoodieSparkWriterTestBase {
// add some updates so that preCombine kicks in
val toUpdateDataset = sqlContext.createDataFrame(DataSourceTestUtils.getUniqueRows(inserts, 40), structType)
- val updates = DataSourceTestUtils.updateRowsWithHigherTs(toUpdateDataset)
+ val updates = DataSourceTestUtils.updateRowsWithUpdatedTs(toUpdateDataset)
val records = inserts.asScala.union(updates.asScala)
val recordsSeq = convertRowListToSeq(records.asJava)
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 687314940ea..164103e0e77 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -33,15 +33,13 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline, TimelineUtils}
import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils}
-import org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR
-import org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR
+import org.apache.hudi.common.testutils.HoodieTestUtils.{INSTANT_FILE_NAME_GENERATOR, INSTANT_GENERATOR}
import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
import org.apache.hudi.common.util.{ClusteringUtils, Option}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.metrics.HoodieMetricsConfig
import org.apache.hudi.exception.{HoodieException, SchemaBackwardsCompatibilityException}
import org.apache.hudi.exception.ExceptionUtil.getRootCause
-import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomKeyGenerator, GlobalDeleteKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
@@ -55,7 +53,7 @@ import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.{DataFrame, DataFrameWriter, Dataset, Encoders, Row, SaveMode, SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.functions.{col, concat, lit, udf, when}
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
-import org.apache.spark.sql.types.{ArrayType, BooleanType, DataTypes, DateType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.types.{ArrayType, DataTypes, DateType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampType}
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test}
@@ -1415,41 +1413,6 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
assertEquals(3, snapshotDF1.select("partition").distinct().count())
}
- @ParameterizedTest
- @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
- def testHoodieIsDeletedCOW(recordType: HoodieRecordType): Unit = {
- val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
-
- val numRecords = 100
- val numRecordsToDelete = 2
- val records0 = recordsToStrings(dataGen.generateInserts("000", numRecords)).asScala.toList
- val df0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
- df0.write.format("org.apache.hudi")
- .options(writeOpts)
- .mode(SaveMode.Overwrite)
- .save(basePath)
-
- val snapshotDF0 = spark.read.format("org.apache.hudi")
- .options(readOpts)
- .load(basePath)
- assertEquals(numRecords, snapshotDF0.count())
-
- val df1 = snapshotDF0.limit(numRecordsToDelete)
- val dropDf = df1.drop(df1.columns.filter(_.startsWith("_hoodie_")): _*)
- val df2 = convertColumnsToNullable(
- dropDf.withColumn("_hoodie_is_deleted", lit(true).cast(BooleanType)),
- "_hoodie_is_deleted"
- )
- df2.write.format("org.apache.hudi")
- .options(writeOpts)
- .mode(SaveMode.Append)
- .save(basePath)
- val snapshotDF2 = spark.read.format("org.apache.hudi")
- .options(readOpts)
- .load(basePath)
- assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count())
- }
-
@ParameterizedTest
@EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
def testWriteSmallPrecisionDecimalTable(recordType: HoodieRecordType): Unit = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index a4e240c57fc..2bb27a1a7fe 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -31,7 +31,6 @@ import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.util.Option
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
-import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable
import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.hudi.metadata.HoodieTableMetadataUtil.{metadataPartitionExists, PARTITION_NAME_SECONDARY_INDEX_PREFIX}
import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
@@ -43,7 +42,6 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
-import org.apache.spark.sql.types.BooleanType
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.params.ParameterizedTest
@@ -1192,50 +1190,6 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
spark.read.format("hudi").load(basePath).count())
}
- @ParameterizedTest
- @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
- def testHoodieIsDeletedMOR(recordType: HoodieRecordType): Unit = {
- val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
-
- val numRecords = 100
- val numRecordsToDelete = 2
- val schema = HoodieTestDataGenerator.SHORT_TRIP_SCHEMA
- val records0 = recordsToStrings(dataGen.generateInsertsAsPerSchema("000", numRecords, schema)).asScala.toSeq
- val inputDF0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
- inputDF0.write.format("org.apache.hudi")
- .options(writeOpts)
- .option("hoodie.compact.inline", "false")
- .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
- .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
- .mode(SaveMode.Overwrite)
- .save(basePath)
-
- val snapshotDF0 = spark.read.format("org.apache.hudi")
- .options(readOpts)
- .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
- .load(basePath)
- assertEquals(numRecords, snapshotDF0.count())
-
- val df1 = snapshotDF0.limit(numRecordsToDelete)
- val dropDf = df1.drop(df1.columns.filter(_.startsWith("_hoodie_")): _*)
-
- val df2 = convertColumnsToNullable(
- dropDf.withColumn("_hoodie_is_deleted", lit(true).cast(BooleanType)),
- "_hoodie_is_deleted"
- )
- df2.write.format("org.apache.hudi")
- .options(writeOpts)
- .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
- .mode(SaveMode.Append)
- .save(basePath)
-
- val snapshotDF2 = spark.read.format("org.apache.hudi")
- .options(readOpts)
- .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
- .load(basePath)
- assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count())
- }
-
/**
* This tests the case that query by with a specified partition condition on hudi table which is
* different between the value of the partition field and the actual partition path,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
index 8873ada92d3..d891ed7c302 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
@@ -19,19 +19,24 @@
package org.apache.hudi.functional
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
-import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.hudi.common.config.{HoodieMetadataConfig, RecordMergeMode}
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
+import org.apache.hudi.functional.CommonOptionUtils.getWriterReaderOpts
import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.testutils.{DataSourceTestUtils, SparkClientFunctionalTestHarness}
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
import org.apache.spark.SparkConf
import org.apache.spark.sql._
+import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.types.StructType
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
@@ -294,6 +299,127 @@ class TestSparkDataSource extends SparkClientFunctionalTestHarness {
inputDf2.unpersist(true)
}
+ @ParameterizedTest
+ @CsvSource(value = Array("COPY_ON_WRITE,8,EVENT_TIME_ORDERING,RECORD_INDEX",
+ "COPY_ON_WRITE,8,COMMIT_TIME_ORDERING,RECORD_INDEX",
+ "COPY_ON_WRITE,8,EVENT_TIME_ORDERING,GLOBAL_SIMPLE",
+ "COPY_ON_WRITE,8,COMMIT_TIME_ORDERING,GLOBAL_SIMPLE",
+ "MERGE_ON_READ,8,EVENT_TIME_ORDERING,RECORD_INDEX",
+ "MERGE_ON_READ,8,COMMIT_TIME_ORDERING,RECORD_INDEX",
+ "MERGE_ON_READ,8,EVENT_TIME_ORDERING,GLOBAL_SIMPLE",
+ "MERGE_ON_READ,8,COMMIT_TIME_ORDERING,GLOBAL_SIMPLE"))
+ def testDeletesWithHoodieIsDeleted(tableType: HoodieTableType, tableVersion: Int, mergeMode: RecordMergeMode, indexType: IndexType): Unit = {
+ var (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO)
+ writeOpts = writeOpts ++ Map("hoodie.write.table.version" -> tableVersion.toString,
+ "hoodie.datasource.write.table.type" -> tableType.name(),
+ "hoodie.datasource.write.precombine.field" -> "ts",
+ "hoodie.write.record.merge.mode" -> mergeMode.name(),
+ "hoodie.index.type" -> indexType.name(),
+ "hoodie.metadata.record.index.enable" -> "true",
+ "hoodie.record.index.update.partition.path" -> "true",
+ "hoodie.parquet.small.file.limit" -> "0")
+
+ writeOpts = writeOpts + (if (indexType == IndexType.RECORD_INDEX) {
+ "hoodie.record.index.update.partition.path" -> "true"
+ } else {
+ "hoodie.simple.index.update.partition.path" -> "true"
+ })
+
+ // generate the inserts
+ val schema = DataSourceTestUtils.getStructTypeExampleSchema
+ val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+ val inserts = DataSourceTestUtils.generateRandomRows(400)
+ val df = spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(inserts)), structType)
+
+ df.write.format("hudi")
+ .options(writeOpts)
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ val hudiSnapshotDF1 = spark.read.format("hudi")
+ .options(readOpts)
+ .load(basePath)
+ assertEquals(400, hudiSnapshotDF1.count())
+
+ // ingest batch2 with mix of updates and deletes. some of them are updating same partition, some of them are moving to new partition.
+ // some are having higher ts and some are having lower ts.
+ ingestNewBatch(tableType, 200, structType, inserts.subList(0, 200), writeOpts)
+
+ val expectedRecordCount2 = if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING) 350 else 300;
+ val hudiSnapshotDF2 = spark.read.format("hudi")
+ .options(readOpts)
+ .load(basePath)
+ assertEquals(expectedRecordCount2, hudiSnapshotDF2.count())
+
+ // querying subset of column. even if not including _hoodie_is_deleted, snapshot read should return right data.
+ assertEquals(expectedRecordCount2, spark.read.format("hudi")
+ .options(readOpts).load(basePath).select("_hoodie_record_key", "_hoodie_partition_path").count())
+
+ // ingest batch3 with mix of updates and deletes. some of them are updating same partition, some of them are moving to new partition.
+ // some are having higher ts and some are having lower ts.
+ ingestNewBatch(tableType, 200, structType, inserts.subList(200, 400), writeOpts)
+
+ val expectedRecordCount3 = if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING) 300 else 200;
+ val hudiSnapshotDF3 = spark.read.format("hudi")
+ .options(readOpts)
+ .load(basePath)
+ assertEquals(expectedRecordCount3, hudiSnapshotDF3.count())
+
+ // querying subset of column. even if not including _hoodie_is_deleted, snapshot read should return right data.
+ assertEquals(expectedRecordCount3, spark.read.format("hudi")
+ .options(readOpts).load(basePath).select("_hoodie_record_key", "_hoodie_partition_path").count())
+ }
+
+ def ingestNewBatch(tableType: HoodieTableType, recordsToUpdate: Integer, structType: StructType, inserts: java.util.List[Row],
+ writeOpts: Map[String, String]): Unit = {
+ val toUpdate = sqlContext.createDataFrame(DataSourceTestUtils.getUniqueRows(inserts, recordsToUpdate), structType).collectAsList()
+
+ val updateToSamePartitionHigherTs = sqlContext.createDataFrame(toUpdate.subList(0, recordsToUpdate / 4), structType)
+ val rowsToUpdate1 = DataSourceTestUtils.updateRowsWithUpdatedTs(updateToSamePartitionHigherTs)
+ val updates1 = rowsToUpdate1.subList(0, recordsToUpdate / 8)
+ val updateDf1 = spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(updates1)), structType)
+ val deletes1 = rowsToUpdate1.subList(recordsToUpdate / 8, recordsToUpdate / 4)
+ val deleteDf1 = spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(deletes1)), structType)
+ val batch1 = deleteDf1.withColumn("_hoodie_is_deleted", lit(true)).union(updateDf1)
+ batch1.cache()
+
+ val updateToDiffPartitionHigherTs = sqlContext.createDataFrame(toUpdate.subList(recordsToUpdate / 4, recordsToUpdate / 2), structType)
+ val rowsToUpdate2 = DataSourceTestUtils.updateRowsWithUpdatedTs(updateToDiffPartitionHigherTs, false, true)
+ val updates2 = rowsToUpdate2.subList(0, recordsToUpdate / 8)
+ val updateDf2 = spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(updates2)), structType)
+ val deletes2 = rowsToUpdate2.subList(recordsToUpdate / 8, recordsToUpdate / 4)
+ val deleteDf2 = spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(deletes2)), structType)
+ val batch2 = deleteDf2.withColumn("_hoodie_is_deleted", lit(true)).union(updateDf2)
+ batch2.cache()
+
+ val updateToSamePartitionLowerTs = sqlContext.createDataFrame(toUpdate.subList(recordsToUpdate / 2, recordsToUpdate * 3 / 4), structType)
+ val rowsToUpdate3 = DataSourceTestUtils.updateRowsWithUpdatedTs(updateToSamePartitionLowerTs, true, false)
+ val updates3 = rowsToUpdate3.subList(0, recordsToUpdate / 8)
+ val updateDf3 = spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(updates3)), structType)
+ val deletes3 = rowsToUpdate3.subList(recordsToUpdate / 8, recordsToUpdate / 4)
+ val deleteDf3 = spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(deletes3)), structType)
+ val batch3 = deleteDf3.withColumn("_hoodie_is_deleted", lit(true)).union(updateDf3)
+ batch3.cache()
+
+ val updateToDiffPartitionLowerTs = sqlContext.createDataFrame(toUpdate.subList(recordsToUpdate * 3 / 4, recordsToUpdate), structType)
+ val rowsToUpdate4 = DataSourceTestUtils.updateRowsWithUpdatedTs(updateToDiffPartitionLowerTs, true, true)
+ val updates4 = rowsToUpdate4.subList(0, recordsToUpdate / 8)
+ val updateDf4 = spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(updates4)), structType)
+ val deletes4 = rowsToUpdate4.subList(recordsToUpdate / 8, recordsToUpdate / 4)
+ val deleteDf4 = spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(deletes4)), structType)
+ val batch4 = deleteDf4.withColumn("_hoodie_is_deleted", lit(true)).union(updateDf4)
+ batch4.cache()
+
+ batch1.union(batch2).union(batch3).union(batch4).write.format("hudi")
+ .options(writeOpts)
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
+ .mode(SaveMode.Append)
+ .save(basePath)
+ }
+
def compareUpdateDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row], beforeRows: List[Row], colsToCompare: String): Unit = {
val hudiWithoutMetaDf = hudiDf.drop(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD)
hudiWithoutMetaDf.registerTempTable("hudiTbl")
@@ -354,4 +480,7 @@ class TestSparkDataSource extends SparkClientFunctionalTestHarness {
assertEquals(hudiDf1ToCompare.count, hudiDf1ToCompare.intersect(hudiDf2ToCompare).count)
assertEquals(0, hudiDf1ToCompare.except(hudiDf2ToCompare).count)
}
+
+ def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] =
+ asScalaIteratorConverter(inputList.iterator).asScala.toSeq
}