This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bdba3600a7 [HUDI-8949] Relaxing checkpoint validation for HoodieStreamerSources (#12752)
1bdba3600a7 is described below

commit 1bdba3600a7facd0edc79fa2c5942a10f73ce8a8
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Jan 31 18:29:53 2025 -0800

    [HUDI-8949] Relaxing checkpoint validation for HoodieStreamerSources (#12752)
---
 .../org/apache/hudi/utilities/sources/Source.java  |  1 -
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 45 ++++++++++++++++------
 .../streamer/TestHoodieIncrSourceE2E.java          | 10 +++--
 3 files changed, 40 insertions(+), 16 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
index a6c3c0e1411..aec8ad41cae 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
@@ -168,7 +168,6 @@ public abstract class Source<T> implements SourceCommitCallback, Serializable {
   public final InputBatch<T> fetchNext(Option<Checkpoint> lastCheckpoint, long 
sourceLimit) {
     Option<Checkpoint> lastCheckpointTranslated = 
translateCheckpoint(lastCheckpoint);
     InputBatch<T> batch = readFromCheckpoint(lastCheckpointTranslated, 
sourceLimit);
-    assertCheckpointVersion(lastCheckpoint, lastCheckpointTranslated, 
batch.getCheckpointForNextBatch());
     // If overriddenSchemaProvider is passed in CLI, use it
     return overriddenSchemaProvider == null ? batch
         : new InputBatch<>(batch.getBatch(), 
batch.getCheckpointForNextBatch(), overriddenSchemaProvider);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index b687c4d40e7..ad8a69bb8f1 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -47,6 +47,7 @@ import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -650,11 +651,26 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
   }
 
+  private static Stream<Arguments> continuousModeArgs() {
+    return Stream.of(
+        Arguments.of("AVRO", "EIGHT"),
+        Arguments.of("SPARK", "EIGHT"),
+        Arguments.of("AVRO", "SIX")
+    );
+  }
+
+  private static Stream<Arguments> continuousModeMorArgs() {
+    return Stream.of(
+        Arguments.of("AVRO", "EIGHT"),
+        Arguments.of("AVRO", "SIX")
+    );
+  }
+
   @Timeout(600)
   @ParameterizedTest
-  @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
-  public void testUpsertsCOWContinuousMode(HoodieRecordType recordType) throws 
Exception {
-    testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", 
recordType);
+  @MethodSource("continuousModeArgs")
+  public void testUpsertsCOWContinuousMode(HoodieRecordType recordType, String 
writeTableVersion) throws Exception {
+    testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", 
recordType, writeTableVersion);
   }
 
   @Test
@@ -677,14 +693,14 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
   @ParameterizedTest
   @EnumSource(value = HoodieRecordType.class, names = {"AVRO"})
   public void testUpsertsMORContinuousModeShutdownGracefully(HoodieRecordType 
recordType) throws Exception {
-    testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_cow", 
true, recordType);
+    testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_cow", 
true, recordType, "EIGHT");
   }
 
   @Timeout(600)
   @ParameterizedTest
-  @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
-  public void testUpsertsMORContinuousMode(HoodieRecordType recordType) throws 
Exception {
-    testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor", 
recordType);
+  @MethodSource("continuousModeMorArgs")
+  public void testUpsertsMORContinuousMode(HoodieRecordType recordType, String 
writeTableVersion) throws Exception {
+    testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor", 
recordType, writeTableVersion);
   }
 
   @Test
@@ -703,11 +719,12 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
-  private void testUpsertsContinuousMode(HoodieTableType tableType, String 
tempDir, HoodieRecordType recordType) throws Exception {
-    testUpsertsContinuousMode(tableType, tempDir, false, recordType);
+  private void testUpsertsContinuousMode(HoodieTableType tableType, String 
tempDir, HoodieRecordType recordType, String writeTableVersion) throws 
Exception {
+    testUpsertsContinuousMode(tableType, tempDir, false, recordType, 
writeTableVersion);
   }
 
-  private void testUpsertsContinuousMode(HoodieTableType tableType, String 
tempDir, boolean testShutdownGracefully, HoodieRecordType recordType) throws 
Exception {
+  private void testUpsertsContinuousMode(HoodieTableType tableType, String 
tempDir, boolean testShutdownGracefully, HoodieRecordType recordType,
+                                         String writeTableVersion) throws 
Exception {
     String tableBasePath = basePath + "/" + tempDir;
     // Keep it higher than batch-size to test continuous mode
     int totalRecords = 3000;
@@ -721,6 +738,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     cfg.tableType = tableType.name();
     cfg.configs.add(String.format("%s=%d", 
SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords));
     cfg.configs.add(String.format("%s=false", 
HoodieCleanConfig.AUTO_CLEAN.key()));
+    if (HoodieTableVersion.SIX.name().equals(writeTableVersion)) {
+      cfg.configs.add(String.format(("%s=%s"), 
HoodieWriteConfig.WRITE_TABLE_VERSION.key(), 
HoodieTableVersion.SIX.versionCode()));
+    }
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
       if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
@@ -736,6 +756,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
       }
       return true;
     });
+    // validate table version matches
+    HoodieTableMetaClient hudiTblMetaClient = 
HoodieTableMetaClient.builder().setBasePath(cfg.targetBasePath).setConf(context.getStorageConf()).build();
+    assertEquals(HoodieTableVersion.valueOf(writeTableVersion), 
hudiTblMetaClient.getTableConfig().getTableVersion());
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
@@ -1097,7 +1120,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
-  @Test
+  @Disabled("HUDI-8951")
   public void testHoodieIndexerExecutionAfterCommit() throws Exception {
     String tableBasePath = basePath + "/asyncindexer_commit";
     Set<String> customConfigs = new HashSet<>();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieIncrSourceE2E.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieIncrSourceE2E.java
index 830d773da40..409b2d9fe9c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieIncrSourceE2E.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieIncrSourceE2E.java
@@ -34,6 +34,7 @@ import org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource;
 import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource;
 import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSourceHarness;
 
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -45,6 +46,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import static javolution.testing.TestContext.assertTrue;
 import static 
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1;
 import static 
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1;
 import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION;
@@ -65,7 +67,6 @@ import static org.apache.hudi.utilities.streamer.StreamSync.CHECKPOINT_IGNORE_KE
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @ExtendWith(MockitoExtension.class)
 public class TestHoodieIncrSourceE2E extends S3EventsHoodieIncrSourceHarness {
@@ -132,11 +133,12 @@ public class TestHoodieIncrSourceE2E extends S3EventsHoodieIncrSourceHarness {
    *    - Verifies commit metadata still uses checkpoint V1 format
    *    - Verifies final checkpoint="30"
    */
-  @ParameterizedTest
-  @CsvSource({
+  /*@ParameterizedTest
+  /@CsvSource({
       "6, org.apache.hudi.utilities.sources.MockGeneralHoodieIncrSource",
       "8, org.apache.hudi.utilities.sources.MockGeneralHoodieIncrSource"
-  })
+  })*/
+  @Disabled("HUDI-8952")
   public void testSyncE2EWrongCheckpointVersionErrorOut(String tableVersion, 
String sourceClass) throws Exception {
     // First start with no previous checkpoint and ingest till ckp 1 with 
table version.
     // Disable auto upgrade and MDT as we want to keep things as it is.

Reply via email to