lokesh-lingarajan-0310 commented on code in PR #9336:
URL: https://github.com/apache/hudi/pull/9336#discussion_r1285354589


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java:
##########
@@ -106,45 +127,108 @@ public static Pair<String, Pair<String, String>> 
calculateBeginAndEndInstants(Ja
       }
     });
 
+    String previousInstantTime = beginInstantTime;
+    if (!beginInstantTime.equals(DEFAULT_BEGIN_TIMESTAMP)) {
+      Option<HoodieInstant> previousInstant = 
activeCommitTimeline.findInstantBefore(beginInstantTime);
+      if (previousInstant.isPresent()) {
+        previousInstantTime = previousInstant.get().getTimestamp();
+      }
+    }
+
     if (missingCheckpointStrategy == MissingCheckpointStrategy.READ_LATEST || 
!activeCommitTimeline.isBeforeTimelineStarts(beginInstantTime)) {
-      Option<HoodieInstant> nthInstant = 
Option.fromJavaOptional(activeCommitTimeline
-          .findInstantsAfter(beginInstantTime, 
numInstantsPerFetch).getInstantsAsStream().reduce((x, y) -> y));
-      return Pair.of(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL(), 
Pair.of(beginInstantTime, 
nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime)));
+      Option<HoodieInstant> nthInstant;
+      // When we are in the upgrade code path from non-sourcelimit-based 
batching to sourcelimit-based batching, we need to avoid fetching the commit
+      // that is read already. Else we will have duplicates in append-only use 
case if we use "findInstantsAfterOrEquals".
+      // As soon as we have a new format of checkpoint and a key we will move 
to the new code of fetching the current commit as well.
+      if (sourceLimitBasedBatching && lastCheckpointKey.isPresent()) {
+        nthInstant = Option.fromJavaOptional(activeCommitTimeline
+            .findInstantsAfterOrEquals(beginInstantTime, 
numInstantsPerFetch).getInstantsAsStream().reduce((x, y) -> y));
+      } else {
+        nthInstant = Option.fromJavaOptional(activeCommitTimeline
+            .findInstantsAfter(beginInstantTime, 
numInstantsPerFetch).getInstantsAsStream().reduce((x, y) -> y));
+      }
+      return new 
QueryInfo(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL(), 
previousInstantTime,
+          beginInstantTime, 
nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime),
+          orderColumn, keyColumn, limitColumn);
     } else {
       // when MissingCheckpointStrategy is set to read everything until 
latest, trigger snapshot query.
       Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
-      return Pair.of(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL(), 
Pair.of(beginInstantTime, timestampForLastInstant.apply(lastInstant.get())));
+      return new QueryInfo(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL(),
+          previousInstantTime, beginInstantTime, 
lastInstant.get().getTimestamp(),
+          orderColumn, keyColumn, limitColumn);
     }
   }
 
   /**
-   * Validate instant time seen in the incoming row.
+   * Adjust the source dataset to size based batch based on last checkpoint 
key.
    *
-   * @param row          Input Row
-   * @param instantTime  Hoodie Instant time of the row
-   * @param sinceInstant begin instant of the batch
-   * @param endInstant   end instant of the batch
+   * @param sourceData  Source dataset
+   * @param sourceLimit Max number of bytes to be read from source
+   * @param queryInfo   Query Info
+   * @return end instants along with filtered rows.
    */
-  public static void validateInstantTime(Row row, String instantTime, String 
sinceInstant, String endInstant) {
-    Objects.requireNonNull(instantTime);
-    
ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, 
HoodieTimeline.GREATER_THAN, sinceInstant),
-        "Instant time(_hoodie_commit_time) in row (" + row + ") was : " + 
instantTime + "but expected to be between "
-            + sinceInstant + "(excl) - " + endInstant + "(incl)");
-    ValidationUtils.checkArgument(
-        HoodieTimeline.compareTimestamps(instantTime, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant),
-        "Instant time(_hoodie_commit_time) in row (" + row + ") was : " + 
instantTime + "but expected to be between "
-            + sinceInstant + "(excl) - " + endInstant + "(incl)");
+  public static Pair<CloudObjectIncrCheckpoint, Dataset<Row>> 
filterAndGenerateCheckpointBasedOnSourceLimit(Dataset<Row> sourceData,
+                                                                               
                             long sourceLimit, QueryInfo queryInfo,
+                                                                               
                             CloudObjectIncrCheckpoint 
cloudObjectIncrCheckpoint) {
+    if (sourceData.isEmpty()) {
+      LOG.info("Empty source, returning endpoint:" + 
queryInfo.getEndInstant());
+      return Pair.of(cloudObjectIncrCheckpoint, sourceData);
+    }
+    // Let's persist the dataset to avoid triggering the dag repeatedly
+    sourceData.persist(StorageLevel.MEMORY_AND_DISK());
+    // Set ordering in query to enable batching
+    Dataset<Row> orderedDf = QueryRunner.applyOrdering(sourceData, 
queryInfo.getOrderByColumns());
+    Option<String> lastCheckpoint = 
Option.of(cloudObjectIncrCheckpoint.getCommit());
+    Option<String> lastCheckpointKey = 
Option.ofNullable(cloudObjectIncrCheckpoint.getKey());
+    Option<String> concatenatedKey = lastCheckpoint.flatMap(checkpoint -> 
lastCheckpointKey.map(key -> checkpoint + key));
+
+    // Filter until last checkpoint key
+    if (concatenatedKey.isPresent()) {
+      orderedDf = orderedDf.withColumn("commit_key",
+          functions.concat(functions.col(queryInfo.getOrderColumn()), 
functions.col(queryInfo.getKeyColumn())));
+      // Apply incremental filter
+      orderedDf = 
orderedDf.filter(functions.col("commit_key").gt(concatenatedKey.get())).drop("commit_key");

Review Comment:
   The concatenated `commit_key` column is only a by-product of the batch 
handling: it exists solely so the incremental filter can be applied, and 
nothing in further processing needs it. That is why it is dropped right after 
the filter. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to