This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 49c5fcbf49 [HUDI-5176] Fix incremental source to consider inflight 
commits before completed commits (#7160)
49c5fcbf49 is described below

commit 49c5fcbf49f7516b27a4ae2fbd4f9d1354125f7a
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Nov 10 11:45:01 2022 -0800

    [HUDI-5176] Fix incremental source to consider inflight commits before 
completed commits (#7160)
    
    When multiple writers are writing to the same Hudi table under optimistic 
concurrency control, one writer may start later and finish the write 
transaction earlier than the other concurrent writer. In this case, the Hudi 
timeline contains an inflight commit before a completed commit. As a concrete 
example, writer 1 starts a commit at t1 and later writer 2 starts another 
commit at t2 (t2 > t1). Commit t2 finishes earlier than t1.
    
                                           t3        t4
    ---------------------------------------------------------> t
     instant t1 |------------------------------| (writer 1)
     instant t2         |--------------|         (writer 2)
    When Hudi incremental source is used on such a Hudi table, the logic to 
determine the next fetch (IncrSourceHelper.calculateBeginAndEndInstants()) is 
susceptible to data loss, missing the inflight instants. The method filters for 
the completed instants only and uses that to determine the instants to fetch, 
thus the checkpoint. For the same example, when incrementally fetching the data 
at t3, the checkpoint advances to t2 and ignores t1 indefinitely.
    
    To fix the problem, this PR adds the logic to only look at contiguous 
completed commits / deltacommits / non-clustering replacecommits for 
incremental pulls, e.g., completed instants before t1 in the example, so that 
the checkpoint never goes over the inflight commits which can cause data loss. 
To implement this, we figure out the earliest incomplete commit, deltacommit, 
or non-clustering replacecommit to bound the incremental pulls.
    
    This PR fixes the HoodieIncrSource, S3EventsHoodieIncrSource, and 
GcsEventsHoodieIncrSource for the same issue.
---
 .../hudi/common/table/timeline/TimelineUtils.java  |   4 +-
 .../hudi/utilities/sources/HoodieIncrSource.java   |   4 +-
 .../sources/S3EventsHoodieIncrSource.java          |   4 +-
 .../sources/helpers/IncrSourceHelper.java          |  19 +-
 .../utilities/sources/helpers/gcs/QueryInfo.java   |   5 +-
 .../utilities/sources/TestHoodieIncrSource.java    | 233 +++++++++++++++++++--
 6 files changed, 244 insertions(+), 25 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index a2e88b8009..965870d0e9 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -185,7 +185,7 @@ public class TimelineUtils {
     }
   }
   
-  private static boolean isClusteringCommit(HoodieTableMetaClient metaClient, 
HoodieInstant instant) {
+  public static boolean isClusteringCommit(HoodieTableMetaClient metaClient, 
HoodieInstant instant) {
     try {
       if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
         // replacecommit is used for multiple operations: 
insert_overwrite/cluster etc. 
@@ -194,7 +194,7 @@ public class TimelineUtils {
             metaClient.getActiveTimeline().getInstantDetails(instant).get(), 
HoodieReplaceCommitMetadata.class);
         return 
WriteOperationType.CLUSTER.equals(replaceMetadata.getOperationType());
       }
-      
+
       return false;
     } catch (IOException e) {
       throw new HoodieIOException("Unable to read instant information: " + 
instant + " for " + metaClient.getBasePath(), e);
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 3c162a0522..9db85c5ddb 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -148,7 +148,9 @@ public class HoodieIncrSource extends RowSource {
           .load(srcPath)
           // add filtering so that only interested records are returned.
           .filter(String.format("%s > '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-              queryTypeAndInstantEndpts.getRight().getLeft()));
+              queryTypeAndInstantEndpts.getRight().getLeft()))
+          .filter(String.format("%s <= '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+              queryTypeAndInstantEndpts.getRight().getRight()));
     }
 
     /*
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 1813743365..7b8232f619 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -157,7 +157,9 @@ public class S3EventsHoodieIncrSource extends 
HoodieIncrSource {
           .option(DataSourceReadOptions.QUERY_TYPE().key(), 
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL()).load(srcPath)
           // add filtering so that only interested records are returned.
           .filter(String.format("%s > '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-              queryTypeAndInstantEndpts.getRight().getLeft()));
+              queryTypeAndInstantEndpts.getRight().getLeft()))
+          .filter(String.format("%s <= '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+              queryTypeAndInstantEndpts.getRight().getRight()));
     }
 
     if (source.isEmpty()) {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
index b6e17799e6..0a6ae7e85f 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
@@ -23,11 +23,12 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
-
 import org.apache.hudi.utilities.sources.HoodieIncrSource;
+
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Row;
 
@@ -78,8 +79,22 @@ public class IncrSourceHelper {
         "Make sure the config 
hoodie.deltastreamer.source.hoodieincr.num_instants is set to a positive 
value");
     HoodieTableMetaClient srcMetaClient = 
HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build();
 
-    final HoodieTimeline activeCommitTimeline =
+    // Find the earliest incomplete commit, deltacommit, or non-clustering 
replacecommit,
+    // so that the incremental pulls should be strictly before this instant.
+    // This is to guard around multi-writer scenarios where a commit starting 
later than
+    // another commit from a concurrent writer can finish earlier, leaving an 
inflight commit
+    // before a completed commit.
+    final Option<HoodieInstant> firstIncompleteCommit = 
srcMetaClient.getCommitsTimeline()
+        .filterInflightsAndRequested()
+        .filter(instant ->
+            !HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())
+                || !ClusteringUtils.getClusteringPlan(srcMetaClient, 
instant).isPresent())
+        .firstInstant();
+    final HoodieTimeline completedCommitTimeline =
         
srcMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
+    final HoodieTimeline activeCommitTimeline = firstIncompleteCommit.map(
+        commit -> 
completedCommitTimeline.findInstantsBefore(commit.getTimestamp())
+    ).orElse(completedCommitTimeline);
 
     String beginInstantTime = beginInstant.orElseGet(() -> {
       if (missingCheckpointStrategy != null) {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/QueryInfo.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/QueryInfo.java
index 52003f6717..b73a9d5327 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/QueryInfo.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/QueryInfo.java
@@ -19,12 +19,14 @@
 package org.apache.hudi.utilities.sources.helpers.gcs;
 
 import org.apache.hudi.common.model.HoodieRecord;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.sql.DataFrameReader;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+
 import static org.apache.hudi.DataSourceReadOptions.BEGIN_INSTANTTIME;
 import static org.apache.hudi.DataSourceReadOptions.END_INSTANTTIME;
 import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE;
@@ -56,7 +58,8 @@ public class QueryInfo {
 
     // Issue a snapshot query.
     return snapshotQuery(sparkSession).load(srcPath)
-            .filter(String.format("%s > '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD, getStartInstant()));
+        .filter(String.format("%s > '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD, getStartInstant()))
+        .filter(String.format("%s <= '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD, getEndInstant()));
   }
 
   public boolean areStartAndEndInstantsEqual() {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index b08438c5a7..5762bae3cb 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -25,9 +25,13 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieArchivalConfig;
@@ -46,20 +50,23 @@ import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
-import java.util.stream.Stream;
 
 import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
+import static org.apache.hudi.common.model.WriteOperationType.BULK_INSERT;
+import static org.apache.hudi.common.model.WriteOperationType.INSERT;
+import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
 
@@ -83,12 +90,8 @@ public class TestHoodieIncrSource extends 
SparkClientFunctionalTestHarness {
     return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, 
basePath, props);
   }
 
-  private static Stream<Arguments> tableTypeParams() {
-    return Arrays.stream(new HoodieTableType[][] 
{{HoodieTableType.COPY_ON_WRITE}, 
{HoodieTableType.MERGE_ON_READ}}).map(Arguments::of);
-  }
-
   @ParameterizedTest
-  @MethodSource("tableTypeParams")
+  @EnumSource(HoodieTableType.class)
   public void testHoodieIncrSource(HoodieTableType tableType) throws 
IOException {
     this.tableType = tableType;
     metaClient = getHoodieMetaClient(hadoopConf(), basePath());
@@ -101,11 +104,11 @@ public class TestHoodieIncrSource extends 
SparkClientFunctionalTestHarness {
         .build();
 
     SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
-    Pair<String, List<HoodieRecord>> inserts = writeRecords(writeClient, true, 
null, "100");
-    Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient, 
true, null, "200");
-    Pair<String, List<HoodieRecord>> inserts3 = writeRecords(writeClient, 
true, null, "300");
-    Pair<String, List<HoodieRecord>> inserts4 = writeRecords(writeClient, 
true, null, "400");
-    Pair<String, List<HoodieRecord>> inserts5 = writeRecords(writeClient, 
true, null, "500");
+    Pair<String, List<HoodieRecord>> inserts = writeRecords(writeClient, 
INSERT, null, "100");
+    Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient, 
INSERT, null, "200");
+    Pair<String, List<HoodieRecord>> inserts3 = writeRecords(writeClient, 
INSERT, null, "300");
+    Pair<String, List<HoodieRecord>> inserts4 = writeRecords(writeClient, 
INSERT, null, "400");
+    Pair<String, List<HoodieRecord>> inserts5 = writeRecords(writeClient, 
INSERT, null, "500");
 
     // read everything upto latest
     
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
 Option.empty(), 500, inserts5.getKey());
@@ -122,13 +125,200 @@ public class TestHoodieIncrSource extends 
SparkClientFunctionalTestHarness {
     // ensure checkpoint does not move
     readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, 
Option.of(inserts5.getKey()), 0, inserts5.getKey());
 
-    Pair<String, List<HoodieRecord>> inserts6 = writeRecords(writeClient, 
true, null, "600");
+    Pair<String, List<HoodieRecord>> inserts6 = writeRecords(writeClient, 
INSERT, null, "600");
 
     // insert new batch and ensure the checkpoint moves
     readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, 
Option.of(inserts5.getKey()), 100, inserts6.getKey());
     writeClient.close();
   }
 
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void 
testHoodieIncrSourceInflightCommitBeforeCompletedCommit(HoodieTableType 
tableType) throws IOException {
+    this.tableType = tableType;
+    metaClient = getHoodieMetaClient(hadoopConf(), basePath());
+    HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
+        
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(3, 
4).build())
+        
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(2).build())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder()
+                .withInlineCompaction(true)
+                .withMaxNumDeltaCommitsBeforeCompaction(3)
+                .build())
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .build();
+
+    SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
+    List<Pair<String, List<HoodieRecord>>> inserts = new ArrayList<>();
+
+    for (int i = 0; i < 6; i++) {
+      inserts.add(writeRecords(writeClient, INSERT, null, 
HoodieActiveTimeline.createNewInstantTime()));
+    }
+
+    // Emulates a scenario where an inflight commit is before a completed 
commit
+    // The checkpoint should not go past this commit
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    HoodieInstant instant4 = activeTimeline
+        .filter(instant -> 
instant.getTimestamp().equals(inserts.get(4).getKey())).firstInstant().get();
+    Option<byte[]> instant4CommitData = 
activeTimeline.getInstantDetails(instant4);
+    activeTimeline.revertToInflight(instant4);
+    metaClient.reloadActiveTimeline();
+
+    // Reads everything up to latest
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+        Option.empty(),
+        400,
+        inserts.get(3).getKey());
+
+    // Even if the beginning timestamp is archived, full table scan should 
kick in, but should filter for records having commit time > first instant time
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+        Option.of(inserts.get(0).getKey()),
+        300,
+        inserts.get(3).getKey());
+
+    // Even if the read upto latest is set, if begin timestamp is in active 
timeline, only incremental should kick in.
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+        Option.of(inserts.get(2).getKey()),
+        100,
+        inserts.get(3).getKey());
+
+    // Reads just the latest
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
+        Option.empty(),
+        100,
+        inserts.get(3).getKey());
+
+    // Ensures checkpoint does not move
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
+        Option.of(inserts.get(3).getKey()),
+        0,
+        inserts.get(3).getKey());
+
+    activeTimeline.reload().saveAsComplete(
+        new HoodieInstant(HoodieInstant.State.INFLIGHT, instant4.getAction(), 
inserts.get(4).getKey()),
+        instant4CommitData);
+
+    // After the inflight commit completes, the checkpoint should move on 
after incremental pull
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
+        Option.of(inserts.get(3).getKey()),
+        200,
+        inserts.get(5).getKey());
+
+    writeClient.close();
+  }
+
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testHoodieIncrSourceWithPendingTableServices(HoodieTableType 
tableType) throws IOException {
+    this.tableType = tableType;
+    metaClient = getHoodieMetaClient(hadoopConf(), basePath());
+    HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
+        
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(10, 
12).build())
+        
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(9).build())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder()
+                .withScheduleInlineCompaction(true)
+                .withMaxNumDeltaCommitsBeforeCompaction(1)
+                .build())
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .build();
+
+    SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
+    List<Pair<String, List<HoodieRecord>>> dataBatches = new ArrayList<>();
+
+    // For COW:
+    //   0: bulk_insert of 100 records
+    //   1: bulk_insert of 100 records
+    //   2: bulk_insert of 100 records
+    //      schedule clustering
+    //   3: bulk_insert of 100 records
+    //   4: upsert of 100 records (updates only based on round 3)
+    //   5: upsert of 100 records (updates only based on round 3)
+    //   6: bulk_insert of 100 records
+    // For MOR:
+    //   0: bulk_insert of 100 records
+    //   1: bulk_insert of 100 records
+    //   2: bulk_insert of 100 records
+    //   3: bulk_insert of 100 records
+    //   4: upsert of 100 records (updates only based on round 3)
+    //      schedule compaction
+    //   5: upsert of 100 records (updates only based on round 3)
+    //      schedule clustering
+    //   6: bulk_insert of 100 records
+    for (int i = 0; i < 6; i++) {
+      WriteOperationType opType = i < 4 ? BULK_INSERT : UPSERT;
+      List<HoodieRecord> recordsForUpdate = i < 4 ? null : 
dataBatches.get(3).getRight();
+      dataBatches.add(writeRecords(writeClient, opType, recordsForUpdate, 
HoodieActiveTimeline.createNewInstantTime()));
+      if (tableType == COPY_ON_WRITE) {
+        if (i == 2) {
+          writeClient.scheduleClustering(Option.empty());
+        }
+      } else if (tableType == MERGE_ON_READ) {
+        if (i == 4) {
+          writeClient.scheduleCompaction(Option.empty());
+        }
+        if (i == 5) {
+          writeClient.scheduleClustering(Option.empty());
+        }
+      }
+    }
+    dataBatches.add(writeRecords(writeClient, BULK_INSERT, null, 
HoodieActiveTimeline.createNewInstantTime()));
+
+    String latestCommitTimestamp = dataBatches.get(dataBatches.size() - 
1).getKey();
+    // Pending clustering exists
+    Option<HoodieInstant> clusteringInstant =
+        metaClient.getActiveTimeline().filterPendingReplaceTimeline()
+            .filter(instant -> ClusteringUtils.getClusteringPlan(metaClient, 
instant).isPresent())
+            .firstInstant();
+    assertTrue(clusteringInstant.isPresent());
+    
assertTrue(clusteringInstant.get().getTimestamp().compareTo(latestCommitTimestamp)
 < 0);
+
+    if (tableType == MERGE_ON_READ) {
+      // Pending compaction exists
+      Option<HoodieInstant> compactionInstant =
+          
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+      assertTrue(compactionInstant.isPresent());
+      
assertTrue(compactionInstant.get().getTimestamp().compareTo(latestCommitTimestamp)
 < 0);
+    }
+
+    // The pending tables services should not block the incremental pulls
+    // Reads everything up to latest
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+        Option.empty(),
+        500,
+        dataBatches.get(6).getKey());
+
+    // Even if the read upto latest is set, if begin timestamp is in active 
timeline, only incremental should kick in.
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+        Option.of(dataBatches.get(2).getKey()),
+        200,
+        dataBatches.get(6).getKey());
+
+    // Reads just the latest
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
+        Option.empty(),
+        100,
+        dataBatches.get(6).getKey());
+
+    // Ensures checkpoint does not move
+    readAndAssert(
+        IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
+        Option.of(dataBatches.get(6).getKey()),
+        0,
+        dataBatches.get(6).getKey());
+
+    writeClient.close();
+  }
+
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy 
missingCheckpointStrategy, Option<String> checkpointToPull, int expectedCount, 
String expectedCheckpoint) {
 
     Properties properties = new Properties();
@@ -148,10 +338,17 @@ public class TestHoodieIncrSource extends 
SparkClientFunctionalTestHarness {
     Assertions.assertEquals(expectedCheckpoint, batchCheckPoint.getRight());
   }
 
-  private Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient 
writeClient, boolean insert, List<HoodieRecord> insertRecords, String commit) 
throws IOException {
+  private Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient 
writeClient,
+                                                        WriteOperationType 
writeOperationType,
+                                                        List<HoodieRecord> 
insertRecords,
+                                                        String commit) throws 
IOException {
     writeClient.startCommitWithTime(commit);
-    List<HoodieRecord> records = insert ? dataGen.generateInserts(commit, 100) 
: dataGen.generateUpdates(commit, insertRecords);
-    JavaRDD<WriteStatus> result = 
writeClient.upsert(jsc().parallelize(records, 1), commit);
+    // Only supports INSERT, UPSERT, and BULK_INSERT
+    List<HoodieRecord> records = writeOperationType == 
WriteOperationType.UPSERT
+        ? dataGen.generateUpdates(commit, insertRecords) : 
dataGen.generateInserts(commit, 100);
+    JavaRDD<WriteStatus> result = writeOperationType == 
WriteOperationType.BULK_INSERT
+        ? writeClient.bulkInsert(jsc().parallelize(records, 1), commit)
+        : writeClient.upsert(jsc().parallelize(records, 1), commit);
     List<WriteStatus> statuses = result.collect();
     assertNoWriteErrors(statuses);
     return Pair.of(commit, records);

Reply via email to