This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 6a84e7cca1082175060190adc9e8ab9610dac019 Author: Vinish Reddy <[email protected]> AuthorDate: Thu Apr 25 09:02:29 2024 +0530 [HUDI-7235] Fix checkpoint bug for S3/GCS Incremental Source (#10336) Co-authored-by: Balaji Varadarajan <[email protected]> Co-authored-by: Balaji Varadarajan <[email protected]> --- .../utilities/sources/GcsEventsHoodieIncrSource.java | 4 ++-- .../utilities/sources/S3EventsHoodieIncrSource.java | 4 ++-- .../utilities/sources/helpers/IncrSourceHelper.java | 18 +++++++++++++++--- .../sources/TestGcsEventsHoodieIncrSource.java | 2 +- .../sources/TestS3EventsHoodieIncrSource.java | 6 +++--- .../sources/helpers/TestIncrSourceHelper.java | 14 ++++++++++++-- 6 files changed, 35 insertions(+), 13 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java index 07950742909..d1d320f99b8 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java @@ -177,8 +177,8 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource { IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( filteredSourceData, sourceLimit, queryInfo, cloudObjectIncrCheckpoint); if (!checkPointAndDataset.getRight().isPresent()) { - LOG.info("Empty source, returning endpoint:" + queryInfo.getEndInstant()); - return Pair.of(Option.empty(), queryInfo.getEndInstant()); + LOG.info("Empty source, returning endpoint:" + checkPointAndDataset.getLeft()); + return Pair.of(Option.empty(), checkPointAndDataset.getLeft().toString()); } LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft()); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java index 84b267709ad..51bc2907cc9 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java @@ -152,8 +152,8 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource { IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( filteredSourceData, sourceLimit, queryInfo, cloudObjectIncrCheckpoint); if (!checkPointAndDataset.getRight().isPresent()) { - LOG.info("Empty source, returning endpoint:" + queryInfo.getEndInstant()); - return Pair.of(Option.empty(), queryInfo.getEndInstant()); + LOG.info("Empty source, returning endpoint:" + checkPointAndDataset.getLeft()); + return Pair.of(Option.empty(), checkPointAndDataset.getLeft().toString()); } LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft()); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java index 8b40edcf044..e7195acc1a1 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java @@ -183,7 +183,12 @@ public class IncrSourceHelper { long sourceLimit, QueryInfo queryInfo, CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint) { if (sourceData.isEmpty()) { - return Pair.of(cloudObjectIncrCheckpoint, Option.empty()); + // There is no file matching the prefix. + CloudObjectIncrCheckpoint updatedCheckpoint = + queryInfo.getEndInstant().equals(cloudObjectIncrCheckpoint.getCommit()) + ? cloudObjectIncrCheckpoint + : new CloudObjectIncrCheckpoint(queryInfo.getEndInstant(), null); + return Pair.of(updatedCheckpoint, Option.empty()); } // Let's persist the dataset to avoid triggering the dag repeatedly sourceData.persist(StorageLevel.MEMORY_AND_DISK()); @@ -199,11 +204,18 @@ public class IncrSourceHelper { functions.concat(functions.col(queryInfo.getOrderColumn()), functions.col(queryInfo.getKeyColumn()))); // Apply incremental filter orderedDf = orderedDf.filter(functions.col("commit_key").gt(concatenatedKey.get())).drop("commit_key"); - // We could be just at the end of the commit, so return empty + // If there are no more files where commit_key is greater than lastCheckpointCommit#lastCheckpointKey if (orderedDf.isEmpty()) { LOG.info("Empty ordered source, returning endpoint:" + queryInfo.getEndInstant()); sourceData.unpersist(); - return Pair.of(new CloudObjectIncrCheckpoint(queryInfo.getEndInstant(), lastCheckpointKey.get()), Option.empty()); + // queryInfo.getEndInstant() represents source table's last completed instant + // If current checkpoint is c1#abc and queryInfo.getEndInstant() is c1, return c1#abc. + // If current checkpoint is c1#abc and queryInfo.getEndInstant() is c2, return c2. + CloudObjectIncrCheckpoint updatedCheckpoint = + queryInfo.getEndInstant().equals(cloudObjectIncrCheckpoint.getCommit()) + ? cloudObjectIncrCheckpoint + : new CloudObjectIncrCheckpoint(queryInfo.getEndInstant(), null); + return Pair.of(updatedCheckpoint, Option.empty()); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java index 3b018473dc4..f8701e7e666 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java @@ -242,7 +242,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn @CsvSource({ "1,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,1", "2,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,2", - "3,3#path/to/file5.json,3,1#path/to/file1.json,3" + "3,3#path/to/file5.json,3#path/to/file5.json,1#path/to/file1.json,3" }) public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, String exptected2, String exptected3, String exptected4) throws IOException { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java index a9dd11c5544..c4f77107ec5 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java @@ -358,8 +358,8 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 1000L, "2", typedProperties); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 1000L, "2", typedProperties); - readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2#path/to/skip4.json"), 1000L, "2", typedProperties); - readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2#path/to/skip5.json"), 1000L, "2", typedProperties); + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2#path/to/skip4.json"), 1000L, "2#path/to/skip4.json", typedProperties); + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2#path/to/skip5.json"), 1000L, "2#path/to/skip5.json", typedProperties); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2"), 1000L, "2", typedProperties); } @@ -434,7 +434,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne @CsvSource({ "1,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,1", "2,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,2", - "3,3#path/to/file5.json,3,1#path/to/file1.json,3" + "3,3#path/to/file5.json,3#path/to/file5.json,1#path/to/file1.json,3" }) public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, String exptected2, String exptected3, String exptected4) throws IOException { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java index e2da57fe216..90fa9ca6b0e 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java @@ -60,7 +60,6 @@ import java.util.Map; import java.util.stream.Collectors; import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL; -import static org.apache.hudi.common.table.timeline.HoodieTimeline.INIT_INSTANT_TS; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -122,7 +121,7 @@ class TestIncrSourceHelper extends SparkClientFunctionalTestHarness { "s3.object.key", "s3.object.size"); Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( emptyDataset, 50L, queryInfo, new CloudObjectIncrCheckpoint(null, null)); - assertEquals(INIT_INSTANT_TS, result.getKey().toString()); + assertEquals("commit2", result.getKey().toString()); assertTrue(!result.getRight().isPresent()); } @@ -261,8 +260,10 @@ class TestIncrSourceHelper extends SparkClientFunctionalTestHarness { filePathSizeAndCommitTime.add(Triple.of("path/to/file8.json", 100L, "commit3")); filePathSizeAndCommitTime.add(Triple.of("path/to/file6.json", 250L, "commit3")); filePathSizeAndCommitTime.add(Triple.of("path/to/file7.json", 50L, "commit3")); + filePathSizeAndCommitTime.add(Triple.of("path/to/file8.json", 50L, "commit3")); Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime); + // Test case 1 when queryInfo.endInstant() is equal to lastCheckpointCommit QueryInfo queryInfo = new QueryInfo( QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1", "commit3", "_hoodie_commit_time", @@ -271,6 +272,15 @@ class TestIncrSourceHelper extends SparkClientFunctionalTestHarness { inputDs, 1500L, queryInfo, new CloudObjectIncrCheckpoint("commit3", "path/to/file8.json")); assertEquals("commit3#path/to/file8.json", result.getKey().toString()); assertTrue(!result.getRight().isPresent()); + // Test case 2 when queryInfo.endInstant() is greater than lastCheckpointCommit + queryInfo = new QueryInfo( + QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1", + "commit4", "_hoodie_commit_time", + "s3.object.key", "s3.object.size"); + result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( + inputDs, 1500L, queryInfo, new CloudObjectIncrCheckpoint("commit3","path/to/file8.json")); + assertEquals("commit4", result.getKey().toString()); + assertTrue(!result.getRight().isPresent()); } private HoodieRecord generateS3EventMetadata(String commitTime, String bucketName, String objectKey, Long objectSize) {
