This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7943e1e18849 [HUDI-9188] Handle out of order deletes in Record Level
Index (#13830)
7943e1e18849 is described below
commit 7943e1e18849af9ae826675738de220005b876a9
Author: Tim Brown <[email protected]>
AuthorDate: Fri Sep 5 14:28:08 2025 -0400
[HUDI-9188] Handle out of order deletes in Record Level Index (#13830)
Co-authored-by: danny0405 <[email protected]>
---
.../org/apache/hudi/index/HoodieIndexUtils.java | 68 +++++++++----
.../hudi/index/simple/TestGlobalSimpleIndex.java | 1 +
.../index/bloom/TestHoodieGlobalBloomIndex.java | 1 +
.../table/read/TestHoodieFileGroupReaderBase.java | 14 +--
.../common/testutils/HoodieTestDataGenerator.java | 112 ++++++++++++++-------
.../source/TestStreamReadMonitoringFunction.java | 2 +-
.../table/TestHoodieFileGroupReaderOnFlink.java | 5 +-
.../hudi/functional/RecordLevelIndexTestBase.scala | 9 +-
.../apache/hudi/functional/TestCOWDataSource.scala | 5 +-
.../apache/hudi/functional/TestMORDataSource.scala | 5 +-
.../TestPartitionedRecordLevelIndex.scala | 56 +++++++++--
.../hudi/functional/TestRecordLevelIndex.scala | 68 ++++++++++---
.../deltastreamer/HoodieDeltaStreamerTestBase.java | 4 +-
.../TestHoodieMultiTableDeltaStreamer.java | 11 +-
.../testutils/sources/AbstractBaseTestSource.java | 8 +-
15 files changed, 260 insertions(+), 109 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index c90bbcb5172b..2ffe211a113a 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.index;
import org.apache.hudi.avro.AvroSchemaCache;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
@@ -38,7 +39,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -505,10 +506,13 @@ public class HoodieIndexUtils {
/**
* Merge tagged incoming records with existing records in case of partition
path updated.
*/
- public static <R> HoodieData<HoodieRecord<R>>
mergeForPartitionUpdatesIfNeeded(
- HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>>
incomingRecordsAndLocations,
- HoodieWriteConfig config,
- HoodieTable hoodieTable) {
+ public static <R> HoodieData<HoodieRecord<R>>
mergeForPartitionUpdatesAndDeletionsIfNeeded(HoodieData<Pair<HoodieRecord<R>,
+
Option<HoodieRecordGlobalLocation>>> incomingRecordsAndLocations,
+
boolean shouldUpdatePartitionPath,
+
HoodieWriteConfig config,
+
HoodieTable hoodieTable,
+
HoodieReaderContext<R> readerContext,
+
SerializableSchema writerSchema) {
boolean isExpressionPayload =
config.getPayloadClass().equals("org.apache.spark.sql.hudi.command.payload.ExpressionPayload");
Pair<HoodieWriteConfig, BaseKeyGenerator> keyGeneratorWriteConfigOpt =
getKeygenAndUpdatedWriteConfig(config,
hoodieTable.getMetaClient().getTableConfig(), isExpressionPayload);
@@ -527,19 +531,14 @@ public class HoodieIndexUtils {
.map(p -> Pair.of(p.getRight().get().getPartitionPath(),
p.getRight().get().getFileId()))
.distinct(updatedConfig.getGlobalIndexReconcileParallelism());
// define the buffered record merger.
- ReaderContextFactory<R> readerContextFactory = (ReaderContextFactory<R>)
hoodieTable.getContext()
- .getReaderContextFactoryForWrite(hoodieTable.getMetaClient(),
config.getRecordMerger().getRecordType(), config.getProps());
- HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
TypedProperties properties =
readerContext.getMergeProps(updatedConfig.getProps());
RecordContext<R> incomingRecordContext = readerContext.getRecordContext();
- readerContext.initRecordMergerForIngestion(config.getProps());
// Create a reader context for the existing records. In the case of
merge-into commands, the incoming records
// can be using an expression payload so here we rely on the table's
configured payload class if it is required.
ReaderContextFactory<R> readerContextFactoryForExistingRecords =
(ReaderContextFactory<R>) hoodieTable.getContext()
.getReaderContextFactoryForWrite(hoodieTable.getMetaClient(),
config.getRecordMerger().getRecordType(),
hoodieTable.getMetaClient().getTableConfig().getProps());
RecordContext<R> existingRecordContext =
readerContextFactoryForExistingRecords.getContext().getRecordContext();
// merged existing records with current locations being set
- SerializableSchema writerSchema = new
SerializableSchema(hoodieTable.getConfig().getWriteSchema());
SerializableSchema writerSchemaWithMetaFields = new
SerializableSchema(HoodieAvroUtils.addMetadataFields(writerSchema.get(),
updatedConfig.allowOperationMetadataField()));
AvroSchemaCache.intern(writerSchema.get());
AvroSchemaCache.intern(writerSchemaWithMetaFields.get());
@@ -570,10 +569,7 @@ public class HoodieIndexUtils {
}
HoodieRecord<R> existing = existingOpt.get();
Schema writeSchema = writerSchema.get();
- if (incoming.isDelete(writeSchema, properties)) {
- // incoming is a delete: force tag the incoming to the old
partition
- return
Collections.singletonList(tagRecord(incoming.newInstance(existing.getKey()),
existing.getCurrentLocation())).iterator();
- }
+
Option<HoodieRecord<R>> mergedOpt = mergeIncomingWithExistingRecord(
incoming, existing, writeSchema,
writerSchemaWithMetaFields.get(), updatedConfig,
recordMerger, keyGenerator, incomingRecordContext,
existingRecordContext, orderingFieldsArray, properties, isExpressionPayload);
@@ -589,16 +585,34 @@ public class HoodieIndexUtils {
if (Objects.equals(merged.getPartitionPath(),
existing.getPartitionPath())) {
// merged record has the same partition: route the merged result
to the current location as an update
return Collections.singletonList(tagRecord(merged,
existing.getCurrentLocation())).iterator();
- } else {
+ } else if (shouldUpdatePartitionPath) {
// merged record has a different partition: issue a delete to the
old partition and insert the merged record to the new partition
HoodieRecord<R> deleteRecord = createDeleteRecord(updatedConfig,
existing.getKey());
deleteRecord.setIgnoreIndexUpdate(true);
return Arrays.asList(tagRecord(deleteRecord,
existing.getCurrentLocation()), merged).iterator();
+ } else {
+ // merged record has a different partition path but
shouldUpdatePartitionPath is false, treat as an update to the existing location
+ return
Collections.singletonList(merged.newInstance(existing.getKey())).iterator();
}
});
return taggedUpdatingRecords.union(taggedNewRecords);
}
+ /**
+ * Tags the incoming records with their existing locations if any is found.
+ * Global indexing can support moving records across partitions. To do so,
the tagging logic will result in a delete to the old partition and an insert
into the new partition. When this happens
+ * with Merge-on-Read tables, the record key is present in multiple
locations until the file slices are compacted. If the index relies on reading
the keys directly from the file slices, then
+ * `mayContainDuplicateLookup` must be set to true to account for this. If
the lookup will only contain the latest location of the record key, then
`mayContainDuplicateLookup` is set to false.
+ * @param incomingRecords The new data being written to the table
+ * @param keyAndExistingLocations The existing locations of the records in
the table
+ * @param mayContainDuplicateLookup Set to true if the existing locations
may contain multiple entries for the same record key
+ * @param shouldUpdatePartitionPath When true, the index will handle
partition path updates by merging the incoming record with the existing record
and determining if the partition path has changed.
+ * When false, the incoming record will
always be tagged to the existing location's partition path.
+ * @param config the writer's configuration
+ * @param table the table that is being updated
+ * @return the incoming records tagged with their existing locations if any
is found
+ * @param <R> the type of the data in the record
+ */
public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(
HoodieData<HoodieRecord<R>> incomingRecords,
HoodiePairData<String, HoodieRecordGlobalLocation>
keyAndExistingLocations,
@@ -606,11 +620,20 @@ public class HoodieIndexUtils {
boolean shouldUpdatePartitionPath,
HoodieWriteConfig config,
HoodieTable table) {
- final HoodieRecordMerger merger = config.getRecordMerger();
-
HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords =
incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(),
record));
+ ReaderContextFactory<R> readerContextFactory = (ReaderContextFactory<R>)
table.getContext()
+ .getReaderContextFactoryForWrite(table.getMetaClient(),
config.getRecordMerger().getRecordType(), config.getProps());
+ HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
+ readerContext.initRecordMergerForIngestion(config.getProps());
+ TypedProperties properties =
readerContext.getMergeProps(config.getProps());
+ SerializableSchema writerSchema = new
SerializableSchema(config.getWriteSchema());
+ boolean isCommitTimeOrdered = readerContext.getMergeMode() ==
RecordMergeMode.COMMIT_TIME_ORDERING;
+ // if the index is not updating the partition of the record, and the table
is COW, then we do not need to do merging at
+ // this phase since the writer path will merge when rewriting the files as
part of the upsert operation.
+ boolean requiresMergingWithOlderRecordVersion = shouldUpdatePartitionPath
|| table.getMetaClient().getTableConfig().getTableType() ==
HoodieTableType.MERGE_ON_READ;
+
// Pair of incoming record and the global location if meant for merged
lookup in later stage
HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>>
incomingRecordsAndLocations
= keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values()
@@ -619,9 +642,10 @@ public class HoodieIndexUtils {
Option<HoodieRecordGlobalLocation> currentLocOpt =
Option.ofNullable(v.getRight().orElse(null));
if (currentLocOpt.isPresent()) {
HoodieRecordGlobalLocation currentLoc = currentLocOpt.get();
- boolean shouldDoMergedLookUpThenTag = mayContainDuplicateLookup
- || !Objects.equals(incomingRecord.getPartitionPath(),
currentLoc.getPartitionPath());
- if (shouldUpdatePartitionPath && shouldDoMergedLookUpThenTag) {
+ boolean shouldDoMergedLookUpThenTag = mayContainDuplicateLookup //
handle event time ordering updates
+ || shouldUpdatePartitionPath &&
!Objects.equals(incomingRecord.getPartitionPath(),
currentLoc.getPartitionPath()) // handle partition updates
+ || (!isCommitTimeOrdered &&
incomingRecord.isDelete(writerSchema.get(), properties)); // handle event time
ordering deletes
+ if (requiresMergingWithOlderRecordVersion &&
shouldDoMergedLookUpThenTag) {
// the pair's right side is a non-empty Option, which indicates
that a merged lookup will be performed
// at a later stage.
return Pair.of(incomingRecord, currentLocOpt);
@@ -637,8 +661,8 @@ public class HoodieIndexUtils {
return Pair.of(incomingRecord, Option.empty());
}
});
- return shouldUpdatePartitionPath
- ? mergeForPartitionUpdatesIfNeeded(incomingRecordsAndLocations,
config, table)
+ return requiresMergingWithOlderRecordVersion
+ ?
mergeForPartitionUpdatesAndDeletionsIfNeeded(incomingRecordsAndLocations,
shouldUpdatePartitionPath, config, table, readerContext, writerSchema)
: incomingRecordsAndLocations.map(Pair::getLeft);
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/simple/TestGlobalSimpleIndex.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/simple/TestGlobalSimpleIndex.java
index 5eeb9575ef81..f27cdd7c8ccb 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/simple/TestGlobalSimpleIndex.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/simple/TestGlobalSimpleIndex.java
@@ -131,6 +131,7 @@ class TestGlobalSimpleIndex extends HoodieCommonTestHarness
{
props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
"_row_key");
return HoodieWriteConfig.newBuilder()
.withPath(basePath)
+ .withSchema(SCHEMA.toString())
.withIndexConfig(HoodieIndexConfig.newBuilder()
.fromProperties(props)
.withIndexType(HoodieIndex.IndexType.GLOBAL_SIMPLE)
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index aa7ce1e4bef5..4a0217e8e8ea 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -217,6 +217,7 @@ public class TestHoodieGlobalBloomIndex extends
TestHoodieMetadataBase {
@Test
public void testTagLocation() throws Exception {
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)
+ .withSchema(SCHEMA.toString())
.withIndexConfig(HoodieIndexConfig.newBuilder()
.withIndexType(HoodieIndex.IndexType.GLOBAL_BLOOM)
.withGlobalBloomIndexUpdatePartitionPath(false)
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index a91de164f21c..e03b9b5285f1 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -214,6 +214,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
try (HoodieTestDataGenerator dataGen = new
HoodieTestDataGenerator(0xDEEF)) {
// Initial commit. rider column gets value of rider-002
List<HoodieRecord> initialRecords = dataGen.generateInserts("002", 100);
+ long initialTs = (long) ((GenericRecord)
initialRecords.get(0).getData()).get("timestamp");
commitToTable(initialRecords, INSERT.value(), true, writeConfigs);
validateOutputFromFileGroupReader(
getStorageConf(), getBasePath(), true, 0, recordMergeMode,
@@ -222,7 +223,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
// The updates have rider values as rider-001 and the existing records
have rider values as rider-002
// timestamp is 0 for all records so will not be considered
// All updates in this batch will be ignored as rider values are smaller
and timestamp value is same
- List<HoodieRecord> updates = dataGen.generateUniqueUpdates("001", 5);
+ List<HoodieRecord> updates = dataGen.generateUniqueUpdates("001", 5,
initialTs);
List<HoodieRecord> allRecords = initialRecords;
List<HoodieRecord> unmergedRecords = CollectionUtils.combine(updates,
allRecords);
commitToTable(updates, UPSERT.value(), false, writeConfigs);
@@ -233,7 +234,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
// The updates have rider values as rider-003 and the existing records
have rider values as rider-002
// timestamp is 0 for all records so will not be considered
// All updates in this batch will reflect in the final records
- List<HoodieRecord> updates2 = dataGen.generateUniqueUpdates("003", 10);
+ List<HoodieRecord> updates2 = dataGen.generateUniqueUpdates("003", 10,
initialTs);
List<HoodieRecord> finalRecords = mergeRecordLists(updates2, allRecords);
commitToTable(updates2, UPSERT.value(), false, writeConfigs);
validateOutputFromFileGroupReader(
@@ -579,7 +580,9 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
public void testSpillableMapUsage(ExternalSpillableMap.DiskMapType
diskMapType) throws Exception {
Map<String, String> writeConfigs = new
HashMap<>(getCommonConfigs(RecordMergeMode.COMMIT_TIME_ORDERING, true));
try (HoodieTestDataGenerator dataGen = new
HoodieTestDataGenerator(0xDEEF)) {
- commitToTable(dataGen.generateInserts("001", 100), INSERT.value(), true,
writeConfigs);
+ List<HoodieRecord> recordList = dataGen.generateInserts("001", 100);
+ long timestamp = (long) ((GenericRecord)
recordList.get(0).getData()).get("timestamp");
+ commitToTable(recordList, INSERT.value(), true, writeConfigs);
String baseMapPath = Files.createTempDirectory(null).toString();
HoodieTableMetaClient metaClient =
HoodieTestUtils.createMetaClient(getStorageConf(), getBasePath());
Schema avroSchema = new
TableSchemaResolver(metaClient).getTableAvroSchema();
@@ -616,8 +619,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
assertEquals(keyBased.getRecordKey(), recordKey);
assertEquals(positionBased.getRecordKey(), recordKey);
assertEquals(avroSchema,
readerContext.getRecordContext().getSchemaFromBufferRecord(keyBased));
- // generate field value is hardcoded as 0 for ordering field:
timestamp, see HoodieTestDataGenerator#generateRandomValue
-
assertEquals(readerContext.getRecordContext().convertValueToEngineType(0L),
positionBased.getOrderingValue());
+
assertEquals(readerContext.getRecordContext().convertValueToEngineType(timestamp),
positionBased.getOrderingValue());
}
}
}
@@ -628,7 +630,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
Map<String, String> configMapping = new HashMap<>();
configMapping.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
KEY_FIELD_NAME);
configMapping.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(),
PARTITION_FIELD_NAME);
- configMapping.put(HoodieTableConfig.ORDERING_FIELDS.key(),
ORDERING_FIELD_NAME);
+ configMapping.put(HoodieTableConfig.ORDERING_FIELDS.key(), recordMergeMode
!= RecordMergeMode.COMMIT_TIME_ORDERING ? ORDERING_FIELD_NAME : "");
configMapping.put("hoodie.payload.ordering.field", ORDERING_FIELD_NAME);
configMapping.put(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "hoodie_test");
configMapping.put("hoodie.insert.shuffle.parallelism", "4");
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 838acad02ecd..be13209c9165 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -314,19 +314,19 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
return numOfRecords * BYTES_PER_RECORD + BLOOM_FILTER_BYTES;
}
- public IndexedRecord generateRandomValueAsPerSchema(String schemaStr,
HoodieKey key, String commitTime, boolean isFlattened) throws IOException {
+ public IndexedRecord generateRandomValueAsPerSchema(String schemaStr,
HoodieKey key, String commitTime, boolean isFlattened, long timestamp) throws
IOException {
if (TRIP_FLATTENED_SCHEMA.equals(schemaStr)) {
- return generateRandomValue(key, commitTime, true);
+ return generateRandomValue(key, commitTime, true, timestamp);
} else if (TRIP_EXAMPLE_SCHEMA.equals(schemaStr)) {
- return generateRandomValue(key, commitTime, isFlattened);
+ return generateRandomValue(key, commitTime, isFlattened, timestamp);
} else if (TRIP_ENCODED_DECIMAL_SCHEMA.equals(schemaStr)) {
- return generatePayloadForTripEncodedDecimalSchema(key, commitTime);
+ return generatePayloadForTripEncodedDecimalSchema(key, commitTime,
timestamp);
} else if (TRIP_SCHEMA.equals(schemaStr)) {
- return generatePayloadForTripSchema(key, commitTime);
+ return generatePayloadForTripSchema(key, commitTime, timestamp);
} else if (SHORT_TRIP_SCHEMA.equals(schemaStr)) {
- return generatePayloadForShortTripSchema(key, commitTime);
+ return generatePayloadForShortTripSchema(key, commitTime, timestamp);
} else if (TRIP_NESTED_EXAMPLE_SCHEMA.equals(schemaStr)) {
- return generateNestedExampleRandomValue(key, commitTime);
+ return generateNestedExampleRandomValue(key, commitTime, timestamp);
} else if
(TRIP_EXAMPLE_SCHEMA_WITH_PAYLOAD_SPECIFIC_COLS.equals(schemaStr)) {
return generateRandomValueWithColumnRequired(key, commitTime);
}
@@ -357,11 +357,11 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
* @throws IOException
*/
private IndexedRecord generateRandomValue(HoodieKey key, String instantTime,
boolean isFlattened) {
- return generateRandomValue(key, instantTime, isFlattened, 0);
+ return generateRandomValue(key, instantTime, isFlattened,
System.currentTimeMillis());
}
private IndexedRecord generateNestedExampleRandomValue(HoodieKey key, String
instantTime) {
- return generateNestedExampleRandomValue(key, instantTime, 0);
+ return generateNestedExampleRandomValue(key, instantTime,
System.currentTimeMillis());
}
private IndexedRecord generateRandomValue(HoodieKey key, String instantTime,
boolean isFlattened, long timestamp) {
@@ -370,7 +370,7 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
false, isFlattened);
}
- private IndexedRecord generateNestedExampleRandomValue(HoodieKey key, String
instantTime, int ts) {
+ private IndexedRecord generateNestedExampleRandomValue(HoodieKey key, String
instantTime, long ts) {
return generateNestedExampleGenericRecord(
key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime,
"driver-" + instantTime, ts,
false);
@@ -379,34 +379,34 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
/**
* Generates a new avro record with TRIP_ENCODED_DECIMAL_SCHEMA, retaining
the key if optionally provided.
*/
- public IndexedRecord generatePayloadForTripEncodedDecimalSchema(HoodieKey
key, String commitTime) {
- return generateRecordForTripEncodedDecimalSchema(key.getRecordKey(),
"rider-" + commitTime, "driver-" + commitTime, 0);
+ public IndexedRecord generatePayloadForTripEncodedDecimalSchema(HoodieKey
key, String commitTime, long timestamp) {
+ return generateRecordForTripEncodedDecimalSchema(key.getRecordKey(),
"rider-" + commitTime, "driver-" + commitTime, timestamp);
}
/**
* Generates a new avro record with TRIP_SCHEMA, retaining the key if
optionally provided.
*/
- public IndexedRecord generatePayloadForTripSchema(HoodieKey key, String
commitTime) {
- return generateRecordForTripSchema(key.getRecordKey(), "rider-" +
commitTime, "driver-" + commitTime, 0);
+ public IndexedRecord generatePayloadForTripSchema(HoodieKey key, String
commitTime, long timestamp) {
+ return generateRecordForTripSchema(key.getRecordKey(), "rider-" +
commitTime, "driver-" + commitTime, timestamp);
}
- public IndexedRecord generatePayloadForShortTripSchema(HoodieKey key, String
commitTime) {
- return generateRecordForShortTripSchema(key.getRecordKey(), "rider-" +
commitTime, "driver-" + commitTime, 0);
+ public IndexedRecord generatePayloadForShortTripSchema(HoodieKey key, String
commitTime, long timestamp) {
+ return generateRecordForShortTripSchema(key.getRecordKey(), "rider-" +
commitTime, "driver-" + commitTime, timestamp);
}
/**
* Generates a new avro record of the above schema format for a delete.
*/
- private IndexedRecord generateRandomDeleteValue(HoodieKey key, String
instantTime) throws IOException {
- return generateGenericRecord(key.getRecordKey(), key.getPartitionPath(),
"rider-" + instantTime, "driver-" + instantTime, 0,
+ private IndexedRecord generateRandomDeleteValue(HoodieKey key, String
instantTime, long timestamp) throws IOException {
+ return generateGenericRecord(key.getRecordKey(), key.getPartitionPath(),
"rider-" + instantTime, "driver-" + instantTime, timestamp,
true, false);
}
/**
* Generates a new avro record of the above schema format, retaining the key
if optionally provided.
*/
- private IndexedRecord generateAvroPayload(HoodieKey key, String instantTime)
{
- return generateGenericRecord(key.getRecordKey(), key.getPartitionPath(),
"rider-" + instantTime, "driver-" + instantTime, 0);
+ private IndexedRecord generateAvroPayload(HoodieKey key, String instantTime,
long timestamp) {
+ return generateGenericRecord(key.getRecordKey(), key.getPartitionPath(),
"rider-" + instantTime, "driver-" + instantTime, timestamp);
}
public GenericRecord generateGenericRecord(String rowKey, String
partitionPath, String riderName, String driverName,
@@ -785,7 +785,11 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
}
public List<HoodieRecord> generateInsertsAsPerSchema(String commitTime,
Integer n, String schemaStr) {
- return generateInsertsStream(commitTime, n, false,
schemaStr).collect(Collectors.toList());
+ return generateInsertsStream(commitTime, n, false, schemaStr,
System.currentTimeMillis()).collect(Collectors.toList());
+ }
+
+ public List<HoodieRecord> generateInsertsAsPerSchema(String commitTime,
Integer n, String schemaStr, long timestamp) {
+ return generateInsertsStream(commitTime, n, false, schemaStr,
timestamp).collect(Collectors.toList());
}
/**
@@ -796,8 +800,12 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
return generateInserts(instantTime, n, false);
}
+ public List<HoodieRecord> generateInserts(String instantTime, Integer n,
long timestamp) {
+ return generateInsertsStream(instantTime, n, false, TRIP_EXAMPLE_SCHEMA,
timestamp).collect(Collectors.toList());
+ }
+
public List<HoodieRecord> generateInsertsNestedExample(String instantTime,
Integer n) {
- return generateInsertsStream(instantTime, n, false,
TRIP_NESTED_EXAMPLE_SCHEMA).collect(Collectors.toList());
+ return generateInsertsStream(instantTime, n, false,
TRIP_NESTED_EXAMPLE_SCHEMA,
System.currentTimeMillis()).collect(Collectors.toList());
}
/**
@@ -810,32 +818,35 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
* @return List of {@link HoodieRecord}s
*/
public List<HoodieRecord> generateInserts(String instantTime, Integer n,
boolean isFlattened) {
- return generateInsertsStream(instantTime, n, isFlattened, isFlattened ?
TRIP_FLATTENED_SCHEMA : TRIP_EXAMPLE_SCHEMA).collect(Collectors.toList());
+ return generateInsertsStream(instantTime, n, isFlattened, isFlattened ?
TRIP_FLATTENED_SCHEMA : TRIP_EXAMPLE_SCHEMA,
System.currentTimeMillis()).collect(Collectors.toList());
}
/**
* Generates new inserts, uniformly across the partition paths above. It
also updates the list of existing keys.
*/
- public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer
n, boolean isFlattened, String schemaStr) {
- return generateInsertsStream(commitTime, n, isFlattened, schemaStr, false);
+ public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer
n, boolean isFlattened, String schemaStr, long timestamp) {
+ return generateInsertsStream(commitTime, n, isFlattened, schemaStr, false,
timestamp);
}
public List<HoodieRecord> generateInsertsContainsAllPartitions(String
instantTime, Integer n) {
if (n < partitionPaths.length) {
throw new HoodieIOException("n must greater then partitionPaths length");
}
- return generateInsertsStream(instantTime, n, false, TRIP_EXAMPLE_SCHEMA,
true).collect(Collectors.toList());
+ long timestamp = System.currentTimeMillis();
+ return generateInsertsStream(instantTime, n, false, TRIP_EXAMPLE_SCHEMA,
true, timestamp).collect(Collectors.toList());
}
public List<HoodieRecord> generateInsertsForPartitionPerSchema(String
instantTime, Integer n, String partition, String schemaStr) {
- return generateInsertsStream(instantTime, n, false, schemaStr, false, ()
-> partition, () ->
genPseudoRandomUUID(rand).toString()).collect(Collectors.toList());
+ long timestamp = System.currentTimeMillis();
+ return generateInsertsStream(instantTime, n, false, schemaStr, false, ()
-> partition, () -> genPseudoRandomUUID(rand).toString(),
timestamp).collect(Collectors.toList());
}
public List<HoodieRecord> generateInsertsForPartition(String instantTime,
Integer n, String partition) {
- return generateInsertsStream(instantTime, n, false, TRIP_EXAMPLE_SCHEMA,
false, () -> partition, () ->
genPseudoRandomUUID(rand).toString()).collect(Collectors.toList());
+ long timestamp = System.currentTimeMillis();
+ return generateInsertsStream(instantTime, n, false, TRIP_EXAMPLE_SCHEMA,
false, () -> partition, () -> genPseudoRandomUUID(rand).toString(),
timestamp).collect(Collectors.toList());
}
- public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer
n, boolean isFlattened, String schemaStr, boolean containsAllPartitions) {
+ public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer
n, boolean isFlattened, String schemaStr, boolean containsAllPartitions, long
timestamp) {
AtomicInteger partitionIndex = new AtomicInteger(0);
return generateInsertsStream(commitTime, n, isFlattened, schemaStr,
containsAllPartitions,
() -> {
@@ -844,14 +855,15 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
partitionIndex.set((partitionIndex.get() + 1) %
partitionPaths.length);
return partitionToUse;
},
- () -> genPseudoRandomUUID(rand).toString());
+ () -> genPseudoRandomUUID(rand).toString(),
+ timestamp);
}
/**
* Generates new inserts, uniformly across the partition paths above. It
also updates the list of existing keys.
*/
public Stream<HoodieRecord> generateInsertsStream(String instantTime,
Integer n, boolean isFlattened, String schemaStr, boolean containsAllPartitions,
- Supplier<String>
partitionPathSupplier, Supplier<String> recordKeySupplier) {
+ Supplier<String>
partitionPathSupplier, Supplier<String> recordKeySupplier, long timestamp) {
int currSize = getNumExistingKeys(schemaStr);
return IntStream.range(0, n).boxed().map(i -> {
String partitionPath = partitionPathSupplier.get();
@@ -865,7 +877,7 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
populateKeysBySchema(schemaStr, currSize + i, kp);
incrementNumExistingKeysBySchema(schemaStr);
try {
- return new HoodieAvroIndexedRecord(key,
generateRandomValueAsPerSchema(schemaStr, key, instantTime, isFlattened),
+ return new HoodieAvroIndexedRecord(key,
generateRandomValueAsPerSchema(schemaStr, key, instantTime, isFlattened,
timestamp),
null,
Option.of(Collections.singletonMap("InputRecordCount_1506582000", "2")));
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
@@ -905,8 +917,9 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
public List<HoodieRecord> generateUpdatesWithHoodieAvroPayload(String
instantTime, List<HoodieRecord> baseRecords) {
List<HoodieRecord> updates = new ArrayList<>();
+ long timestamp = System.currentTimeMillis();
for (HoodieRecord baseRecord : baseRecords) {
- HoodieRecord record = new HoodieAvroIndexedRecord(baseRecord.getKey(),
generateAvroPayload(baseRecord.getKey(), instantTime));
+ HoodieRecord record = new HoodieAvroIndexedRecord(baseRecord.getKey(),
generateAvroPayload(baseRecord.getKey(), instantTime, timestamp));
updates.add(record);
}
return updates;
@@ -1031,10 +1044,18 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
return generateUniqueUpdatesStream(instantTime, n,
TRIP_EXAMPLE_SCHEMA).collect(Collectors.toList());
}
+ public List<HoodieRecord> generateUniqueUpdates(String instantTime, Integer
n, long timestamp) {
+ return generateUniqueUpdatesStream(instantTime, n, TRIP_EXAMPLE_SCHEMA,
timestamp).collect(Collectors.toList());
+ }
+
public List<HoodieRecord> generateUniqueUpdates(String instantTime, Integer
n, String schemaStr) {
return generateUniqueUpdatesStream(instantTime, n,
schemaStr).collect(Collectors.toList());
}
+ public List<HoodieRecord> generateUniqueUpdates(String instantTime, Integer
n, String schemaStr, long timestamp) {
+ return generateUniqueUpdatesStream(instantTime, n, schemaStr,
timestamp).collect(Collectors.toList());
+ }
+
public List<HoodieRecord> generateUniqueUpdatesNestedExample(String
instantTime, Integer n) {
return generateUniqueUpdatesStream(instantTime, n,
TRIP_NESTED_EXAMPLE_SCHEMA).collect(Collectors.toList());
}
@@ -1061,6 +1082,11 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
* @return stream of hoodie record updates
*/
public Stream<HoodieRecord> generateUniqueUpdatesStream(String instantTime,
Integer n, String schemaStr) {
+ long timestamp = System.currentTimeMillis();
+ return generateUniqueUpdatesStream(instantTime, n, schemaStr, timestamp);
+ }
+
+ public Stream<HoodieRecord> generateUniqueUpdatesStream(String instantTime,
Integer n, String schemaStr, long timestamp) {
final Set<KeyPartition> used = new HashSet<>();
int numExistingKeys = numKeysBySchema.getOrDefault(schemaStr, 0);
Map<Integer, KeyPartition> existingKeys =
existingKeysBySchema.get(schemaStr);
@@ -1079,7 +1105,7 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
logger.debug("key getting updated: {}", kp.key.getRecordKey());
used.add(kp);
try {
- return new HoodieAvroIndexedRecord(kp.key,
generateRandomValueAsPerSchema(schemaStr, kp.key, instantTime, false));
+ return new HoodieAvroIndexedRecord(kp.key,
generateRandomValueAsPerSchema(schemaStr, kp.key, instantTime, false,
timestamp));
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
@@ -1122,9 +1148,11 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
*
* @param instantTime Commit Timestamp
* @param n Number of unique records
+ * @param updatePartition whether to update the partition path while
generating delete record
+ * @param timestamp timestamp to set in the record for the ordering value
* @return stream of hoodie records for delete
*/
- public Stream<HoodieRecord> generateUniqueDeleteRecordStream(String
instantTime, Integer n, boolean updatePartition) {
+ private Stream<HoodieRecord> generateUniqueDeleteRecordStream(String
instantTime, Integer n, boolean updatePartition, long timestamp) {
final Set<KeyPartition> used = new HashSet<>();
Map<Integer, KeyPartition> existingKeys =
existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
Integer numExistingKeys = numKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
@@ -1151,7 +1179,7 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
key = new HoodieKey(key.getRecordKey(), updatedPartitionPath);
}
try {
- result.add(new HoodieAvroIndexedRecord(key,
generateRandomDeleteValue(key, instantTime)));
+ result.add(new HoodieAvroIndexedRecord(key,
generateRandomDeleteValue(key, instantTime, timestamp)));
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
@@ -1168,11 +1196,19 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
* @return List of hoodie records for delete
*/
public List<HoodieRecord> generateUniqueDeleteRecords(String instantTime,
Integer n) {
- return generateUniqueDeleteRecordStream(instantTime, n,
false).collect(Collectors.toList());
+ return generateUniqueDeleteRecordStream(instantTime, n, false,
System.currentTimeMillis()).collect(Collectors.toList());
}
public List<HoodieRecord>
generateUniqueDeleteRecordsWithUpdatedPartition(String instantTime, Integer n) {
- return generateUniqueDeleteRecordStream(instantTime, n,
true).collect(Collectors.toList());
+ return generateUniqueDeleteRecordStream(instantTime, n, true,
System.currentTimeMillis()).collect(Collectors.toList());
+ }
+
+ public List<HoodieRecord> generateUniqueDeleteRecords(String instantTime,
Integer n, long timestamp) {
+ return generateUniqueDeleteRecordStream(instantTime, n, false,
timestamp).collect(Collectors.toList());
+ }
+
+ public List<HoodieRecord>
generateUniqueDeleteRecordsWithUpdatedPartition(String instantTime, Integer n,
long timestamp) {
+ return generateUniqueDeleteRecordStream(instantTime, n, true,
timestamp).collect(Collectors.toList());
}
public boolean deleteExistingKeyIfPresent(HoodieKey key) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index fa2cce1cd347..c78292f4b45e 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -64,7 +64,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
* Test cases for {@link StreamReadMonitoringFunction}.
*/
public class TestStreamReadMonitoringFunction {
- private static final long WAIT_TIME_MILLIS = 5 * 1000L;
+ private static final long WAIT_TIME_MILLIS = 10 * 1000L;
private Configuration conf;
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
index a3c70eab57b2..681442274893 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
@@ -281,9 +281,10 @@ public class TestHoodieFileGroupReaderOnFlink extends
TestHoodieFileGroupReaderB
// One commit; reading one file group containing a log file only
List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
commitToTable(initialRecords, UPSERT.value(), true, writeConfigs,
TRIP_EXAMPLE_SCHEMA);
+ String[] orderingFields = recordMergeMode ==
RecordMergeMode.COMMIT_TIME_ORDERING ? new String[0] : new
String[]{ORDERING_FIELD_NAME};
validateOutputFromFileGroupReader(
getStorageConf(), getBasePath(), false, 1, recordMergeMode,
- initialRecords, initialRecords, new String[]{ORDERING_FIELD_NAME});
+ initialRecords, initialRecords, orderingFields);
// Two commits; reading one file group containing two log files
List<HoodieRecord> updates = dataGen.generateUniqueUpdates("002", 50);
@@ -291,7 +292,7 @@ public class TestHoodieFileGroupReaderOnFlink extends
TestHoodieFileGroupReaderB
commitToTable(updates, UPSERT.value(), false, writeConfigs,
TRIP_EXAMPLE_SCHEMA);
validateOutputFromFileGroupReader(
getStorageConf(), getBasePath(), false, 2, recordMergeMode,
- allRecords, CollectionUtils.combine(initialRecords, updates), new
String[] {ORDERING_FIELD_NAME});
+ allRecords, CollectionUtils.combine(initialRecords, updates),
orderingFields);
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
index 05a31fad7c3a..d5a6cf995f1c 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
@@ -72,20 +72,21 @@ class RecordLevelIndexTestBase extends
HoodieStatsIndexTestBase {
validate: Boolean = true,
numUpdates: Int = 1,
onlyUpdates: Boolean =
false,
- schemaStr: String =
HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA): DataFrame = {
+ schemaStr: String =
HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA,
+ timestamp: Long =
System.currentTimeMillis()): DataFrame = {
var latestBatch: mutable.Buffer[String] = null
if (operation == UPSERT_OPERATION_OPT_VAL) {
val instantTime = getInstantTime()
- val records =
recordsToStrings(dataGen.generateUniqueUpdates(instantTime, numUpdates,
schemaStr))
+ val records =
recordsToStrings(dataGen.generateUniqueUpdates(instantTime, numUpdates,
schemaStr, timestamp))
if (!onlyUpdates) {
-
records.addAll(recordsToStrings(dataGen.generateInsertsAsPerSchema(instantTime,
1, schemaStr)))
+
records.addAll(recordsToStrings(dataGen.generateInsertsAsPerSchema(instantTime,
1, schemaStr, timestamp)))
}
latestBatch = records.asScala
} else if (operation == INSERT_OVERWRITE_OPERATION_OPT_VAL) {
latestBatch =
recordsToStrings(dataGen.generateInsertsForPartitionPerSchema(
getInstantTime(), 5, dataGen.getPartitionPaths.last,
schemaStr)).asScala
} else {
- latestBatch =
recordsToStrings(dataGen.generateInsertsAsPerSchema(getInstantTime(), 5,
schemaStr)).asScala
+ latestBatch =
recordsToStrings(dataGen.generateInsertsAsPerSchema(getInstantTime(), 5,
schemaStr, timestamp)).asScala
}
val latestBatchDf =
spark.read.json(spark.sparkContext.parallelize(latestBatch.toSeq, 2))
latestBatchDf.cache()
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 7e84fc0bc978..f49284704fbb 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -185,6 +185,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
// Insert Operation
var records = recordsToStrings(dataGen.generateInserts("003",
100)).asScala.toList
var inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+ inputDF = inputDF.withColumn("timestamp", lit(10))
val commonOptsWithMultipleOrderingFields = writeOpts ++ Map(
"hoodie.insert.shuffle.parallelism" -> "4",
@@ -203,6 +204,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
records = recordsToStrings(dataGen.generateUniqueUpdates("002",
10)).asScala.toList
inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+ inputDF = inputDF.withColumn("timestamp", lit(10))
inputDF.write.format("hudi")
.options(commonOptsWithMultipleOrderingFields)
.option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
@@ -212,6 +214,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
records = recordsToStrings(dataGen.generateUniqueUpdates("004",
10)).asScala.toList
inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+ inputDF = inputDF.withColumn("timestamp", lit(10))
inputDF.write.format("hudi")
.options(commonOptsWithMultipleOrderingFields)
.option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
@@ -221,7 +224,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
records = recordsToStrings(dataGen.generateUniqueUpdates("001",
10)).asScala.toList
inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
- inputDF = inputDF.withColumn("timestamp", lit(1))
+ inputDF = inputDF.withColumn("timestamp", lit(20))
inputDF.write.format("hudi")
.options(commonOptsWithMultipleOrderingFields)
.option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 7a9263ce58de..4fd01a58ecc0 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -1776,6 +1776,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
// Insert Operation
var records = recordsToStrings(dataGen.generateInserts("003",
100)).asScala.toList
var inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+ inputDF = inputDF.withColumn("timestamp", lit(10))
val commonOptsWithMultipleOrderingFields = writeOpts ++ Map(
DataSourceWriteOptions.TABLE_TYPE.key ->
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
@@ -1795,6 +1796,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
records = recordsToStrings(dataGen.generateUniqueUpdates("002",
10)).asScala.toList
inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+ inputDF = inputDF.withColumn("timestamp", lit(10))
inputDF.write.format("hudi")
.options(commonOptsWithMultipleOrderingFields)
.option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
@@ -1804,6 +1806,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
records = recordsToStrings(dataGen.generateUniqueUpdates("004",
10)).asScala.toList
inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+ inputDF = inputDF.withColumn("timestamp", lit(10))
inputDF.write.format("hudi")
.options(commonOptsWithMultipleOrderingFields)
.option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
@@ -1813,7 +1816,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
records = recordsToStrings(dataGen.generateUniqueUpdates("001",
10)).asScala.toList
inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
- inputDF = inputDF.withColumn("timestamp", lit(1))
+ inputDF = inputDF.withColumn("timestamp", lit(20))
inputDF.write.format("hudi")
.options(commonOptsWithMultipleOrderingFields)
.option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionedRecordLevelIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionedRecordLevelIndex.scala
index 776e27381703..7dfcb7c56c99 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionedRecordLevelIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionedRecordLevelIndex.scala
@@ -26,9 +26,9 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.data.HoodieListData
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieRecordGlobalLocation,
HoodieTableType}
+import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordGlobalLocation,
HoodieTableType}
import org.apache.hudi.common.table.{HoodieTableConfig, TableSchemaResolver}
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator
+import org.apache.hudi.common.testutils.{HoodieTestDataGenerator,
InProcessTimeGenerator}
import
org.apache.hudi.common.testutils.HoodieTestDataGenerator.recordsToStrings
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig,
HoodieWriteConfig}
@@ -43,8 +43,9 @@ import org.apache.spark.sql.functions.lit
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertTrue, fail}
import org.junit.jupiter.api.Tag
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, MethodSource, ValueSource}
+import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource,
ValueSource}
+import java.util
import java.util.stream.Collectors
import scala.collection.JavaConverters
@@ -76,7 +77,7 @@ class TestPartitionedRecordLevelIndex extends
RecordLevelIndexTestBase {
HoodieCompactionConfig.INLINE_COMPACT.key() -> "false",
HoodieIndexConfig.INDEX_TYPE.key() -> PARTITIONED_RECORD_INDEX.name())
holder.options = options
- insertDf.write.format("org.apache.hudi")
+ insertDf.write.format("hudi")
.options(options)
.mode(SaveMode.Overwrite)
.save(basePath)
@@ -99,11 +100,13 @@ class TestPartitionedRecordLevelIndex extends
RecordLevelIndexTestBase {
val newDeletes = dataGen.generateUpdates("004", 1)
val updates = dataGen.generateUniqueUpdates("002", 3)
+ val lowerOrderingValue = 1L
+ updates.addAll(dataGen.generateUniqueDeleteRecords("002", 2,
lowerOrderingValue))
val nextBatch = recordsToStrings(updates).asScala.toSeq
val nextBatchDf =
spark.read.json(spark.sparkContext.parallelize(nextBatch, 1))
val updateDf = nextBatchDf.withColumn("data_partition_path",
lit("partition1"))
- updateDf.write.format("org.apache.hudi")
+ updateDf.write.format("hudi")
.options(options)
.option(DataSourceWriteOptions.OPERATION.key(), UPSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
@@ -121,7 +124,7 @@ class TestPartitionedRecordLevelIndex extends
RecordLevelIndexTestBase {
val newInsertBatch = recordsToStrings(newInserts).asScala.toSeq
val newInsertBatchDf =
spark.read.json(spark.sparkContext.parallelize(newInsertBatch, 1))
val newInsertDf = newInsertBatchDf.withColumn("data_partition_path",
lit("partition2")).union(newInsertBatchDf.withColumn("data_partition_path",
lit("partition3")))
- newInsertDf.write.format("org.apache.hudi")
+ newInsertDf.write.format("hudi")
.options(options)
.option(DataSourceWriteOptions.OPERATION.key(), UPSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
@@ -149,7 +152,7 @@ class TestPartitionedRecordLevelIndex extends
RecordLevelIndexTestBase {
val newDeletesBatch = recordsToStrings(newDeletes).asScala.toSeq
val newDeletesBatchDf =
spark.read.json(spark.sparkContext.parallelize(newDeletesBatch, 1))
val newDeletesDf = newDeletesBatchDf.withColumn("data_partition_path",
lit("partition1"))
- newDeletesDf.write.format("org.apache.hudi")
+ newDeletesDf.write.format("hudi")
.options(options)
.option(DataSourceWriteOptions.OPERATION.key(), DELETE_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
@@ -181,7 +184,7 @@ class TestPartitionedRecordLevelIndex extends
RecordLevelIndexTestBase {
val bulkInsertPartitionedDf =
bulkInsertDf.withColumn("data_partition_path", lit("partition0"))
// Use bulk_insert operation explicitly
- bulkInsertPartitionedDf.write.format("org.apache.hudi")
+ bulkInsertPartitionedDf.write.format("hudi")
.options(options)
.option(DataSourceWriteOptions.OPERATION.key(),
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
@@ -348,7 +351,7 @@ class TestPartitionedRecordLevelIndex extends
RecordLevelIndexTestBase {
HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key() ->
streamingWriteEnabled.toString,
HoodieCompactionConfig.INLINE_COMPACT.key() -> "false",
HoodieIndexConfig.INDEX_TYPE.key() -> PARTITIONED_RECORD_INDEX.name())
- insertDf.write.format("org.apache.hudi")
+ insertDf.write.format("hudi")
.options(options)
.mode(SaveMode.Overwrite)
.save(basePath)
@@ -359,7 +362,7 @@ class TestPartitionedRecordLevelIndex extends
RecordLevelIndexTestBase {
val nextBatch = recordsToStrings(updates).asScala.toSeq
val nextBatchDf =
spark.read.json(spark.sparkContext.parallelize(nextBatch, 1))
val updateDf = nextBatchDf.withColumn("data_partition_path",
lit("partition1"))
- updateDf.write.format("org.apache.hudi")
+ updateDf.write.format("hudi")
.options(options)
.option(DataSourceWriteOptions.OPERATION.key(), UPSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
@@ -374,7 +377,7 @@ class TestPartitionedRecordLevelIndex extends
RecordLevelIndexTestBase {
val batchToFail = recordsToStrings(updatesToFail).asScala.toSeq
val batchToFailDf =
spark.read.json(spark.sparkContext.parallelize(batchToFail, 1))
val failDf = batchToFailDf.withColumn("data_partition_path",
lit("partition1")).union(batchToFailDf.withColumn("data_partition_path",
lit("partition3")))
- failDf.write.format("org.apache.hudi")
+ failDf.write.format("hudi")
.options(options)
.option(DataSourceWriteOptions.OPERATION.key(),
UPSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
@@ -444,6 +447,37 @@ class TestPartitionedRecordLevelIndex extends
RecordLevelIndexTestBase {
metadata.readRecordIndexLocationsWithKeys(HoodieListData.eager(recordKeys),
dataTablePartition)
.collectAsList().asScala.map(p => p.getKey -> p.getValue).toMap
}
+
+ @ParameterizedTest
+ @EnumSource(classOf[HoodieTableType])
+ def testRLIForDeletesWithHoodieIsDeletedColumn(tableType: HoodieTableType):
Unit = {
+ val hudiOpts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key ->
tableType.name()) +
+ (HoodieIndexConfig.INDEX_TYPE.key -> "RECORD_INDEX") +
+ (HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE.key ->
"false")
+ val insertDf = doWriteAndValidateDataAndRecordIndex(hudiOpts,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite)
+ insertDf.cache()
+
+ val instantTime = InProcessTimeGenerator.createNewInstantTime()
+ // Issue two deletes, where one has an older ordering value that should be
ignored
+ val deletedRecords = dataGen.generateUniqueDeleteRecords(instantTime, 1)
+ val inputRecords = new util.ArrayList[HoodieRecord[_]](deletedRecords)
+ val lowerOrderingValue = 1L
+ inputRecords.addAll(dataGen.generateUniqueDeleteRecords(instantTime, 1,
lowerOrderingValue))
+ val deleteBatch = recordsToStrings(inputRecords).asScala
+ val deleteDf =
spark.read.json(spark.sparkContext.parallelize(deleteBatch.toSeq, 1))
+ deleteDf.cache()
+ val recordKeyToDelete =
deleteDf.collectAsList().get(0).getAs("_row_key").asInstanceOf[String]
+ deleteDf.write.format("hudi")
+ .options(hudiOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ val prevDf = mergedDfList.last
+ mergedDfList = mergedDfList :+ prevDf.filter(row =>
row.getAs("_row_key").asInstanceOf[String] != recordKeyToDelete)
+ validateDataAndRecordIndices(hudiOpts,
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(deletedRecords).asScala.toSeq,
1)))
+ deleteDf.unpersist()
+ }
}
object TestPartitionedRecordLevelIndex {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 95a06b3f9484..93e3fcda891b 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext
import
org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model._
-import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.testutils.{HoodieTestDataGenerator,
InProcessTimeGenerator}
import
org.apache.hudi.common.testutils.HoodieTestDataGenerator.recordsToStrings
@@ -47,7 +47,6 @@ import org.junit.jupiter.params.provider.Arguments.arguments
import java.util
import java.util.{Collections, Properties}
import java.util.concurrent.Executors
-import java.util.stream.Collectors
import scala.collection.JavaConverters._
import scala.concurrent.{Await, ExecutionContext, Future}
@@ -85,7 +84,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
var operation = INSERT_OPERATION_OPT_VAL
val latestBatchDf =
spark.read.json(spark.sparkContext.parallelize(latestBatch, 1))
latestBatchDf.cache()
- latestBatchDf.write.format("org.apache.hudi")
+ latestBatchDf.write.format("hudi")
.options(hudiOpts)
.mode(SaveMode.Overwrite)
.save(basePath)
@@ -103,7 +102,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase
{
val latestBatchDf2_3 =
spark.read.json(spark.sparkContext.parallelize(latestBatch2_3, 1))
val latestBatchDf2Final = latestBatchDf2_3.union(latestBatchDf2_2)
latestBatchDf2Final.cache()
- latestBatchDf2Final.write.format("org.apache.hudi")
+ latestBatchDf2Final.write.format("hudi")
.options(hudiOpts)
.mode(SaveMode.Append)
.save(basePath)
@@ -123,14 +122,16 @@ class TestRecordLevelIndex extends
RecordLevelIndexTestBase {
val latestBatch3 =
recordsToStrings(dataGen2.generateUniqueUpdates(instantTime3, 2)).asScala.toSeq
val latestBatchDf3 =
spark.read.json(spark.sparkContext.parallelize(latestBatch3, 1))
latestBatchDf3.cache()
- latestBatchDf.write.format("org.apache.hudi")
+ latestBatchDf3.write.format("hudi")
.options(hudiOpts2)
.mode(SaveMode.Append)
.save(basePath)
- val deletedDf3 = calculateMergedDf(latestBatchDf, operation, true)
+ val deletedDf3 = calculateMergedDf(latestBatchDf3, operation, true)
deletedDf3.cache()
metaClient =
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
validateDataAndRecordIndices(hudiOpts, deletedDf3)
+ deletedDf2.unpersist()
+ deletedDf3.unpersist()
}
private def getNewInstantTime(): String = {
@@ -248,7 +249,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase
{
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Overwrite)
val deleteDf = insertDf.limit(1)
- deleteDf.write.format("org.apache.hudi")
+ deleteDf.write.format("hudi")
.options(hudiOpts)
.option(DataSourceWriteOptions.OPERATION.key, DELETE_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
@@ -291,15 +292,54 @@ class TestRecordLevelIndex extends
RecordLevelIndexTestBase {
insertDf.cache()
val instantTime = getNewInstantTime
- // Issue two deletes, one with the original partition and one with an
updated partition
- val recordsToDelete = dataGen.generateUniqueDeleteRecords(instantTime, 1)
-
recordsToDelete.addAll(dataGen.generateUniqueDeleteRecordsWithUpdatedPartition(instantTime,
1))
- val deleteBatch = recordsToStrings(recordsToDelete).asScala
+ // Issue four deletes, one with the original partition, one with an
updated partition,
+ // and two with an older ordering value that should be ignored
+ val deletedRecords = dataGen.generateUniqueDeleteRecords(instantTime, 1)
+
deletedRecords.addAll(dataGen.generateUniqueDeleteRecordsWithUpdatedPartition(instantTime,
1))
+ val inputRecords = new util.ArrayList[HoodieRecord[_]](deletedRecords)
+ val lowerOrderingValue = 1L
+ inputRecords.addAll(dataGen.generateUniqueDeleteRecords(instantTime, 1,
lowerOrderingValue))
+
inputRecords.addAll(dataGen.generateUniqueDeleteRecordsWithUpdatedPartition(instantTime,
1, lowerOrderingValue))
+ val deleteBatch = recordsToStrings(inputRecords).asScala
val deleteDf =
spark.read.json(spark.sparkContext.parallelize(deleteBatch.toSeq, 1))
deleteDf.cache()
val recordKeyToDelete1 =
deleteDf.collectAsList().get(0).getAs("_row_key").asInstanceOf[String]
val recordKeyToDelete2 =
deleteDf.collectAsList().get(1).getAs("_row_key").asInstanceOf[String]
- deleteDf.write.format("org.apache.hudi")
+ deleteDf.write.format("hudi")
+ .options(hudiOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ val prevDf = mergedDfList.last
+ mergedDfList = mergedDfList :+ prevDf.filter(row =>
row.getAs("_row_key").asInstanceOf[String] != recordKeyToDelete1 &&
+ row.getAs("_row_key").asInstanceOf[String] != recordKeyToDelete2)
+ validateDataAndRecordIndices(hudiOpts,
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(deletedRecords).asScala.toSeq,
1)))
+ deleteDf.unpersist()
+ }
+
+ @ParameterizedTest
+ @EnumSource(classOf[HoodieTableType])
+ def testRLIForDeletesWithCommitTimeOrdering(tableType: HoodieTableType):
Unit = {
+ val hudiOpts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key ->
tableType.name()) +
+ (HoodieIndexConfig.INDEX_TYPE.key -> "RECORD_INDEX") +
+ (HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE.key ->
"true") +
+ (HoodieTableConfig.ORDERING_FIELDS.key -> "")
+ val insertDf = doWriteAndValidateDataAndRecordIndex(hudiOpts,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite)
+ insertDf.cache()
+
+ val instantTime = getNewInstantTime
+ val lowerOrderingValue = 1L
+ // Issue two deletes, one with the original partition, one with an updated
partition.
+ // Both have an older ordering value but that is ignored since the table
uses commit time ordering
+ val deletedRecords = dataGen.generateUniqueDeleteRecords(instantTime, 1,
lowerOrderingValue)
+
deletedRecords.addAll(dataGen.generateUniqueDeleteRecordsWithUpdatedPartition(instantTime,
1, lowerOrderingValue))
+ val deleteBatch = recordsToStrings(deletedRecords).asScala
+ val deleteDf =
spark.read.json(spark.sparkContext.parallelize(deleteBatch.toSeq, 1))
+ deleteDf.cache()
+ val recordKeyToDelete1 =
deleteDf.collectAsList().get(0).getAs("_row_key").asInstanceOf[String]
+ val recordKeyToDelete2 =
deleteDf.collectAsList().get(1).getAs("_row_key").asInstanceOf[String]
+ deleteDf.write.format("hudi")
.options(hudiOpts)
.mode(SaveMode.Append)
.save(basePath)
@@ -658,13 +698,15 @@ class TestRecordLevelIndex extends
RecordLevelIndexTestBase {
val executor = Executors.newFixedThreadPool(2)
implicit val executorContext: ExecutionContext =
ExecutionContext.fromExecutor(executor)
+ val timestamp = System.currentTimeMillis()
val function = new Function0[Boolean] {
override def apply(): Boolean = {
try {
doWriteAndValidateDataAndRecordIndex(hudiOpts,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append,
- validate = false)
+ validate = false,
+ timestamp = timestamp)
true
} catch {
case _: HoodieWriteConflictException => false
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index a5a72e395aa7..7edd11804c91 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -358,11 +358,11 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
HoodieTestDataGenerator dataGenerator = new
HoodieTestDataGenerator(makeDatesAmbiguous);
if (useCustomSchema) {
Helpers.saveParquetToDFS(Helpers.toGenericRecords(
- dataGenerator.generateInsertsAsPerSchema("000", numRecords,
schemaStr),
+ dataGenerator.generateInsertsAsPerSchema("000", numRecords,
schemaStr, 0L),
schema), new Path(path), HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
} else {
Helpers.saveParquetToDFS(Helpers.toGenericRecords(
- dataGenerator.generateInserts("000", numRecords)), new Path(path));
+ dataGenerator.generateInserts("000", numRecords, 0L)), new
Path(path));
}
return dataGenerator;
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
index 0600d28a5dae..fc75e76275af 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -159,8 +160,8 @@ public class TestHoodieMultiTableDeltaStreamer extends
HoodieDeltaStreamerTestBa
testUtils.createTopic(topicName2, 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
- testUtils.sendMessages(topicName1,
Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5,
HoodieTestDataGenerator.TRIP_SCHEMA)));
- testUtils.sendMessages(topicName2,
Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10,
HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
+ testUtils.sendMessages(topicName1,
Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5,
HoodieTestDataGenerator.TRIP_SCHEMA, 0L)));
+ testUtils.sendMessages(topicName2,
Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10,
HoodieTestDataGenerator.SHORT_TRIP_SCHEMA, 0L)));
HoodieMultiTableDeltaStreamer.Config cfg =
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, basePath + "/config",
JsonKafkaSource.class.getName(), false, false, null);
HoodieMultiTableDeltaStreamer streamer = new
HoodieMultiTableDeltaStreamer(cfg, jsc);
@@ -185,8 +186,10 @@ public class TestHoodieMultiTableDeltaStreamer extends
HoodieDeltaStreamerTestBa
assertRecordCount(10, targetBasePath2, sqlContext);
//insert updates for already existing records in kafka topics
- testUtils.sendMessages(topicName1,
Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5,
HoodieTestDataGenerator.TRIP_SCHEMA)));
- testUtils.sendMessages(topicName2,
Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 10,
HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
+ testUtils.sendMessages(topicName1,
Helpers.jsonifyRecords(dataGenerator.generateUniqueUpdatesStream("001", 5,
HoodieTestDataGenerator.TRIP_SCHEMA, 0L)
+ .collect(Collectors.toList())));
+ testUtils.sendMessages(topicName2,
Helpers.jsonifyRecords(dataGenerator.generateUniqueUpdatesStream("001", 10,
HoodieTestDataGenerator.SHORT_TRIP_SCHEMA, 0L)
+ .collect(Collectors.toList())));
streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
streamer.getTableExecutionContexts().get(1).setProperties(properties);
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java
index 777c7f237854..cbe823dd07f0 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java
@@ -137,15 +137,15 @@ public abstract class AbstractBaseTestSource extends
AvroSource {
if (!reachedMax && numUpdates >= 50) {
LOG.info("After adjustments => NumInserts={}, NumUpdates={},
NumDeletes=50, maxUniqueRecords={}", numInserts, (numUpdates - 50),
maxUniqueKeys);
// if we generate update followed by deletes -> some keys in update
batch might be picked up for deletes. Hence generating delete batch followed by
updates
- deleteStream =
dataGenerator.generateUniqueDeleteRecordStream(instantTime, 50,
false).map(AbstractBaseTestSource::toGenericRecord);
- updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime,
numUpdates - 50, HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+ deleteStream = dataGenerator.generateUniqueDeleteRecords(instantTime,
50, 0L).stream().map(AbstractBaseTestSource::toGenericRecord);
+ updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime,
numUpdates - 50, HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, 0L)
.map(AbstractBaseTestSource::toGenericRecord);
} else {
LOG.info("After adjustments => NumInserts={}, NumUpdates={},
maxUniqueRecords={}", numInserts, numUpdates, maxUniqueKeys);
- updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime,
numUpdates, HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+ updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime,
numUpdates, HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, 0L)
.map(AbstractBaseTestSource::toGenericRecord);
}
- Stream<GenericRecord> insertStream =
dataGenerator.generateInsertsStream(instantTime, numInserts, false,
HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+ Stream<GenericRecord> insertStream =
dataGenerator.generateInsertsStream(instantTime, numInserts, false,
HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, 0L)
.map(AbstractBaseTestSource::toGenericRecord);
if
(Boolean.valueOf(props.getOrDefault("hoodie.test.source.generate.inserts",
"false").toString())) {
return insertStream;