This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 6a84e7cca1082175060190adc9e8ab9610dac019
Author: Vinish Reddy <[email protected]>
AuthorDate: Thu Apr 25 09:02:29 2024 +0530

    [HUDI-7235] Fix checkpoint bug for S3/GCS Incremental Source (#10336)
    
    Co-authored-by: Balaji Varadarajan <[email protected]>
    Co-authored-by: Balaji Varadarajan <[email protected]>
---
 .../utilities/sources/GcsEventsHoodieIncrSource.java   |  4 ++--
 .../utilities/sources/S3EventsHoodieIncrSource.java    |  4 ++--
 .../utilities/sources/helpers/IncrSourceHelper.java    | 18 +++++++++++++++---
 .../sources/TestGcsEventsHoodieIncrSource.java         |  2 +-
 .../sources/TestS3EventsHoodieIncrSource.java          |  6 +++---
 .../sources/helpers/TestIncrSourceHelper.java          | 14 ++++++++++++--
 6 files changed, 35 insertions(+), 13 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
index 07950742909..d1d320f99b8 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
@@ -177,8 +177,8 @@ public class GcsEventsHoodieIncrSource extends 
HoodieIncrSource {
         IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
             filteredSourceData, sourceLimit, queryInfo, 
cloudObjectIncrCheckpoint);
     if (!checkPointAndDataset.getRight().isPresent()) {
-      LOG.info("Empty source, returning endpoint:" + 
queryInfo.getEndInstant());
-      return Pair.of(Option.empty(), queryInfo.getEndInstant());
+      LOG.info("Empty source, returning endpoint:" + 
checkPointAndDataset.getLeft());
+      return Pair.of(Option.empty(), 
checkPointAndDataset.getLeft().toString());
     }
     LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft());
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 84b267709ad..51bc2907cc9 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -152,8 +152,8 @@ public class S3EventsHoodieIncrSource extends 
HoodieIncrSource {
         IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
             filteredSourceData, sourceLimit, queryInfo, 
cloudObjectIncrCheckpoint);
     if (!checkPointAndDataset.getRight().isPresent()) {
-      LOG.info("Empty source, returning endpoint:" + 
queryInfo.getEndInstant());
-      return Pair.of(Option.empty(), queryInfo.getEndInstant());
+      LOG.info("Empty source, returning endpoint:" + 
checkPointAndDataset.getLeft());
+      return Pair.of(Option.empty(), 
checkPointAndDataset.getLeft().toString());
     }
     LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft());
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
index 8b40edcf044..e7195acc1a1 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
@@ -183,7 +183,12 @@ public class IncrSourceHelper {
                                                                                
                                     long sourceLimit, QueryInfo queryInfo,
                                                                                
                                     CloudObjectIncrCheckpoint 
cloudObjectIncrCheckpoint) {
     if (sourceData.isEmpty()) {
-      return Pair.of(cloudObjectIncrCheckpoint, Option.empty());
+      // There is no file matching the prefix.
+      CloudObjectIncrCheckpoint updatedCheckpoint =
+              
queryInfo.getEndInstant().equals(cloudObjectIncrCheckpoint.getCommit())
+                      ? cloudObjectIncrCheckpoint
+                      : new 
CloudObjectIncrCheckpoint(queryInfo.getEndInstant(), null);
+      return Pair.of(updatedCheckpoint, Option.empty());
     }
     // Let's persist the dataset to avoid triggering the dag repeatedly
     sourceData.persist(StorageLevel.MEMORY_AND_DISK());
@@ -199,11 +204,18 @@ public class IncrSourceHelper {
           functions.concat(functions.col(queryInfo.getOrderColumn()), 
functions.col(queryInfo.getKeyColumn())));
       // Apply incremental filter
       orderedDf = 
orderedDf.filter(functions.col("commit_key").gt(concatenatedKey.get())).drop("commit_key");
-      // We could be just at the end of the commit, so return empty
+      // If there are no more files where commit_key is greater than 
lastCheckpointCommit#lastCheckpointKey
       if (orderedDf.isEmpty()) {
         LOG.info("Empty ordered source, returning endpoint:" + 
queryInfo.getEndInstant());
         sourceData.unpersist();
-        return Pair.of(new 
CloudObjectIncrCheckpoint(queryInfo.getEndInstant(), lastCheckpointKey.get()), 
Option.empty());
+        // queryInfo.getEndInstant() represents source table's last completed 
instant
+        // If current checkpoint is c1#abc and queryInfo.getEndInstant() is 
c1, return c1#abc.
+        // If current checkpoint is c1#abc and queryInfo.getEndInstant() is 
c2, return c2.
+        CloudObjectIncrCheckpoint updatedCheckpoint =
+            
queryInfo.getEndInstant().equals(cloudObjectIncrCheckpoint.getCommit())
+                ? cloudObjectIncrCheckpoint
+                : new CloudObjectIncrCheckpoint(queryInfo.getEndInstant(), 
null);
+        return Pair.of(updatedCheckpoint, Option.empty());
       }
     }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index 3b018473dc4..f8701e7e666 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -242,7 +242,7 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
   @CsvSource({
       "1,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,1",
       "2,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,2",
-      "3,3#path/to/file5.json,3,1#path/to/file1.json,3"
+      "3,3#path/to/file5.json,3#path/to/file5.json,1#path/to/file1.json,3"
   })
   public void testSplitSnapshotLoad(String snapshotCheckPoint, String 
exptected1, String exptected2, String exptected3, String exptected4) throws 
IOException {
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
index a9dd11c5544..c4f77107ec5 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -358,8 +358,8 @@ public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarne
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 1000L, "2", 
typedProperties);
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 
1000L, "2", typedProperties);
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2#path/to/skip4.json"), 
1000L, "2", typedProperties);
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2#path/to/skip5.json"), 
1000L, "2", typedProperties);
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2#path/to/skip4.json"), 
1000L, "2#path/to/skip4.json", typedProperties);
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2#path/to/skip5.json"), 
1000L, "2#path/to/skip5.json", typedProperties);
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2"), 1000L, "2", 
typedProperties);
   }
 
@@ -434,7 +434,7 @@ public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarne
   @CsvSource({
       "1,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,1",
       "2,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,2",
-      "3,3#path/to/file5.json,3,1#path/to/file1.json,3"
+      "3,3#path/to/file5.json,3#path/to/file5.json,1#path/to/file1.json,3"
   })
   public void testSplitSnapshotLoad(String snapshotCheckPoint, String 
exptected1, String exptected2, String exptected3, String exptected4) throws 
IOException {
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
index e2da57fe216..90fa9ca6b0e 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
@@ -60,7 +60,6 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
-import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.INIT_INSTANT_TS;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -122,7 +121,7 @@ class TestIncrSourceHelper extends 
SparkClientFunctionalTestHarness {
         "s3.object.key", "s3.object.size");
     Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> result = 
IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
         emptyDataset, 50L, queryInfo, new CloudObjectIncrCheckpoint(null, 
null));
-    assertEquals(INIT_INSTANT_TS, result.getKey().toString());
+    assertEquals("commit2", result.getKey().toString());
     assertTrue(!result.getRight().isPresent());
   }
 
@@ -261,8 +260,10 @@ class TestIncrSourceHelper extends 
SparkClientFunctionalTestHarness {
     filePathSizeAndCommitTime.add(Triple.of("path/to/file8.json", 100L, 
"commit3"));
     filePathSizeAndCommitTime.add(Triple.of("path/to/file6.json", 250L, 
"commit3"));
     filePathSizeAndCommitTime.add(Triple.of("path/to/file7.json", 50L, 
"commit3"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file8.json", 50L, 
"commit3"));
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
+    // Test case 1 when queryInfo.endInstant() is equal to lastCheckpointCommit
     QueryInfo queryInfo = new QueryInfo(
         QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1",
         "commit3", "_hoodie_commit_time",
@@ -271,6 +272,15 @@ class TestIncrSourceHelper extends 
SparkClientFunctionalTestHarness {
         inputDs, 1500L, queryInfo, new CloudObjectIncrCheckpoint("commit3", 
"path/to/file8.json"));
     assertEquals("commit3#path/to/file8.json", result.getKey().toString());
     assertTrue(!result.getRight().isPresent());
+    // Test case 2 when queryInfo.endInstant() is greater than 
lastCheckpointCommit
+    queryInfo = new QueryInfo(
+        QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1",
+        "commit4", "_hoodie_commit_time",
+        "s3.object.key", "s3.object.size");
+    result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+        inputDs, 1500L, queryInfo, new 
CloudObjectIncrCheckpoint("commit3","path/to/file8.json"));
+    assertEquals("commit4", result.getKey().toString());
+    assertTrue(!result.getRight().isPresent());
   }
 
   private HoodieRecord generateS3EventMetadata(String commitTime, String 
bucketName, String objectKey, Long objectSize) {

Reply via email to the commits mailing list to comment on this commit.