amrishlal commented on code in PR #9336:
URL: https://github.com/apache/hudi/pull/9336#discussion_r1285277377
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java:
##########
@@ -106,45 +127,108 @@ public static Pair<String, Pair<String, String>>
calculateBeginAndEndInstants(Ja
}
});
+ String previousInstantTime = beginInstantTime;
+ if (!beginInstantTime.equals(DEFAULT_BEGIN_TIMESTAMP)) {
+ Option<HoodieInstant> previousInstant =
activeCommitTimeline.findInstantBefore(beginInstantTime);
+ if (previousInstant.isPresent()) {
+ previousInstantTime = previousInstant.get().getTimestamp();
+ }
+ }
+
if (missingCheckpointStrategy == MissingCheckpointStrategy.READ_LATEST ||
!activeCommitTimeline.isBeforeTimelineStarts(beginInstantTime)) {
- Option<HoodieInstant> nthInstant =
Option.fromJavaOptional(activeCommitTimeline
- .findInstantsAfter(beginInstantTime,
numInstantsPerFetch).getInstantsAsStream().reduce((x, y) -> y));
- return Pair.of(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL(),
Pair.of(beginInstantTime,
nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime)));
+ Option<HoodieInstant> nthInstant;
+ // When we are in the upgrade code path from non-sourcelimit-based
batching to sourcelimit-based batching, we need to avoid fetching the commit
+ // that is read already. Else we will have duplicates in append-only use
case if we use "findInstantsAfterOrEquals".
+ // As soon as we have a new format of checkpoint and a key we will move
to the new code of fetching the current commit as well.
+ if (sourceLimitBasedBatching && lastCheckpointKey.isPresent()) {
+ nthInstant = Option.fromJavaOptional(activeCommitTimeline
+ .findInstantsAfterOrEquals(beginInstantTime,
numInstantsPerFetch).getInstantsAsStream().reduce((x, y) -> y));
+ } else {
+ nthInstant = Option.fromJavaOptional(activeCommitTimeline
+ .findInstantsAfter(beginInstantTime,
numInstantsPerFetch).getInstantsAsStream().reduce((x, y) -> y));
+ }
+ return new
QueryInfo(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL(),
previousInstantTime,
+ beginInstantTime,
nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime),
+ orderColumn, keyColumn, limitColumn);
} else {
// when MissingCheckpointStrategy is set to read everything until
latest, trigger snapshot query.
Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
- return Pair.of(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL(),
Pair.of(beginInstantTime, timestampForLastInstant.apply(lastInstant.get())));
+ return new QueryInfo(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL(),
+ previousInstantTime, beginInstantTime,
lastInstant.get().getTimestamp(),
+ orderColumn, keyColumn, limitColumn);
}
}
/**
- * Validate instant time seen in the incoming row.
+ * Adjust the source dataset to size based batch based on last checkpoint
key.
*
- * @param row Input Row
- * @param instantTime Hoodie Instant time of the row
- * @param sinceInstant begin instant of the batch
- * @param endInstant end instant of the batch
+ * @param sourceData Source dataset
+ * @param sourceLimit Max number of bytes to be read from source
+ * @param queryInfo Query Info
+ * @return end instants along with filtered rows.
*/
- public static void validateInstantTime(Row row, String instantTime, String
sinceInstant, String endInstant) {
- Objects.requireNonNull(instantTime);
-
ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime,
HoodieTimeline.GREATER_THAN, sinceInstant),
- "Instant time(_hoodie_commit_time) in row (" + row + ") was : " +
instantTime + "but expected to be between "
- + sinceInstant + "(excl) - " + endInstant + "(incl)");
- ValidationUtils.checkArgument(
- HoodieTimeline.compareTimestamps(instantTime,
HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant),
- "Instant time(_hoodie_commit_time) in row (" + row + ") was : " +
instantTime + "but expected to be between "
- + sinceInstant + "(excl) - " + endInstant + "(incl)");
+ public static Pair<CloudObjectIncrCheckpoint, Dataset<Row>>
filterAndGenerateCheckpointBasedOnSourceLimit(Dataset<Row> sourceData,
+
long sourceLimit, QueryInfo queryInfo,
+
CloudObjectIncrCheckpoint
cloudObjectIncrCheckpoint) {
+ if (sourceData.isEmpty()) {
+ LOG.info("Empty source, returning endpoint:" +
queryInfo.getEndInstant());
+ return Pair.of(cloudObjectIncrCheckpoint, sourceData);
+ }
+ // Let's persist the dataset to avoid triggering the dag repeatedly
+ sourceData.persist(StorageLevel.MEMORY_AND_DISK());
+ // Set ordering in query to enable batching
+ Dataset<Row> orderedDf = QueryRunner.applyOrdering(sourceData,
queryInfo.getOrderByColumns());
+ Option<String> lastCheckpoint =
Option.of(cloudObjectIncrCheckpoint.getCommit());
+ Option<String> lastCheckpointKey =
Option.ofNullable(cloudObjectIncrCheckpoint.getKey());
+ Option<String> concatenatedKey = lastCheckpoint.flatMap(checkpoint ->
lastCheckpointKey.map(key -> checkpoint + key));
+
+ // Filter until last checkpoint key
+ if (concatenatedKey.isPresent()) {
+ orderedDf = orderedDf.withColumn("commit_key",
+ functions.concat(functions.col(queryInfo.getOrderColumn()),
functions.col(queryInfo.getKeyColumn())));
+ // Apply incremental filter
+ orderedDf =
orderedDf.filter(functions.col("commit_key").gt(concatenatedKey.get())).drop("commit_key");
Review Comment:
Creating and dropping the temporary `commit_key` column appears to be redundant:
`orderedDf = orderedDf.filter(
functions.concat(functions.col(queryInfo.getOrderColumn()),
functions.col(queryInfo.getKeyColumn()))
.gt(concatenatedKey.get())
);`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]