This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1bdba3600a7 [HUDI-8949] Relaxing checkpoint validation for HoodieStreamerSources (#12752)
1bdba3600a7 is described below
commit 1bdba3600a7facd0edc79fa2c5942a10f73ce8a8
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Jan 31 18:29:53 2025 -0800
[HUDI-8949] Relaxing checkpoint validation for HoodieStreamerSources (#12752)
---
.../org/apache/hudi/utilities/sources/Source.java | 1 -
.../deltastreamer/TestHoodieDeltaStreamer.java | 45 ++++++++++++++++------
.../streamer/TestHoodieIncrSourceE2E.java | 10 +++--
3 files changed, 40 insertions(+), 16 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
index a6c3c0e1411..aec8ad41cae 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
@@ -168,7 +168,6 @@ public abstract class Source<T> implements SourceCommitCallback, Serializable {
public final InputBatch<T> fetchNext(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
Option<Checkpoint> lastCheckpointTranslated = translateCheckpoint(lastCheckpoint);
InputBatch<T> batch = readFromCheckpoint(lastCheckpointTranslated, sourceLimit);
- assertCheckpointVersion(lastCheckpoint, lastCheckpointTranslated, batch.getCheckpointForNextBatch());
// If overriddenSchemaProvider is passed in CLI, use it
return overriddenSchemaProvider == null ? batch
: new InputBatch<>(batch.getBatch(), batch.getCheckpointForNextBatch(), overriddenSchemaProvider);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index b687c4d40e7..ad8a69bb8f1 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -47,6 +47,7 @@ import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -650,11 +651,26 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
}
+ private static Stream<Arguments> continuousModeArgs() {
+ return Stream.of(
+ Arguments.of("AVRO", "EIGHT"),
+ Arguments.of("SPARK", "EIGHT"),
+ Arguments.of("AVRO", "SIX")
+ );
+ }
+
+ private static Stream<Arguments> continuousModeMorArgs() {
+ return Stream.of(
+ Arguments.of("AVRO", "EIGHT"),
+ Arguments.of("AVRO", "SIX")
+ );
+ }
+
@Timeout(600)
@ParameterizedTest
- @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
- public void testUpsertsCOWContinuousMode(HoodieRecordType recordType) throws Exception {
- testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", recordType);
+ @MethodSource("continuousModeArgs")
+ public void testUpsertsCOWContinuousMode(HoodieRecordType recordType, String writeTableVersion) throws Exception {
+ testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", recordType, writeTableVersion);
}
@Test
@@ -677,14 +693,14 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
@ParameterizedTest
@EnumSource(value = HoodieRecordType.class, names = {"AVRO"})
public void testUpsertsMORContinuousModeShutdownGracefully(HoodieRecordType recordType) throws Exception {
- testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_cow", true, recordType);
+ testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_cow", true, recordType, "EIGHT");
}
@Timeout(600)
@ParameterizedTest
- @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
- public void testUpsertsMORContinuousMode(HoodieRecordType recordType) throws Exception {
- testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor", recordType);
+ @MethodSource("continuousModeMorArgs")
+ public void testUpsertsMORContinuousMode(HoodieRecordType recordType, String writeTableVersion) throws Exception {
+ testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor", recordType, writeTableVersion);
}
@Test
@@ -703,11 +719,12 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
- private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir, HoodieRecordType recordType) throws Exception {
- testUpsertsContinuousMode(tableType, tempDir, false, recordType);
+ private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir, HoodieRecordType recordType, String writeTableVersion) throws Exception {
+ testUpsertsContinuousMode(tableType, tempDir, false, recordType, writeTableVersion);
}
- private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully, HoodieRecordType recordType) throws Exception {
+ private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully, HoodieRecordType recordType,
+ String writeTableVersion) throws Exception {
String tableBasePath = basePath + "/" + tempDir;
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;
@@ -721,6 +738,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
cfg.tableType = tableType.name();
cfg.configs.add(String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords));
cfg.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
+ if (HoodieTableVersion.SIX.name().equals(writeTableVersion)) {
+ cfg.configs.add(String.format(("%s=%s"), HoodieWriteConfig.WRITE_TABLE_VERSION.key(), HoodieTableVersion.SIX.versionCode()));
+ }
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
@@ -736,6 +756,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
}
return true;
});
+ // validate table version matches
+ HoodieTableMetaClient hudiTblMetaClient = HoodieTableMetaClient.builder().setBasePath(cfg.targetBasePath).setConf(context.getStorageConf()).build();
+ assertEquals(HoodieTableVersion.valueOf(writeTableVersion), hudiTblMetaClient.getTableConfig().getTableVersion());
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
@@ -1097,7 +1120,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
- @Test
+ @Disabled("HUDI-8951")
public void testHoodieIndexerExecutionAfterCommit() throws Exception {
String tableBasePath = basePath + "/asyncindexer_commit";
Set<String> customConfigs = new HashSet<>();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieIncrSourceE2E.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieIncrSourceE2E.java
index 830d773da40..409b2d9fe9c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieIncrSourceE2E.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieIncrSourceE2E.java
@@ -34,6 +34,7 @@ import org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource;
import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource;
import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSourceHarness;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
@@ -45,6 +46,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import static javolution.testing.TestContext.assertTrue;
import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1;
import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1;
import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION;
@@ -65,7 +67,6 @@ import static org.apache.hudi.utilities.streamer.StreamSync.CHECKPOINT_IGNORE_KE
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(MockitoExtension.class)
public class TestHoodieIncrSourceE2E extends S3EventsHoodieIncrSourceHarness {
@@ -132,11 +133,12 @@ public class TestHoodieIncrSourceE2E extends S3EventsHoodieIncrSourceHarness {
* - Verifies commit metadata still uses checkpoint V1 format
* - Verifies final checkpoint="30"
*/
- @ParameterizedTest
- @CsvSource({
+ /*@ParameterizedTest
+ /@CsvSource({
"6, org.apache.hudi.utilities.sources.MockGeneralHoodieIncrSource",
"8, org.apache.hudi.utilities.sources.MockGeneralHoodieIncrSource"
- })
+ })*/
+ @Disabled("HUDI-8952")
public void testSyncE2EWrongCheckpointVersionErrorOut(String tableVersion, String sourceClass) throws Exception {
// First start with no previous checkpoint and ingest till ckp 1 with table version.
// Disable auto upgrade and MDT as we want to keep things as it is.