This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c6639fe2d46 [HUDI-6629] - Changes for s3/gcs IncrSource job to take
sourceLimit into account during ingestion (#9336)
c6639fe2d46 is described below
commit c6639fe2d46e7d41c1f2fad3ab3cc7847e92ddc9
Author: lokesh-lingarajan-0310
<[email protected]>
AuthorDate: Mon Aug 7 18:41:37 2023 -0700
[HUDI-6629] - Changes for s3/gcs IncrSource job to take sourceLimit into
account during ingestion (#9336)
- Change s3 and gcs incremental job to batch within a commit based on the
source limit
- Refactor s3 incr job to lend itself to more testing
- Added test cases for both s3 and gcs incr jobs
- Checkpoint format => "commitTime#Key", sorted order of these columns will
help resume ingestion
- Added a few timeline apis to support fetching commit data from current
commit
---------
Co-authored-by: Lokesh Lingarajan
<[email protected]>
Co-authored-by: Sagar Sumit <[email protected]>
---
.../table/timeline/HoodieDefaultTimeline.java | 14 +
.../hudi/common/table/timeline/HoodieTimeline.java | 18 ++
.../table/timeline/TestHoodieActiveTimeline.java | 4 +
.../sources/GcsEventsHoodieIncrSource.java | 81 +++---
.../hudi/utilities/sources/HoodieIncrSource.java | 31 +-
.../sources/S3EventsHoodieIncrSource.java | 161 ++++++-----
...bjectDataFetcher.java => CloudDataFetcher.java} | 17 +-
.../sources/helpers/CloudObjectIncrCheckpoint.java | 70 +++++
.../sources/helpers/IncrSourceHelper.java | 144 ++++++++--
.../hudi/utilities/sources/helpers/QueryInfo.java | 112 ++++++++
.../utilities/sources/helpers/QueryRunner.java | 93 ++++++
.../utilities/sources/helpers/gcs/QueryInfo.java | 101 -------
.../sources/TestGcsEventsHoodieIncrSource.java | 155 +++++++++-
.../sources/TestS3EventsHoodieIncrSource.java | 320 +++++++++++++++++++++
.../sources/helpers/TestIncrSourceHelper.java | 249 ++++++++++++++++
15 files changed, 1292 insertions(+), 278 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index a5d56c91d5e..e504e401739 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -28,6 +28,7 @@ import java.io.Serializable;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
@@ -203,6 +204,12 @@ public class HoodieDefaultTimeline implements
HoodieTimeline {
getInstantsAsStream().filter(s ->
HoodieTimeline.isInRange(s.getTimestamp(), startTs, endTs)), details);
}
+ @Override
+ public HoodieDefaultTimeline findInstantsInClosedRange(String startTs,
String endTs) {
+ return new HoodieDefaultTimeline(
+ instants.stream().filter(instant ->
HoodieTimeline.isInClosedRange(instant.getTimestamp(), startTs, endTs)),
details);
+ }
+
@Override
public HoodieDefaultTimeline findInstantsInRangeByStateTransitionTime(String
startTs, String endTs) {
return new HoodieDefaultTimeline(
@@ -244,6 +251,13 @@ public class HoodieDefaultTimeline implements
HoodieTimeline {
details);
}
+ @Override
+ public Option<HoodieInstant> findInstantBefore(String instantTime) {
+ return Option.fromJavaOptional(instants.stream()
+ .filter(instant -> compareTimestamps(instant.getTimestamp(),
LESSER_THAN, instantTime))
+ .max(Comparator.comparing(HoodieInstant::getTimestamp)));
+ }
+
@Override
public HoodieDefaultTimeline findInstantsBeforeOrEquals(String instantTime) {
return new HoodieDefaultTimeline(getInstantsAsStream()
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index 291dc5c1e59..a1e70c2e22e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -241,6 +241,11 @@ public interface HoodieTimeline extends Serializable {
*/
HoodieTimeline findInstantsInRange(String startTs, String endTs);
+ /**
+ * Create a new Timeline with instants on or after startTs and on or before
endTs.
+ */
+ HoodieTimeline findInstantsInClosedRange(String startTs, String endTs);
+
/**
* Create a new Timeline with instants after startTs and before or on endTs
* by state transition timestamp of actions.
@@ -267,6 +272,11 @@ public interface HoodieTimeline extends Serializable {
*/
HoodieTimeline findInstantsBefore(String instantTime);
+ /**
+ * Finds the instant before the specified time.
+ */
+ Option<HoodieInstant> findInstantBefore(String instantTime);
+
/**
* Create new timeline with all instants before or equals specified time.
*/
@@ -413,6 +423,14 @@ public interface HoodieTimeline extends Serializable {
&& HoodieTimeline.compareTimestamps(timestamp,
LESSER_THAN_OR_EQUALS, endTs);
}
+ /**
+ * Return true if the specified timestamp is in range [startTs, endTs].
+ */
+ static boolean isInClosedRange(String timestamp, String startTs, String
endTs) {
+ return HoodieTimeline.compareTimestamps(timestamp, GREATER_THAN_OR_EQUALS,
startTs)
+ && HoodieTimeline.compareTimestamps(timestamp, LESSER_THAN_OR_EQUALS,
endTs);
+ }
+
static HoodieInstant getCompletedInstant(final HoodieInstant instant) {
return new HoodieInstant(State.COMPLETED, instant.getAction(),
instant.getTimestamp());
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index 99525883dc3..06afc6fd5d3 100755
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -171,6 +171,10 @@ public class TestHoodieActiveTimeline extends
HoodieCommonTestHarness {
timeline.getCommitTimeline().filterCompletedInstants().findInstantsInRange("04",
"11")
.getInstantsAsStream().map(HoodieInstant::getTimestamp),
"findInstantsInRange should return 4 instants");
+ assertStreamEquals(Stream.of("03", "05", "07", "09", "11"),
+
timeline.getCommitTimeline().filterCompletedInstants().findInstantsInClosedRange("03",
"11")
+ .getInstantsAsStream().map(HoodieInstant::getTimestamp),
+ "findInstantsInClosedRange should return 5 instants");
assertStreamEquals(Stream.of("09", "11"),
timeline.getCommitTimeline().filterCompletedInstants().findInstantsAfter("07",
2)
.getInstantsAsStream().map(HoodieInstant::getTimestamp),
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
index 96d0464509e..d47f8420513 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
@@ -20,15 +20,19 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
import
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
+import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
-import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;
-import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
+import org.apache.hudi.utilities.sources.helpers.QueryInfo;
+import org.apache.hudi.utilities.sources.helpers.QueryRunner;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
@@ -46,7 +50,7 @@ import static
org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_C
import static
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH;
import static
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH;
import static
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.SOURCE_FILE_FORMAT;
-import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.generateQueryInfo;
import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode;
import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMissingCheckpointStrategy;
@@ -104,7 +108,11 @@ public class GcsEventsHoodieIncrSource extends
HoodieIncrSource {
private final MissingCheckpointStrategy missingCheckpointStrategy;
private final GcsObjectMetadataFetcher gcsObjectMetadataFetcher;
- private final GcsObjectDataFetcher gcsObjectDataFetcher;
+ private final CloudDataFetcher gcsObjectDataFetcher;
+ private final QueryRunner queryRunner;
+
+ public static final String GCS_OBJECT_KEY = "name";
+ public static final String GCS_OBJECT_SIZE = "size";
private static final Logger LOG =
LoggerFactory.getLogger(GcsEventsHoodieIncrSource.class);
@@ -113,12 +121,13 @@ public class GcsEventsHoodieIncrSource extends
HoodieIncrSource {
this(props, jsc, spark, schemaProvider,
new GcsObjectMetadataFetcher(props, getSourceFileFormat(props)),
- new GcsObjectDataFetcher(props, props.getString(DATAFILE_FORMAT.key(),
DATAFILE_FORMAT.defaultValue()))
+ new CloudDataFetcher(props, props.getString(DATAFILE_FORMAT.key(),
DATAFILE_FORMAT.defaultValue())),
+ new QueryRunner(spark, props)
);
}
GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc,
SparkSession spark,
- SchemaProvider schemaProvider,
GcsObjectMetadataFetcher gcsObjectMetadataFetcher, GcsObjectDataFetcher
gcsObjectDataFetcher) {
+ SchemaProvider schemaProvider,
GcsObjectMetadataFetcher gcsObjectMetadataFetcher, CloudDataFetcher
gcsObjectDataFetcher, QueryRunner queryRunner) {
super(props, jsc, spark, schemaProvider);
DataSourceUtils.checkRequiredProperties(props,
Collections.singletonList(HOODIE_SRC_BASE_PATH.key()));
@@ -129,6 +138,7 @@ public class GcsEventsHoodieIncrSource extends
HoodieIncrSource {
this.gcsObjectMetadataFetcher = gcsObjectMetadataFetcher;
this.gcsObjectDataFetcher = gcsObjectDataFetcher;
+ this.queryRunner = queryRunner;
LOG.info("srcPath: " + srcPath);
LOG.info("missingCheckpointStrategy: " + missingCheckpointStrategy);
@@ -137,59 +147,48 @@ public class GcsEventsHoodieIncrSource extends
HoodieIncrSource {
}
@Override
- public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String>
lastCkptStr, long sourceLimit) {
- QueryInfo queryInfo = getQueryInfo(lastCkptStr);
+ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String>
lastCheckpoint, long sourceLimit) {
+ CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint =
CloudObjectIncrCheckpoint.fromString(lastCheckpoint);
+ HollowCommitHandling handlingMode = getHollowCommitHandleMode(props);
+
+ QueryInfo queryInfo = generateQueryInfo(
+ sparkContext, srcPath, numInstantsPerFetch,
+ Option.of(cloudObjectIncrCheckpoint.getCommit()),
+ missingCheckpointStrategy, handlingMode,
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+ GCS_OBJECT_KEY, GCS_OBJECT_SIZE, true,
+ Option.ofNullable(cloudObjectIncrCheckpoint.getKey()));
+ LOG.info("Querying GCS with:" + cloudObjectIncrCheckpoint + " and
queryInfo:" + queryInfo);
- if (queryInfo.areStartAndEndInstantsEqual()) {
- LOG.info("Already caught up. Begin Checkpoint was: " +
queryInfo.getStartInstant());
+ if (isNullOrEmpty(cloudObjectIncrCheckpoint.getKey()) &&
queryInfo.areStartAndEndInstantsEqual()) {
+ LOG.info("Source of file names is empty. Returning empty result and
endInstant: "
+ + queryInfo.getStartInstant());
return Pair.of(Option.empty(), queryInfo.getStartInstant());
}
- Dataset<Row> cloudObjectMetadataDF =
queryInfo.initCloudObjectMetadata(srcPath, sparkSession);
-
+ Dataset<Row> cloudObjectMetadataDF = queryRunner.run(queryInfo);
if (cloudObjectMetadataDF.isEmpty()) {
LOG.info("Source of file names is empty. Returning empty result and
endInstant: "
+ queryInfo.getEndInstant());
return Pair.of(Option.empty(), queryInfo.getEndInstant());
}
- return extractData(queryInfo, cloudObjectMetadataDF);
+ LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based
on sourceLimit :" + sourceLimit);
+ Pair<CloudObjectIncrCheckpoint, Dataset<Row>> checkPointAndDataset =
+ IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+ cloudObjectMetadataDF, sourceLimit, queryInfo,
cloudObjectIncrCheckpoint);
+ LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft());
+
+ Pair<Option<Dataset<Row>>, String> extractedCheckPointAndDataset =
extractData(queryInfo, checkPointAndDataset.getRight());
+ return Pair.of(extractedCheckPointAndDataset.getLeft(),
checkPointAndDataset.getLeft().toString());
}
private Pair<Option<Dataset<Row>>, String> extractData(QueryInfo queryInfo,
Dataset<Row> cloudObjectMetadataDF) {
List<CloudObjectMetadata> cloudObjectMetadata =
gcsObjectMetadataFetcher.getGcsObjectMetadata(sparkContext,
cloudObjectMetadataDF, checkIfFileExists);
+ LOG.info("Total number of files to process :" +
cloudObjectMetadata.size());
Option<Dataset<Row>> fileDataRows =
gcsObjectDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata,
props);
return Pair.of(fileDataRows, queryInfo.getEndInstant());
}
- private QueryInfo getQueryInfo(Option<String> lastCkptStr) {
- Option<String> beginInstant = getBeginInstant(lastCkptStr);
-
- HollowCommitHandling handlingMode = getHollowCommitHandleMode(props);
- Pair<String, Pair<String, String>> queryInfoPair =
calculateBeginAndEndInstants(
- sparkContext, srcPath, numInstantsPerFetch, beginInstant,
missingCheckpointStrategy, handlingMode);
-
- QueryInfo queryInfo = new QueryInfo(
- queryInfoPair.getLeft(),
- queryInfoPair.getRight().getLeft(),
- queryInfoPair.getRight().getRight(),
- handlingMode);
-
- if (LOG.isDebugEnabled()) {
- queryInfo.logDetails();
- }
-
- return queryInfo;
- }
-
- private Option<String> getBeginInstant(Option<String> lastCheckpoint) {
- if (lastCheckpoint.isPresent() && !isNullOrEmpty(lastCheckpoint.get())) {
- return lastCheckpoint;
- }
-
- return Option.empty();
- }
-
private static String getSourceFileFormat(TypedProperties props) {
return props.getString(SOURCE_FILE_FORMAT.key(),
SOURCE_FILE_FORMAT.defaultValue());
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 7791273ae22..4836aeb34cc 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
+import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
@@ -45,7 +46,7 @@ import static
org.apache.hudi.DataSourceReadOptions.INCREMENTAL_READ_HANDLE_HOLL
import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE;
import static
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
-import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.generateQueryInfo;
import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode;
public class HoodieIncrSource extends RowSource {
@@ -58,7 +59,7 @@ public class HoodieIncrSource extends RowSource {
* {@link #HOODIE_SRC_BASE_PATH} is the base-path for the source Hoodie
table.
*/
@Deprecated
- static final String HOODIE_SRC_BASE_PATH =
HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH.key();
+ public static final String HOODIE_SRC_BASE_PATH =
HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH.key();
/**
* {@link #NUM_INSTANTS_PER_FETCH} allows the max number of instants whose
changes can be incrementally fetched.
@@ -154,21 +155,23 @@ public class HoodieIncrSource extends RowSource {
lastCkptStr.isPresent() ? lastCkptStr.get().isEmpty() ? Option.empty()
: lastCkptStr : Option.empty();
HollowCommitHandling handlingMode = getHollowCommitHandleMode(props);
- Pair<String, Pair<String, String>> queryTypeAndInstantEndpts =
calculateBeginAndEndInstants(sparkContext, srcPath,
- numInstantsPerFetch, beginInstant, missingCheckpointStrategy,
handlingMode);
-
- if
(queryTypeAndInstantEndpts.getValue().getKey().equals(queryTypeAndInstantEndpts.getValue().getValue()))
{
- LOG.warn("Already caught up. Begin Checkpoint was :" +
queryTypeAndInstantEndpts.getValue().getKey());
- return Pair.of(Option.empty(),
queryTypeAndInstantEndpts.getValue().getKey());
+ QueryInfo queryInfo = generateQueryInfo(sparkContext, srcPath,
+ numInstantsPerFetch, beginInstant, missingCheckpointStrategy,
handlingMode,
+ HoodieRecord.COMMIT_TIME_METADATA_FIELD,
HoodieRecord.RECORD_KEY_METADATA_FIELD,
+ null, false, Option.empty());
+
+ if (queryInfo.areStartAndEndInstantsEqual()) {
+ LOG.info("Already caught up. No new data to process");
+ return Pair.of(Option.empty(), queryInfo.getEndInstant());
}
Dataset<Row> source;
// Do Incr pull. Set end instant if available
- if
(queryTypeAndInstantEndpts.getKey().equals(QUERY_TYPE_INCREMENTAL_OPT_VAL())) {
+ if (queryInfo.isIncremental()) {
source = sparkSession.read().format("org.apache.hudi")
.option(QUERY_TYPE().key(), QUERY_TYPE_INCREMENTAL_OPT_VAL())
- .option(BEGIN_INSTANTTIME().key(),
queryTypeAndInstantEndpts.getValue().getLeft())
- .option(END_INSTANTTIME().key(),
queryTypeAndInstantEndpts.getValue().getRight())
+ .option(BEGIN_INSTANTTIME().key(), queryInfo.getStartInstant())
+ .option(END_INSTANTTIME().key(), queryInfo.getEndInstant())
.option(INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
props.getString(INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().defaultValue()))
@@ -181,9 +184,9 @@ public class HoodieIncrSource extends RowSource {
.load(srcPath)
// add filtering so that only interested records are returned.
.filter(String.format("%s > '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
- queryTypeAndInstantEndpts.getRight().getLeft()))
+ queryInfo.getStartInstant()))
.filter(String.format("%s <= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
- queryTypeAndInstantEndpts.getRight().getRight()));
+ queryInfo.getEndInstant()));
}
HoodieRecord.HoodieRecordType recordType =
createRecordMerger(props).getRecordType();
@@ -199,6 +202,6 @@ public class HoodieIncrSource extends RowSource {
String[] colsToDrop = shouldDropMetaFields ?
HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new) :
HoodieRecord.HOODIE_META_COLUMNS.stream().filter(x ->
!x.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toArray(String[]::new);
final Dataset<Row> src = source.drop(colsToDrop);
- return Pair.of(Option.of(src),
queryTypeAndInstantEndpts.getRight().getRight());
+ return Pair.of(Option.of(src), queryInfo.getEndInstant());
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 99d8ae36951..40abe47189d 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -26,11 +26,14 @@ import
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
+import org.apache.hudi.utilities.sources.helpers.QueryInfo;
+import org.apache.hudi.utilities.sources.helpers.QueryRunner;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
@@ -43,20 +46,15 @@ import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
-import static org.apache.hudi.DataSourceReadOptions.BEGIN_INSTANTTIME;
-import static org.apache.hudi.DataSourceReadOptions.END_INSTANTTIME;
-import static
org.apache.hudi.DataSourceReadOptions.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT;
-import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE;
-import static
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
-import static
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
import static
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH;
import static
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH;
-import static
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.READ_LATEST_INSTANT_ON_MISSING_CKPT;
import static
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.SOURCE_FILE_FORMAT;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
import static
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.getCloudObjectMetadataPerPartition;
-import static
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.loadAsDataset;
-import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static
org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.DATAFILE_FORMAT;
import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode;
+import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMissingCheckpointStrategy;
/**
* This source will use the S3 events meta information from hoodie table
generate by {@link S3EventsSource}.
@@ -64,6 +62,13 @@ import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHoll
public class S3EventsHoodieIncrSource extends HoodieIncrSource {
private static final Logger LOG =
LoggerFactory.getLogger(S3EventsHoodieIncrSource.class);
+ private final String srcPath;
+ private final int numInstantsPerFetch;
+ private final boolean checkIfFileExists;
+ private final String fileFormat;
+ private final IncrSourceHelper.MissingCheckpointStrategy
missingCheckpointStrategy;
+ private final QueryRunner queryRunner;
+ private final CloudDataFetcher cloudDataFetcher;
public static class Config {
// control whether we do existence check for files before consuming them
@@ -93,94 +98,100 @@ public class S3EventsHoodieIncrSource extends
HoodieIncrSource {
public static final String SPARK_DATASOURCE_OPTIONS =
S3EventsHoodieIncrSourceConfig.SPARK_DATASOURCE_OPTIONS.key();
}
+ public static final String S3_OBJECT_KEY = "s3.object.key";
+ public static final String S3_OBJECT_SIZE = "s3.object.size";
+ public static final String S3_BUCKET_NAME = "s3.bucket.name";
+
public S3EventsHoodieIncrSource(
TypedProperties props,
JavaSparkContext sparkContext,
SparkSession sparkSession,
SchemaProvider schemaProvider) {
- super(props, sparkContext, sparkSession, schemaProvider);
+ this(props, sparkContext, sparkSession, schemaProvider, new
QueryRunner(sparkSession, props),
+ new CloudDataFetcher(props, props.getString(DATAFILE_FORMAT,
DEFAULT_SOURCE_FILE_FORMAT)));
}
- @Override
- public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String>
lastCkptStr, long sourceLimit) {
+ public S3EventsHoodieIncrSource(
+ TypedProperties props,
+ JavaSparkContext sparkContext,
+ SparkSession sparkSession,
+ SchemaProvider schemaProvider,
+ QueryRunner queryRunner,
+ CloudDataFetcher cloudDataFetcher) {
+ super(props, sparkContext, sparkSession, schemaProvider);
DataSourceUtils.checkRequiredProperties(props,
Collections.singletonList(HOODIE_SRC_BASE_PATH.key()));
- String srcPath = props.getString(HOODIE_SRC_BASE_PATH.key());
- int numInstantsPerFetch = props.getInteger(NUM_INSTANTS_PER_FETCH.key(),
NUM_INSTANTS_PER_FETCH.defaultValue());
- boolean readLatestOnMissingCkpt = props.getBoolean(
- READ_LATEST_INSTANT_ON_MISSING_CKPT.key(),
READ_LATEST_INSTANT_ON_MISSING_CKPT.defaultValue());
- IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy =
(props.containsKey(HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY.key()))
- ?
IncrSourceHelper.MissingCheckpointStrategy.valueOf(props.getString(HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY.key()))
: null;
- if (readLatestOnMissingCkpt) {
- missingCheckpointStrategy =
IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST;
- }
- String fileFormat = props.getString(SOURCE_FILE_FORMAT.key(),
SOURCE_FILE_FORMAT.defaultValue());
-
- // Use begin Instant if set and non-empty
- Option<String> beginInstant =
- lastCkptStr.isPresent()
- ? lastCkptStr.get().isEmpty() ? Option.empty() : lastCkptStr
- : Option.empty();
+ this.srcPath = props.getString(HOODIE_SRC_BASE_PATH.key());
+ this.numInstantsPerFetch = props.getInteger(NUM_INSTANTS_PER_FETCH.key(),
NUM_INSTANTS_PER_FETCH.defaultValue());
+ this.checkIfFileExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK,
Config.DEFAULT_ENABLE_EXISTS_CHECK);
+ this.fileFormat = props.getString(SOURCE_FILE_FORMAT.key(),
SOURCE_FILE_FORMAT.defaultValue());
+ this.missingCheckpointStrategy = getMissingCheckpointStrategy(props);
+ this.queryRunner = queryRunner;
+ this.cloudDataFetcher = cloudDataFetcher;
+ }
+ @Override
+ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String>
lastCheckpoint, long sourceLimit) {
+ CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint =
CloudObjectIncrCheckpoint.fromString(lastCheckpoint);
HollowCommitHandling handlingMode = getHollowCommitHandleMode(props);
- Pair<String, Pair<String, String>> queryTypeAndInstantEndpts =
calculateBeginAndEndInstants(sparkContext, srcPath,
- numInstantsPerFetch, beginInstant, missingCheckpointStrategy,
handlingMode);
-
- if
(queryTypeAndInstantEndpts.getValue().getKey().equals(queryTypeAndInstantEndpts.getValue().getValue()))
{
- LOG.warn("Already caught up. Begin Checkpoint was :" +
queryTypeAndInstantEndpts.getValue().getKey());
- return Pair.of(Option.empty(),
queryTypeAndInstantEndpts.getValue().getKey());
- }
-
- Dataset<Row> source;
- // Do incremental pull. Set end instant if available.
- if
(queryTypeAndInstantEndpts.getKey().equals(QUERY_TYPE_INCREMENTAL_OPT_VAL())) {
- source = sparkSession.read().format("org.apache.hudi")
- .option(QUERY_TYPE().key(), QUERY_TYPE_INCREMENTAL_OPT_VAL())
- .option(BEGIN_INSTANTTIME().key(),
queryTypeAndInstantEndpts.getRight().getLeft())
- .option(END_INSTANTTIME().key(),
queryTypeAndInstantEndpts.getRight().getRight())
- .option(INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT().key(),
handlingMode.name())
- .load(srcPath);
- } else {
- // if checkpoint is missing from source table, and if strategy is set to
READ_UPTO_LATEST_COMMIT, we have to issue snapshot query
- source = sparkSession.read().format("org.apache.hudi")
- .option(QUERY_TYPE().key(),
QUERY_TYPE_SNAPSHOT_OPT_VAL()).load(srcPath)
- // add filtering so that only interested records are returned.
- .filter(String.format("%s > '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
- queryTypeAndInstantEndpts.getRight().getLeft()))
- .filter(String.format("%s <= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
- queryTypeAndInstantEndpts.getRight().getRight()));
+ QueryInfo queryInfo =
+ IncrSourceHelper.generateQueryInfo(
+ sparkContext, srcPath, numInstantsPerFetch,
+ Option.of(cloudObjectIncrCheckpoint.getCommit()),
+ missingCheckpointStrategy, handlingMode,
+ HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+ S3_OBJECT_KEY, S3_OBJECT_SIZE, true,
+ Option.ofNullable(cloudObjectIncrCheckpoint.getKey()));
+ LOG.info("Querying S3 with:" + cloudObjectIncrCheckpoint + ", queryInfo:"
+ queryInfo);
+
+ if (isNullOrEmpty(cloudObjectIncrCheckpoint.getKey()) &&
queryInfo.areStartAndEndInstantsEqual()) {
+ LOG.warn("Already caught up. No new data to process");
+ return Pair.of(Option.empty(), queryInfo.getEndInstant());
}
+ Dataset<Row> source = queryRunner.run(queryInfo);
if (source.isEmpty()) {
- return Pair.of(Option.empty(),
queryTypeAndInstantEndpts.getRight().getRight());
+ LOG.info("Source of file names is empty. Returning empty result and
endInstant: "
+ + queryInfo.getEndInstant());
+ return Pair.of(Option.empty(), queryInfo.getEndInstant());
}
- String filter = "s3.object.size > 0";
- if
(!StringUtils.isNullOrEmpty(props.getString(S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX.key(),
null))) {
- filter = filter + " and s3.object.key like '" +
props.getString(S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX.key()) + "%'";
- }
- if
(!StringUtils.isNullOrEmpty(props.getString(S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX.key(),
null))) {
- filter = filter + " and s3.object.key not like '" +
props.getString(S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX.key()) +
"%'";
- }
- if
(!StringUtils.isNullOrEmpty(props.getString(S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING.key(),
null))) {
- filter = filter + " and s3.object.key not like '%" +
props.getString(S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING.key()) +
"%'";
- }
- // add file format filtering by default
- filter = filter + " and s3.object.key like '%" + fileFormat + "%'";
+ Dataset<Row> filteredSourceData = applyFilter(source, fileFormat);
+
+ LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based
on sourceLimit :" + sourceLimit);
+ Pair<CloudObjectIncrCheckpoint, Dataset<Row>> checkPointAndDataset =
+ IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+ filteredSourceData, sourceLimit, queryInfo,
cloudObjectIncrCheckpoint);
+ LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft());
String s3FS =
props.getString(S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX.key(),
S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX.defaultValue()).toLowerCase();
String s3Prefix = s3FS + "://";
// Create S3 paths
- final boolean checkExists =
props.getBoolean(S3EventsHoodieIncrSourceConfig.S3_INCR_ENABLE_EXISTS_CHECK.key(),
S3EventsHoodieIncrSourceConfig.S3_INCR_ENABLE_EXISTS_CHECK.defaultValue());
SerializableConfiguration serializableHadoopConf = new
SerializableConfiguration(sparkContext.hadoopConfiguration());
- List<CloudObjectMetadata> cloudObjectMetadata = source
- .filter(filter)
- .select("s3.bucket.name", "s3.object.key", "s3.object.size")
+ List<CloudObjectMetadata> cloudObjectMetadata =
checkPointAndDataset.getRight()
+ .select(S3_BUCKET_NAME, S3_OBJECT_KEY, S3_OBJECT_SIZE)
.distinct()
- .mapPartitions(getCloudObjectMetadataPerPartition(s3Prefix,
serializableHadoopConf, checkExists), Encoders.kryo(CloudObjectMetadata.class))
+ .mapPartitions(getCloudObjectMetadataPerPartition(s3Prefix,
serializableHadoopConf, checkIfFileExists),
Encoders.kryo(CloudObjectMetadata.class))
.collectAsList();
+ LOG.info("Total number of files to process :" +
cloudObjectMetadata.size());
+
+ Option<Dataset<Row>> datasetOption =
cloudDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata, props);
+ return Pair.of(datasetOption, checkPointAndDataset.getLeft().toString());
+ }
- Option<Dataset<Row>> datasetOption = loadAsDataset(sparkSession,
cloudObjectMetadata, props, fileFormat);
- return Pair.of(datasetOption,
queryTypeAndInstantEndpts.getRight().getRight());
+ Dataset<Row> applyFilter(Dataset<Row> source, String fileFormat) {
+ String filter = S3_OBJECT_SIZE + " > 0";
+ if
(!StringUtils.isNullOrEmpty(props.getString(S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX.key(),
null))) {
+ filter = filter + " and " + S3_OBJECT_KEY + " like '" +
props.getString(S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX.key()) + "%'";
+ }
+ if
(!StringUtils.isNullOrEmpty(props.getString(S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX.key(),
null))) {
+ filter = filter + " and " + S3_OBJECT_KEY + " not like '" +
props.getString(S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX.key()) +
"%'";
+ }
+ if
(!StringUtils.isNullOrEmpty(props.getString(S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING.key(),
null))) {
+ filter = filter + " and " + S3_OBJECT_KEY + " not like '%" +
props.getString(S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING.key()) +
"%'";
+ }
+ // add file format filtering by default
+ filter = filter + " and " + S3_OBJECT_KEY + " like '%" + fileFormat + "%'";
+ return source.filter(filter);
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectDataFetcher.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
similarity index 75%
rename from
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectDataFetcher.java
rename to
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
index 26005d74f31..dfa6c68ec6f 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectDataFetcher.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
@@ -16,17 +16,16 @@
* limitations under the License.
*/
-package org.apache.hudi.utilities.sources.helpers.gcs;
+package org.apache.hudi.utilities.sources.helpers;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.List;
@@ -34,19 +33,19 @@ import java.util.List;
import static
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.loadAsDataset;
/**
- * Connects to GCS from Spark and downloads data from a given list of files.
- * Assumes SparkContext is already configured with GCS options through
GcsEventsHoodieIncrSource.addGcsAccessConfs().
+ * Connects to S3/GCS from Spark and downloads data from a given list of files.
+ * Assumes SparkContext is already configured.
*/
-public class GcsObjectDataFetcher implements Serializable {
+public class CloudDataFetcher implements Serializable {
private final String fileFormat;
private TypedProperties props;
- private static final Logger LOG =
LoggerFactory.getLogger(GcsObjectDataFetcher.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(CloudDataFetcher.class);
private static final long serialVersionUID = 1L;
- public GcsObjectDataFetcher(TypedProperties props, String fileFormat) {
+ public CloudDataFetcher(TypedProperties props, String fileFormat) {
this.fileFormat = fileFormat;
this.props = props;
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectIncrCheckpoint.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectIncrCheckpoint.java
new file mode 100644
index 00000000000..cafef4aa145
--- /dev/null
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectIncrCheckpoint.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.common.util.Option;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.DEFAULT_BEGIN_TIMESTAMP;
+
+/**
+ * This POJO is used to craft checkpoints that support size-based batching.
+ * This object will be used by object-based Hudi incr sources (s3/gcs).
+ */
+public class CloudObjectIncrCheckpoint {
+
+ private final String commit;
+ private final String key;
+
+ public CloudObjectIncrCheckpoint(String commit, String key) {
+ this.commit = commit;
+ this.key = key;
+ }
+
+ public String getCommit() {
+ return commit;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public static CloudObjectIncrCheckpoint fromString(Option<String>
lastCheckpoint) {
+ if (lastCheckpoint.isPresent()) {
+ Option<String[]> splitResult = lastCheckpoint.map(str -> str.split("#",
2));
+ if (splitResult.isPresent() && splitResult.get().length == 2) {
+ String[] split = splitResult.get();
+ return new CloudObjectIncrCheckpoint(split[0], split[1]);
+ } else {
+ return new CloudObjectIncrCheckpoint(lastCheckpoint.get(), null);
+ }
+ }
+ return new CloudObjectIncrCheckpoint(DEFAULT_BEGIN_TIMESTAMP, null);
+ }
+
+ @Override
+ public String toString() {
+ if (isNullOrEmpty(commit) && isNullOrEmpty(key)) {
+ return DEFAULT_BEGIN_TIMESTAMP;
+ } else if (isNullOrEmpty(key)) {
+ return commit;
+ }
+ return commit + "#" + key;
+ }
+}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
index 9097dc3dd6f..73b7afbf753 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
@@ -31,19 +31,29 @@ import
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.expressions.Window;
+import org.apache.spark.sql.expressions.WindowSpec;
+import org.apache.spark.sql.functions;
+import org.apache.spark.storage.StorageLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.Objects;
import java.util.function.Function;
import static
org.apache.hudi.DataSourceReadOptions.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT;
import static
org.apache.hudi.common.table.timeline.TimelineUtils.handleHollowCommitIfNeeded;
import static
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY;
import static
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.sum;
public class IncrSourceHelper {
- private static final String DEFAULT_BEGIN_TIMESTAMP = "000";
+ private static final Logger LOG =
LoggerFactory.getLogger(IncrSourceHelper.class);
+ public static final String DEFAULT_BEGIN_TIMESTAMP =
HoodieTimeline.INIT_INSTANT_TS;
+ private static final String CUMULATIVE_COLUMN_NAME = "cumulativeSize";
/**
* Get a timestamp which is the next value in a descending sequence.
@@ -74,20 +84,31 @@ public class IncrSourceHelper {
/**
* Find begin and end instants to be set for the next fetch.
*
- * @param jssc Java Spark Context
- * @param srcBasePath Base path of Hudi source table
- * @param numInstantsPerFetch Max Instants per fetch
- * @param beginInstant Last Checkpoint String
+ * @param jssc Java Spark Context
+ * @param srcBasePath Base path of Hudi source table
+ * @param numInstantsPerFetch Max Instants per fetch
+ * @param beginInstant Last Checkpoint String
* @param missingCheckpointStrategy when begin instant is missing, allow
reading based on missing checkpoint strategy
- * @return begin and end instants along with query type.
+ * @param handlingMode Hollow Commit Handling Mode
+ * @param orderColumn Column to order by (used for size based
incr source)
+ * @param keyColumn Key column (used for size based incr
source)
+ * @param limitColumn Limit column (used for size based incr
source)
+ * @param sourceLimitBasedBatching When sourceLimit based batching is used,
we need to fetch the current commit as well,
+ * this flag is used to indicate that.
+ * @param lastCheckpointKey Last checkpoint key (used in the upgrade
code path)
+ * @return begin and end instants along with query type and other
information.
*/
- public static Pair<String, Pair<String, String>>
calculateBeginAndEndInstants(JavaSparkContext jssc, String srcBasePath,
-
int numInstantsPerFetch, Option<String> beginInstant,
-
MissingCheckpointStrategy missingCheckpointStrategy,
-
HollowCommitHandling handlingMode) {
+ public static QueryInfo generateQueryInfo(JavaSparkContext jssc, String
srcBasePath,
+ int numInstantsPerFetch,
Option<String> beginInstant,
+ MissingCheckpointStrategy
missingCheckpointStrategy,
+ HollowCommitHandling handlingMode,
+ String orderColumn, String
keyColumn, String limitColumn,
+ boolean sourceLimitBasedBatching,
+ Option<String> lastCheckpointKey) {
ValidationUtils.checkArgument(numInstantsPerFetch > 0,
"Make sure the config
hoodie.deltastreamer.source.hoodieincr.num_instants is set to a positive
value");
HoodieTableMetaClient srcMetaClient =
HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build();
+
HoodieTimeline completedCommitTimeline =
srcMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
final HoodieTimeline activeCommitTimeline =
handleHollowCommitIfNeeded(completedCommitTimeline, srcMetaClient,
handlingMode);
Function<HoodieInstant, String> timestampForLastInstant = instant ->
handlingMode == HollowCommitHandling.USE_TRANSITION_TIME
@@ -106,45 +127,108 @@ public class IncrSourceHelper {
}
});
+ String previousInstantTime = beginInstantTime;
+ if (!beginInstantTime.equals(DEFAULT_BEGIN_TIMESTAMP)) {
+ Option<HoodieInstant> previousInstant =
activeCommitTimeline.findInstantBefore(beginInstantTime);
+ if (previousInstant.isPresent()) {
+ previousInstantTime = previousInstant.get().getTimestamp();
+ }
+ }
+
if (missingCheckpointStrategy == MissingCheckpointStrategy.READ_LATEST ||
!activeCommitTimeline.isBeforeTimelineStarts(beginInstantTime)) {
- Option<HoodieInstant> nthInstant =
Option.fromJavaOptional(activeCommitTimeline
- .findInstantsAfter(beginInstantTime,
numInstantsPerFetch).getInstantsAsStream().reduce((x, y) -> y));
- return Pair.of(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL(),
Pair.of(beginInstantTime,
nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime)));
+ Option<HoodieInstant> nthInstant;
+ // When we are in the upgrade code path from non-sourcelimit-based
batching to sourcelimit-based batching, we need to avoid fetching the commit
+ // that has already been read. Otherwise we will have duplicates in the
append-only use case if we use "findInstantsAfterOrEquals".
+ // As soon as we have a new format of checkpoint and a key we will move
to the new code of fetching the current commit as well.
+ if (sourceLimitBasedBatching && lastCheckpointKey.isPresent()) {
+ nthInstant = Option.fromJavaOptional(activeCommitTimeline
+ .findInstantsAfterOrEquals(beginInstantTime,
numInstantsPerFetch).getInstantsAsStream().reduce((x, y) -> y));
+ } else {
+ nthInstant = Option.fromJavaOptional(activeCommitTimeline
+ .findInstantsAfter(beginInstantTime,
numInstantsPerFetch).getInstantsAsStream().reduce((x, y) -> y));
+ }
+ return new
QueryInfo(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL(),
previousInstantTime,
+ beginInstantTime,
nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime),
+ orderColumn, keyColumn, limitColumn);
} else {
// when MissingCheckpointStrategy is set to read everything until
latest, trigger snapshot query.
Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
- return Pair.of(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL(),
Pair.of(beginInstantTime, timestampForLastInstant.apply(lastInstant.get())));
+ return new QueryInfo(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL(),
+ previousInstantTime, beginInstantTime,
lastInstant.get().getTimestamp(),
+ orderColumn, keyColumn, limitColumn);
}
}
/**
- * Validate instant time seen in the incoming row.
+ * Adjust the source dataset to a size-based batch using the last checkpoint
key.
*
- * @param row Input Row
- * @param instantTime Hoodie Instant time of the row
- * @param sinceInstant begin instant of the batch
- * @param endInstant end instant of the batch
+ * @param sourceData Source dataset
+ * @param sourceLimit Max number of bytes to be read from source
+ * @param queryInfo Query Info
+ * @return end instants along with filtered rows.
*/
- public static void validateInstantTime(Row row, String instantTime, String
sinceInstant, String endInstant) {
- Objects.requireNonNull(instantTime);
-
ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime,
HoodieTimeline.GREATER_THAN, sinceInstant),
- "Instant time(_hoodie_commit_time) in row (" + row + ") was : " +
instantTime + "but expected to be between "
- + sinceInstant + "(excl) - " + endInstant + "(incl)");
- ValidationUtils.checkArgument(
- HoodieTimeline.compareTimestamps(instantTime,
HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant),
- "Instant time(_hoodie_commit_time) in row (" + row + ") was : " +
instantTime + "but expected to be between "
- + sinceInstant + "(excl) - " + endInstant + "(incl)");
+ public static Pair<CloudObjectIncrCheckpoint, Dataset<Row>>
filterAndGenerateCheckpointBasedOnSourceLimit(Dataset<Row> sourceData,
+
long sourceLimit, QueryInfo queryInfo,
+
CloudObjectIncrCheckpoint
cloudObjectIncrCheckpoint) {
+ if (sourceData.isEmpty()) {
+ LOG.info("Empty source, returning endpoint:" +
queryInfo.getEndInstant());
+ return Pair.of(cloudObjectIncrCheckpoint, sourceData);
+ }
+ // Let's persist the dataset to avoid triggering the dag repeatedly
+ sourceData.persist(StorageLevel.MEMORY_AND_DISK());
+ // Set ordering in query to enable batching
+ Dataset<Row> orderedDf = QueryRunner.applyOrdering(sourceData,
queryInfo.getOrderByColumns());
+ Option<String> lastCheckpoint =
Option.of(cloudObjectIncrCheckpoint.getCommit());
+ Option<String> lastCheckpointKey =
Option.ofNullable(cloudObjectIncrCheckpoint.getKey());
+ Option<String> concatenatedKey = lastCheckpoint.flatMap(checkpoint ->
lastCheckpointKey.map(key -> checkpoint + key));
+
+ // Filter until last checkpoint key
+ if (concatenatedKey.isPresent()) {
+ orderedDf = orderedDf.withColumn("commit_key",
+ functions.concat(functions.col(queryInfo.getOrderColumn()),
functions.col(queryInfo.getKeyColumn())));
+ // Apply incremental filter
+ orderedDf =
orderedDf.filter(functions.col("commit_key").gt(concatenatedKey.get())).drop("commit_key");
+ // We could be just at the end of the commit, so return empty
+ if (orderedDf.isEmpty()) {
+ LOG.info("Empty ordered source, returning endpoint:" +
queryInfo.getEndInstant());
+ sourceData.unpersist();
+ return Pair.of(new
CloudObjectIncrCheckpoint(queryInfo.getEndInstant(), lastCheckpointKey.get()),
orderedDf);
+ }
+ }
+
+ // Limit based on sourceLimit
+ WindowSpec windowSpec = Window.orderBy(col(queryInfo.getOrderColumn()),
col(queryInfo.getKeyColumn()));
+ // Add the 'cumulativeSize' column with running sum of 'limitColumn'
+ Dataset<Row> aggregatedData = orderedDf.withColumn(CUMULATIVE_COLUMN_NAME,
+ sum(col(queryInfo.getLimitColumn())).over(windowSpec));
+ Dataset<Row> collectedRows =
aggregatedData.filter(col(CUMULATIVE_COLUMN_NAME).leq(sourceLimit));
+
+ Row row = null;
+ if (collectedRows.isEmpty()) {
+ // If the first element itself exceeds limits then return first element
+ LOG.info("First object exceeding source limit: " + sourceLimit + "
bytes");
+ row = aggregatedData.select(queryInfo.getOrderColumn(),
queryInfo.getKeyColumn(), CUMULATIVE_COLUMN_NAME).first();
+ collectedRows = aggregatedData.limit(1);
+ } else {
+ // Get the last row and form composite key
+ row = collectedRows.select(queryInfo.getOrderColumn(),
queryInfo.getKeyColumn(), CUMULATIVE_COLUMN_NAME).orderBy(
+ col(queryInfo.getOrderColumn()).desc(),
col(queryInfo.getKeyColumn()).desc()).first();
+ }
+ LOG.info("Processed batch size: " + row.getLong(2) + " bytes");
+ sourceData.unpersist();
+ return Pair.of(new CloudObjectIncrCheckpoint(row.getString(0),
row.getString(1)), collectedRows);
}
/**
* Determine the policy to choose if a checkpoint is missing (detected by
the absence of a beginInstant),
* during a run of a {@link HoodieIncrSource}.
+ *
* @param props the usual Hudi props object
* @return
*/
public static MissingCheckpointStrategy
getMissingCheckpointStrategy(TypedProperties props) {
boolean readLatestOnMissingCkpt = props.getBoolean(
- READ_LATEST_INSTANT_ON_MISSING_CKPT.key(),
READ_LATEST_INSTANT_ON_MISSING_CKPT.defaultValue());
+ READ_LATEST_INSTANT_ON_MISSING_CKPT.key(),
READ_LATEST_INSTANT_ON_MISSING_CKPT.defaultValue());
if (readLatestOnMissingCkpt) {
return MissingCheckpointStrategy.READ_LATEST;
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryInfo.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryInfo.java
new file mode 100644
index 00000000000..4e4ee275829
--- /dev/null
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryInfo.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
+import static
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL;
+
+/**
+ * This class is used to prepare query information for the s3 and gcs incr sources.
+ * Some of the information in this class is used for batching based on
sourceLimit.
+ */
+public class QueryInfo {
+ private final String queryType;
+ private final String previousInstant;
+ private final String startInstant;
+ private final String endInstant;
+ private final String orderColumn;
+ private final String keyColumn;
+ private final String limitColumn;
+ private final List<String> orderByColumns;
+
+ public QueryInfo(
+ String queryType, String previousInstant,
+ String startInstant, String endInstant,
+ String orderColumn, String keyColumn,
+ String limitColumn) {
+ this.queryType = queryType;
+ this.previousInstant = previousInstant;
+ this.startInstant = startInstant;
+ this.endInstant = endInstant;
+ this.orderColumn = orderColumn;
+ this.keyColumn = keyColumn;
+ this.limitColumn = limitColumn;
+ this.orderByColumns = Arrays.asList(orderColumn, keyColumn);
+ }
+
+ public boolean areStartAndEndInstantsEqual() {
+ return getStartInstant().equals(getEndInstant());
+ }
+
+ public boolean isIncremental() {
+ return QUERY_TYPE_INCREMENTAL_OPT_VAL().equals(queryType);
+ }
+
+ public boolean isSnapshot() {
+ return QUERY_TYPE_SNAPSHOT_OPT_VAL().equals(queryType);
+ }
+
+ public String getQueryType() {
+ return queryType;
+ }
+
+ public String getPreviousInstant() {
+ return previousInstant;
+ }
+
+ public String getStartInstant() {
+ return startInstant;
+ }
+
+ public String getEndInstant() {
+ return endInstant;
+ }
+
+ public String getOrderColumn() {
+ return orderColumn;
+ }
+
+ public String getKeyColumn() {
+ return keyColumn;
+ }
+
+ public String getLimitColumn() {
+ return limitColumn;
+ }
+
+ public List<String> getOrderByColumns() {
+ return orderByColumns;
+ }
+
+ @Override
+ public String toString() {
+ return ("Query information for Incremental Source "
+ + "queryType: " + queryType
+ + ", previousInstant: " + previousInstant
+ + ", startInstant: " + startInstant
+ + ", endInstant: " + endInstant
+ + ", orderColumn: " + orderColumn
+ + ", keyColumn: " + keyColumn
+ + ", limitColumn: " + limitColumn
+ + ", orderByColumns: " + orderByColumns);
+ }
+}
\ No newline at end of file
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
new file mode 100644
index 00000000000..06f082aff7a
--- /dev/null
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
+
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This class is currently used only by s3 and gcs incr sources that support
size-based batching.
+ * This class will fetch committed files from the current commit to support
size-based batching.
+ */
+public class QueryRunner {
+ private final SparkSession sparkSession;
+ private final String sourcePath;
+
+ private static final Logger LOG = LoggerFactory.getLogger(QueryRunner.class);
+
+ public QueryRunner(SparkSession sparkSession, TypedProperties props) {
+ this.sparkSession = sparkSession;
+ DataSourceUtils.checkRequiredProperties(props,
Collections.singletonList(HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH.key()));
+ this.sourcePath =
props.getString(HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH.key());
+ }
+
+ public Dataset<Row> run(QueryInfo queryInfo) {
+ Dataset<Row> dataset = null;
+ if (queryInfo.isIncremental()) {
+ dataset = runIncrementalQuery(queryInfo);
+ } else if (queryInfo.isSnapshot()) {
+ dataset = runSnapshotQuery(queryInfo);
+ } else {
+ throw new HoodieException("Unknown query type " +
queryInfo.getQueryType());
+ }
+ return dataset;
+ }
+
+ public static Dataset<Row> applyOrdering(Dataset<Row> dataset, List<String>
orderByColumns) {
+ if (orderByColumns != null && !orderByColumns.isEmpty()) {
+ LOG.debug("Applying ordering " + orderByColumns);
+ return
dataset.orderBy(orderByColumns.stream().map(functions::col).toArray(Column[]::new));
+ }
+ return dataset;
+ }
+
+ public Dataset<Row> runIncrementalQuery(QueryInfo queryInfo) {
+ LOG.info("Running incremental query");
+ return sparkSession.read().format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE().key(),
queryInfo.getQueryType())
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(),
queryInfo.getPreviousInstant())
+ .option(DataSourceReadOptions.END_INSTANTTIME().key(),
queryInfo.getEndInstant()).load(sourcePath);
+ }
+
+ public Dataset<Row> runSnapshotQuery(QueryInfo queryInfo) {
+ LOG.info("Running snapshot query");
+ return sparkSession.read().format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE().key(),
queryInfo.getQueryType()).load(sourcePath)
+ // add filtering so that only interested records are returned.
+ .filter(String.format("%s >= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+ queryInfo.getStartInstant()))
+ .filter(String.format("%s <= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+ queryInfo.getEndInstant()));
+ }
+}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/QueryInfo.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/QueryInfo.java
deleted file mode 100644
index ed172da8f29..00000000000
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/QueryInfo.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities.sources.helpers.gcs;
-
-import org.apache.hudi.common.model.HoodieRecord;
-import
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
-
-import org.apache.spark.sql.DataFrameReader;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.hudi.DataSourceReadOptions.BEGIN_INSTANTTIME;
-import static org.apache.hudi.DataSourceReadOptions.END_INSTANTTIME;
-import static
org.apache.hudi.DataSourceReadOptions.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT;
-import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE;
-import static
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
-import static
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL;
-
-/**
- * Uses the start and end instants of a Hudi Streamer Source to help construct
the right kind
- * of query for subsequent requests.
- */
-public class QueryInfo {
-
- private final String queryType;
- private final String startInstant;
- private final String endInstant;
- private final HollowCommitHandling handlingMode;
- private static final Logger LOG = LoggerFactory.getLogger(QueryInfo.class);
-
- public QueryInfo(String queryType, String startInstant, String endInstant,
HollowCommitHandling handlingMode) {
- this.queryType = queryType;
- this.startInstant = startInstant;
- this.endInstant = endInstant;
- this.handlingMode = handlingMode;
- }
-
- public Dataset<Row> initCloudObjectMetadata(String srcPath, SparkSession
sparkSession) {
- if (isIncremental()) {
- return incrementalQuery(sparkSession).load(srcPath);
- }
-
- // Issue a snapshot query.
- return snapshotQuery(sparkSession).load(srcPath)
- .filter(String.format("%s > '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD, getStartInstant()))
- .filter(String.format("%s <= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD, getEndInstant()));
- }
-
- public boolean areStartAndEndInstantsEqual() {
- return getStartInstant().equals(getEndInstant());
- }
-
- private DataFrameReader snapshotQuery(SparkSession sparkSession) {
- return sparkSession.read().format("org.apache.hudi")
- .option(QUERY_TYPE().key(), QUERY_TYPE_SNAPSHOT_OPT_VAL());
- }
-
- private DataFrameReader incrementalQuery(SparkSession sparkSession) {
- return sparkSession.read().format("org.apache.hudi")
- .option(QUERY_TYPE().key(), QUERY_TYPE_INCREMENTAL_OPT_VAL())
- .option(BEGIN_INSTANTTIME().key(), getStartInstant())
- .option(END_INSTANTTIME().key(), getEndInstant())
- .option(INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT().key(),
handlingMode.name());
- }
-
- public boolean isIncremental() {
- return QUERY_TYPE_INCREMENTAL_OPT_VAL().equals(queryType);
- }
-
- public String getStartInstant() {
- return startInstant;
- }
-
- public String getEndInstant() {
- return endInstant;
- }
-
- public void logDetails() {
- LOG.debug("queryType: " + queryType + ", startInstant: " + startInstant +
", endInstant: " + endInstant);
- }
-
-}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index f1701654212..9414bbec4fd 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -31,6 +31,7 @@ import
org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.collection.Triple;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -38,13 +39,17 @@ import
org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
-import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.QueryRunner;
import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
@@ -62,9 +67,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import java.util.stream.Collectors;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT;
@@ -81,6 +90,8 @@ public class TestGcsEventsHoodieIncrSource extends
SparkClientFunctionalTestHarn
private static final Schema GCS_METADATA_SCHEMA =
SchemaTestUtil.getSchemaFromResource(
TestGcsEventsHoodieIncrSource.class,
"/streamer-config/gcs-metadata.avsc", true);
+ private ObjectMapper mapper = new ObjectMapper();
+
@TempDir
protected java.nio.file.Path tempDir;
@@ -88,16 +99,21 @@ public class TestGcsEventsHoodieIncrSource extends
SparkClientFunctionalTestHarn
GcsObjectMetadataFetcher gcsObjectMetadataFetcher;
@Mock
- GcsObjectDataFetcher gcsObjectDataFetcher;
+ CloudDataFetcher gcsObjectDataFetcher;
+
+ @Mock
+ QueryRunner queryRunner;
protected FilebasedSchemaProvider schemaProvider;
private HoodieTableMetaClient metaClient;
+ private JavaSparkContext jsc;
private static final Logger LOG =
LoggerFactory.getLogger(TestGcsEventsHoodieIncrSource.class);
@BeforeEach
public void setUp() throws IOException {
metaClient = getHoodieMetaClient(hadoopConf(), basePath());
+ jsc = JavaSparkContext.fromSparkContext(spark().sparkContext());
MockitoAnnotations.initMocks(this);
}
@@ -113,7 +129,7 @@ public class TestGcsEventsHoodieIncrSource extends
SparkClientFunctionalTestHarn
Pair<String, List<HoodieRecord>> inserts =
writeGcsMetadataRecords(commitTimeForWrites);
- readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 0,
inserts.getKey());
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
100L, 0, inserts.getKey());
verify(gcsObjectMetadataFetcher,
times(0)).getGcsObjectMetadata(Mockito.any(), Mockito.any(),
anyBoolean());
@@ -143,10 +159,17 @@ public class TestGcsEventsHoodieIncrSource extends
SparkClientFunctionalTestHarn
DataTypes.createStructField("text", DataTypes.StringType, true)
});
Dataset<Row> rows = spark().createDataFrame(recs, schema);
+ List<Triple<String, Long, String>> filePathSizeAndCommitTime = new
ArrayList<>();
+    // Add file paths, sizes and commit times to the list
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
+ Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(),
eq(cloudObjectMetadataList), Mockito.any())).thenReturn(Option.of(rows));
+ when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
- readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 4,
inserts.getKey());
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
100L, 4, "1#path/to/file1.json");
verify(gcsObjectMetadataFetcher,
times(1)).getGcsObjectMetadata(Mockito.any(), Mockito.any(),
anyBoolean());
@@ -154,14 +177,103 @@ public class TestGcsEventsHoodieIncrSource extends
SparkClientFunctionalTestHarn
eq(cloudObjectMetadataList), Mockito.any());
}
+ @Test
+ public void testTwoFilesAndContinueInSameCommit() throws IOException {
+ String commitTimeForWrites = "2";
+ String commitTimeForReads = "1";
+
+ Pair<String, List<HoodieRecord>> inserts =
writeGcsMetadataRecords(commitTimeForWrites);
+ List<CloudObjectMetadata> cloudObjectMetadataList = Arrays.asList(
+ new CloudObjectMetadata("data-file-1.json", 1),
+ new CloudObjectMetadata("data-file-2.json", 1));
+ when(gcsObjectMetadataFetcher.getGcsObjectMetadata(Mockito.any(),
Mockito.any(), anyBoolean())).thenReturn(cloudObjectMetadataList);
+
+ List<Row> recs = Arrays.asList(
+ new GenericRow(new String[] {"1", "Hello 1"}),
+ new GenericRow(new String[] {"2", "Hello 2"}),
+ new GenericRow(new String[] {"3", "Hello 3"}),
+ new GenericRow(new String[] {"4", "Hello 4"})
+ );
+ StructType schema = new StructType(new StructField[] {
+ DataTypes.createStructField("id", DataTypes.StringType, true),
+ DataTypes.createStructField("text", DataTypes.StringType, true)
+ });
+ Dataset<Row> rows = spark().createDataFrame(recs, schema);
+
+ List<Triple<String, Long, String>> filePathSizeAndCommitTime = new
ArrayList<>();
+    // Add file paths, sizes and commit times to the list
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
+
+ Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+ when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(),
eq(cloudObjectMetadataList), Mockito.any())).thenReturn(Option.of(rows));
+ when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
+
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
250L, 4, "1#path/to/file2.json");
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"),
250L, 4, "1#path/to/file3.json");
+
+ verify(gcsObjectMetadataFetcher,
times(2)).getGcsObjectMetadata(Mockito.any(), Mockito.any(),
+ anyBoolean());
+ verify(gcsObjectDataFetcher, times(2)).getCloudObjectDataDF(Mockito.any(),
+ eq(cloudObjectMetadataList), Mockito.any());
+ }
+
+ @Test
+ public void testTwoFilesAndContinueAcrossCommits() throws IOException {
+ String commitTimeForWrites = "2";
+ String commitTimeForReads = "1";
+
+ Pair<String, List<HoodieRecord>> inserts =
writeGcsMetadataRecords(commitTimeForWrites);
+ List<CloudObjectMetadata> cloudObjectMetadataList = Arrays.asList(
+ new CloudObjectMetadata("data-file-1.json", 1),
+ new CloudObjectMetadata("data-file-2.json", 1));
+ when(gcsObjectMetadataFetcher.getGcsObjectMetadata(Mockito.any(),
Mockito.any(), anyBoolean())).thenReturn(cloudObjectMetadataList);
+
+ List<Row> recs = Arrays.asList(
+ new GenericRow(new String[] {"1", "Hello 1"}),
+ new GenericRow(new String[] {"2", "Hello 2"}),
+ new GenericRow(new String[] {"3", "Hello 3"}),
+ new GenericRow(new String[] {"4", "Hello 4"})
+ );
+ StructType schema = new StructType(new StructField[] {
+ DataTypes.createStructField("id", DataTypes.StringType, true),
+ DataTypes.createStructField("text", DataTypes.StringType, true)
+ });
+ Dataset<Row> rows = spark().createDataFrame(recs, schema);
+
+ List<Triple<String, Long, String>> filePathSizeAndCommitTime = new
ArrayList<>();
+    // Add file paths, sizes and commit times to the list
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L, "2"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 150L, "2"));
+
+ Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+ when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(),
eq(cloudObjectMetadataList), Mockito.any())).thenReturn(Option.of(rows));
+ when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
+
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
100L, 4, "1#path/to/file1.json");
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"),
100L, 4, "1#path/to/file2.json");
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"),
1000L, 4, "2#path/to/file5.json");
+
+ verify(gcsObjectMetadataFetcher,
times(3)).getGcsObjectMetadata(Mockito.any(), Mockito.any(),
+ anyBoolean());
+ verify(gcsObjectDataFetcher, times(3)).getCloudObjectDataDF(Mockito.any(),
+ eq(cloudObjectMetadataList), Mockito.any());
+ }
+
private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy
missingCheckpointStrategy,
- Option<String> checkpointToPull, int
expectedCount, String expectedCheckpoint) {
+ Option<String> checkpointToPull, long
sourceLimit, int expectedCount, String expectedCheckpoint) {
TypedProperties typedProperties = setProps(missingCheckpointStrategy);
GcsEventsHoodieIncrSource incrSource = new
GcsEventsHoodieIncrSource(typedProperties, jsc(),
- spark(), schemaProvider, gcsObjectMetadataFetcher,
gcsObjectDataFetcher);
+ spark(), schemaProvider, gcsObjectMetadataFetcher,
gcsObjectDataFetcher, queryRunner);
- Pair<Option<Dataset<Row>>, String> dataAndCheckpoint =
incrSource.fetchNextBatch(checkpointToPull, 100);
+ Pair<Option<Dataset<Row>>, String> dataAndCheckpoint =
incrSource.fetchNextBatch(checkpointToPull, sourceLimit);
Option<Dataset<Row>> datasetOpt = dataAndCheckpoint.getLeft();
String nextCheckPoint = dataAndCheckpoint.getRight();
@@ -174,7 +286,7 @@ public class TestGcsEventsHoodieIncrSource extends
SparkClientFunctionalTestHarn
assertEquals(datasetOpt.get().count(), expectedCount);
}
- Assertions.assertEquals(nextCheckPoint, expectedCheckpoint);
+ Assertions.assertEquals(expectedCheckpoint, nextCheckPoint);
}
private HoodieRecord getGcsMetadataRecord(String commitTime, String
filename, String bucketName, String generation) {
@@ -260,4 +372,31 @@ public class TestGcsEventsHoodieIncrSource extends
SparkClientFunctionalTestHarn
.withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
.forTable(metaClient.getTableConfig().getTableName());
}
+
+ private String generateGCSEventMetadata(Long objectSize, String bucketName,
String objectKey, String commitTime)
+ throws JsonProcessingException {
+ Map<String, Object> objectMetadata = new HashMap<>();
+ objectMetadata.put("bucket", bucketName);
+ objectMetadata.put("name", objectKey);
+ objectMetadata.put("size", objectSize);
+ objectMetadata.put("_hoodie_commit_time", commitTime);
+ return mapper.writeValueAsString(objectMetadata);
+ }
+
+ private List<String> getSampleGCSObjectKeys(List<Triple<String, Long,
String>> filePathSizeAndCommitTime) {
+ return filePathSizeAndCommitTime.stream().map(f -> {
+ try {
+ return generateGCSEventMetadata(f.getMiddle(), "bucket-1",
f.getLeft(), f.getRight());
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList());
+ }
+
+ private Dataset<Row> generateDataset(List<Triple<String, Long, String>>
filePathSizeAndCommitTime) {
+ JavaRDD<String> testRdd =
jsc.parallelize(getSampleGCSObjectKeys(filePathSizeAndCommitTime), 2);
+ Dataset<Row> inputDs = spark().read().json(testRdd);
+ return inputDs;
+ }
+
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
new file mode 100644
index 00000000000..8bd345626e7
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.collection.Triple;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
+import org.apache.hudi.utilities.sources.helpers.QueryRunner;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class TestS3EventsHoodieIncrSource extends
SparkClientFunctionalTestHarness {
+ private static final Schema S3_METADATA_SCHEMA =
SchemaTestUtil.getSchemaFromResource(
+ TestS3EventsHoodieIncrSource.class, "/streamer-config/s3-metadata.avsc",
true);
+
+ private ObjectMapper mapper = new ObjectMapper();
+
+ private static final String MY_BUCKET = "some-bucket";
+
+ @Mock
+ private SchemaProvider mockSchemaProvider;
+ @Mock
+ QueryRunner mockQueryRunner;
+ @Mock
+ CloudDataFetcher mockCloudDataFetcher;
+ private JavaSparkContext jsc;
+ private HoodieTableMetaClient metaClient;
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ jsc = JavaSparkContext.fromSparkContext(spark().sparkContext());
+ metaClient = getHoodieMetaClient(hadoopConf(), basePath());
+ }
+
+ private List<String> getSampleS3ObjectKeys(List<Triple<String, Long,
String>> filePathSizeAndCommitTime) {
+ return filePathSizeAndCommitTime.stream().map(f -> {
+ try {
+ return generateS3EventMetadata(f.getMiddle(), MY_BUCKET, f.getLeft(),
f.getRight());
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList());
+ }
+
+ private Dataset<Row> generateDataset(List<Triple<String, Long, String>>
filePathSizeAndCommitTime) {
+ JavaRDD<String> testRdd =
jsc.parallelize(getSampleS3ObjectKeys(filePathSizeAndCommitTime), 2);
+ Dataset<Row> inputDs = spark().read().json(testRdd);
+ return inputDs;
+ }
+
+  /**
+   * Generates a simple JSON structure like the one below:
+   * <p>
+   * s3 : {
+   *   object : {
+   *     size:
+   *     key:
+   *   }
+   *   bucket: {
+   *     name:
+   *   }
+   * }
+   */
+ private String generateS3EventMetadata(Long objectSize, String bucketName,
String objectKey, String commitTime)
+ throws JsonProcessingException {
+ Map<String, Object> objectMetadata = new HashMap<>();
+ objectMetadata.put("size", objectSize);
+ objectMetadata.put("key", objectKey);
+ Map<String, String> bucketMetadata = new HashMap<>();
+ bucketMetadata.put("name", bucketName);
+ Map<String, Object> s3Metadata = new HashMap<>();
+ s3Metadata.put("object", objectMetadata);
+ s3Metadata.put("bucket", bucketMetadata);
+ Map<String, Object> eventMetadata = new HashMap<>();
+ eventMetadata.put("s3", s3Metadata);
+ eventMetadata.put("_hoodie_commit_time", commitTime);
+ return mapper.writeValueAsString(eventMetadata);
+ }
+
+ private HoodieRecord generateS3EventMetadata(String commitTime, String
bucketName, String objectKey, Long objectSize) {
+ String partitionPath = bucketName;
+ Schema schema = S3_METADATA_SCHEMA;
+ GenericRecord rec = new GenericData.Record(schema);
+ Schema.Field s3Field = schema.getField("s3");
+ Schema s3Schema = s3Field.schema().getTypes().get(1); // Assuming the
record schema is the second type
+ // Create a generic record for the "s3" field
+ GenericRecord s3Record = new GenericData.Record(s3Schema);
+
+ Schema.Field s3BucketField = s3Schema.getField("bucket");
+ Schema s3Bucket = s3BucketField.schema().getTypes().get(1); // Assuming
the record schema is the second type
+ GenericRecord s3BucketRec = new GenericData.Record(s3Bucket);
+ s3BucketRec.put("name", bucketName);
+
+
+ Schema.Field s3ObjectField = s3Schema.getField("object");
+ Schema s3Object = s3ObjectField.schema().getTypes().get(1); // Assuming
the record schema is the second type
+ GenericRecord s3ObjectRec = new GenericData.Record(s3Object);
+ s3ObjectRec.put("key", objectKey);
+ s3ObjectRec.put("size", objectSize);
+
+ s3Record.put("bucket", s3BucketRec);
+ s3Record.put("object", s3ObjectRec);
+ rec.put("s3", s3Record);
+ rec.put("_hoodie_commit_time", commitTime);
+
+ HoodieAvroPayload payload = new HoodieAvroPayload(Option.of(rec));
+ return new HoodieAvroRecord(new HoodieKey(objectKey, partitionPath),
payload);
+ }
+
+ private TypedProperties setProps(IncrSourceHelper.MissingCheckpointStrategy
missingCheckpointStrategy) {
+ Properties properties = new Properties();
+ properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path",
basePath());
+
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy",
+ missingCheckpointStrategy.name());
+
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.file.format",
"json");
+ return new TypedProperties(properties);
+ }
+
+ private HoodieWriteConfig.Builder getConfigBuilder(String basePath,
HoodieTableMetaClient metaClient) {
+ return HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withSchema(S3_METADATA_SCHEMA.toString())
+ .withParallelism(2, 2)
+ .withBulkInsertParallelism(2)
+ .withFinalizeWriteParallelism(2).withDeleteParallelism(2)
+ .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+ .forTable(metaClient.getTableConfig().getTableName());
+ }
+
+ private HoodieWriteConfig getWriteConfig() {
+ return getConfigBuilder(basePath(), metaClient)
+
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2,
3).build())
+
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .build();
+ }
+
+ private Pair<String, List<HoodieRecord>> writeS3MetadataRecords(String
commitTime) throws IOException {
+ HoodieWriteConfig writeConfig = getWriteConfig();
+ SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
+
+ writeClient.startCommitWithTime(commitTime);
+ List<HoodieRecord> s3MetadataRecords = Arrays.asList(
+ generateS3EventMetadata(commitTime, "bucket-1", "data-file-1.json", 1L)
+ );
+ JavaRDD<WriteStatus> result =
writeClient.upsert(jsc().parallelize(s3MetadataRecords, 1), commitTime);
+
+ List<WriteStatus> statuses = result.collect();
+ assertNoWriteErrors(statuses);
+
+ return Pair.of(commitTime, s3MetadataRecords);
+ }
+
+ @Test
+ public void testEmptyCheckpoint() throws IOException {
+ String commitTimeForWrites = "1";
+ String commitTimeForReads = commitTimeForWrites;
+
+ Pair<String, List<HoodieRecord>> inserts =
writeS3MetadataRecords(commitTimeForWrites);
+
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 0L,
inserts.getKey());
+ }
+
+ @Test
+ public void testOneFileInCommit() throws IOException {
+ String commitTimeForWrites1 = "2";
+ String commitTimeForReads = "1";
+
+ Pair<String, List<HoodieRecord>> inserts =
writeS3MetadataRecords(commitTimeForReads);
+ inserts = writeS3MetadataRecords(commitTimeForWrites1);
+
+ List<Triple<String, Long, String>> filePathSizeAndCommitTime = new
ArrayList<>();
+    // Add file paths, sizes and commit times to the list
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
+
+ Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+ when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+ when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(),
Mockito.any(), Mockito.any()))
+ .thenReturn(Option.empty());
+
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
100L, "1#path/to/file1.json");
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"),
200L, "1#path/to/file2.json");
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"),
200L, "1#path/to/file3.json");
+ }
+
+ @Test
+ public void testTwoFilesAndContinueInSameCommit() throws IOException {
+ String commitTimeForWrites = "2";
+ String commitTimeForReads = "1";
+
+ Pair<String, List<HoodieRecord>> inserts =
writeS3MetadataRecords(commitTimeForReads);
+ inserts = writeS3MetadataRecords(commitTimeForWrites);
+
+ List<Triple<String, Long, String>> filePathSizeAndCommitTime = new
ArrayList<>();
+    // Add file paths, sizes and commit times to the list
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
+
+ Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+ when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+ when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(),
Mockito.any(), Mockito.any()))
+ .thenReturn(Option.empty());
+
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
250L, "1#path/to/file2.json");
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"),
250L, "1#path/to/file3.json");
+
+ }
+
+ @Test
+ public void testTwoFilesAndContinueAcrossCommits() throws IOException {
+ String commitTimeForWrites = "2";
+ String commitTimeForReads = "1";
+
+ Pair<String, List<HoodieRecord>> inserts =
writeS3MetadataRecords(commitTimeForReads);
+ inserts = writeS3MetadataRecords(commitTimeForWrites);
+
+
+ List<Triple<String, Long, String>> filePathSizeAndCommitTime = new
ArrayList<>();
+    // Add file paths, sizes and commit times to the list
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L, "2"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 150L, "2"));
+
+ Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+ when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+ when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(),
Mockito.any(), Mockito.any()))
+ .thenReturn(Option.empty());
+
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L,
"1#path/to/file1.json");
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"),
100L, "1#path/to/file2.json");
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"),
1000L, "2#path/to/file5.json");
+ }
+
+ private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy
missingCheckpointStrategy,
+ Option<String> checkpointToPull, long
sourceLimit, String expectedCheckpoint) {
+ TypedProperties typedProperties = setProps(missingCheckpointStrategy);
+
+ S3EventsHoodieIncrSource incrSource = new
S3EventsHoodieIncrSource(typedProperties, jsc(),
+ spark(), mockSchemaProvider, mockQueryRunner, mockCloudDataFetcher);
+
+ Pair<Option<Dataset<Row>>, String> dataAndCheckpoint =
incrSource.fetchNextBatch(checkpointToPull, sourceLimit);
+
+ Option<Dataset<Row>> datasetOpt = dataAndCheckpoint.getLeft();
+ String nextCheckPoint = dataAndCheckpoint.getRight();
+
+ Assertions.assertNotNull(nextCheckPoint);
+ Assertions.assertEquals(expectedCheckpoint, nextCheckPoint);
+ }
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
new file mode 100644
index 00000000000..3c0b5ee23c8
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.collection.Triple;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.INIT_INSTANT_TS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestIncrSourceHelper extends SparkClientFunctionalTestHarness {
+
+ private ObjectMapper mapper = new ObjectMapper();
+ private JavaSparkContext jsc;
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ jsc = JavaSparkContext.fromSparkContext(spark().sparkContext());
+ }
+
+ private String generateS3EventMetadata(Long objectSize, String bucketName,
String objectKey, String commitTime)
+ throws JsonProcessingException {
+ Map<String, Object> objectMetadata = new HashMap<>();
+ objectMetadata.put("size", objectSize);
+ objectMetadata.put("key", objectKey);
+ Map<String, String> bucketMetadata = new HashMap<>();
+ bucketMetadata.put("name", bucketName);
+ Map<String, Object> s3Metadata = new HashMap<>();
+ s3Metadata.put("object", objectMetadata);
+ s3Metadata.put("bucket", bucketMetadata);
+ Map<String, Object> eventMetadata = new HashMap<>();
+ eventMetadata.put("s3", s3Metadata);
+ eventMetadata.put("_hoodie_commit_time", commitTime);
+ return mapper.writeValueAsString(eventMetadata);
+ }
+
+ private List<String> getSampleS3ObjectKeys(List<Triple<String, Long,
String>> filePathSizeAndCommitTime) {
+ return filePathSizeAndCommitTime.stream().map(f -> {
+ try {
+ return generateS3EventMetadata(f.getMiddle(), "bucket-1", f.getLeft(),
f.getRight());
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList());
+ }
+
+ private Dataset<Row> generateDataset(List<Triple<String, Long, String>>
filePathSizeAndCommitTime) {
+ JavaRDD<String> testRdd =
jsc.parallelize(getSampleS3ObjectKeys(filePathSizeAndCommitTime), 2);
+ Dataset<Row> inputDs = spark().read().json(testRdd);
+ return inputDs;
+ }
+
+ @Test
+ void testEmptySource() {
+ StructType schema = new StructType();
+ Dataset<Row> emptyDataset = spark().createDataFrame(new ArrayList<Row>(),
schema);
+ QueryInfo queryInfo = new QueryInfo(
+ QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1",
+ "commit2", "_hoodie_commit_time",
+ "s3.object.key", "s3.object.size");
+ Pair<CloudObjectIncrCheckpoint, Dataset<Row>> result =
IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+ emptyDataset, 50L, queryInfo, new CloudObjectIncrCheckpoint(null,
null));
+ assertEquals(INIT_INSTANT_TS, result.getKey().toString());
+ assertEquals(emptyDataset, result.getRight());
+ }
+
+ @Test
+ void testSingleObjectExceedingSourceLimit() {
+ List<Triple<String, Long, String>> filePathSizeAndCommitTime = new
ArrayList<>();
+ // Add file paths and sizes to the list
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L,
"commit1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L,
"commit1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L,
"commit1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L,
"commit2"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 150L,
"commit2"));
+ Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+ QueryInfo queryInfo = new QueryInfo(
+ QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1",
+ "commit2", "_hoodie_commit_time",
+ "s3.object.key", "s3.object.size");
+ Pair<CloudObjectIncrCheckpoint, Dataset<Row>> result =
IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+ inputDs, 50L, queryInfo, new CloudObjectIncrCheckpoint("commit1",
null));
+ Row row =
result.getRight().select("cumulativeSize").collectAsList().get((int)
result.getRight().count() - 1);
+ assertEquals("commit1#path/to/file1.json", result.getKey().toString());
+ List<Row> rows = result.getRight().collectAsList();
+ assertEquals(1, rows.size());
+ assertEquals("[[commit1,[[bucket-1],[path/to/file1.json,100]],100]]",
rows.toString());
+ assertEquals(100L, row.get(0));
+ }
+
+ @Test
+ void testMultipleObjectExceedingSourceLimit() {
+ List<Triple<String, Long, String>> filePathSizeAndCommitTime = new
ArrayList<>();
+ // Add file paths and sizes to the list
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L,
"commit1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L,
"commit1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L,
"commit1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L,
"commit2"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 150L,
"commit2"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file7.json", 100L,
"commit3"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file7.json", 250L,
"commit3"));
+ Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+ QueryInfo queryInfo = new QueryInfo(
+ QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1",
+ "commit2", "_hoodie_commit_time",
+ "s3.object.key", "s3.object.size");
+ Pair<CloudObjectIncrCheckpoint, Dataset<Row>> result =
IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+ inputDs, 350L, queryInfo, new CloudObjectIncrCheckpoint("commit1",
null));
+ Row row =
result.getRight().select("cumulativeSize").collectAsList().get((int)
result.getRight().count() - 1);
+ assertEquals("commit1#path/to/file2.json", result.getKey().toString());
+ List<Row> rows = result.getRight().collectAsList();
+ assertEquals(2, rows.size());
+ assertEquals("[[commit1,[[bucket-1],[path/to/file1.json,100]],100],
[commit1,[[bucket-1],[path/to/file2.json,150]],250]]", rows.toString());
+ assertEquals(250L, row.get(0));
+
+ result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+ inputDs, 550L, queryInfo, new CloudObjectIncrCheckpoint("commit1",
null));
+ row = result.getRight().select("cumulativeSize").collectAsList().get((int)
result.getRight().count() - 1);
+ assertEquals("commit2#path/to/file4.json", result.getKey().toString());
+ rows = result.getRight().collectAsList();
+ assertEquals(4, rows.size());
+ assertEquals("[[commit1,[[bucket-1],[path/to/file1.json,100]],100],
[commit1,[[bucket-1],[path/to/file2.json,150]],250],"
+ + " [commit1,[[bucket-1],[path/to/file3.json,200]],450],
[commit2,[[bucket-1],[path/to/file4.json,50]],500]]",
+ rows.toString());
+ assertEquals(500L, row.get(0));
+ }
+
+ @Test
+ void testCatchAllObjects() {
+ List<Triple<String, Long, String>> filePathSizeAndCommitTime = new
ArrayList<>();
+ // Add file paths and sizes to the list
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L,
"commit1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L,
"commit1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L,
"commit1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L,
"commit2"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 150L,
"commit2"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file8.json", 100L,
"commit3"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file6.json", 250L,
"commit3"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file7.json", 50L,
"commit3"));
+ Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+ QueryInfo queryInfo = new QueryInfo(
+ QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1",
+ "commit2", "_hoodie_commit_time",
+ "s3.object.key", "s3.object.size");
+ Pair<CloudObjectIncrCheckpoint, Dataset<Row>> result =
IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+ inputDs, 1500L, queryInfo, new CloudObjectIncrCheckpoint("commit1",
null));
+ Row row =
result.getRight().select("cumulativeSize").collectAsList().get((int)
result.getRight().count() - 1);
+ assertEquals("commit3#path/to/file8.json", result.getKey().toString());
+ List<Row> rows = result.getRight().collectAsList();
+ assertEquals(8, rows.size());
+ assertEquals(1050L, row.get(0));
+ }
+
+ @Test
+ void testFileOrderingAcrossCommits() {
+ List<Triple<String, Long, String>> filePathSizeAndCommitTime = new
ArrayList<>();
+ // Add file paths and sizes to the list
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file8.json", 100L,
"commit3"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file6.json", 250L,
"commit3"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file7.json", 50L,
"commit3"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file0.json", 100L,
"commit4"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 50L,
"commit4"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 50L,
"commit4"));
+ Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+ QueryInfo queryInfo = new QueryInfo(
+ QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit3", "commit3",
+ "commit4", "_hoodie_commit_time",
+ "s3.object.key", "s3.object.size");
+ Pair<CloudObjectIncrCheckpoint, Dataset<Row>> result =
IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+ inputDs, 50L, queryInfo, new
CloudObjectIncrCheckpoint("commit3","path/to/file8.json"));
+ Row row =
result.getRight().select("cumulativeSize").collectAsList().get((int)
result.getRight().count() - 1);
+ assertEquals("commit4#path/to/file0.json", result.getKey().toString());
+ List<Row> rows = result.getRight().collectAsList();
+ assertEquals(1, rows.size());
+ assertEquals(100L, row.get(0));
+
+ result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+ inputDs, 350L, queryInfo, new
CloudObjectIncrCheckpoint("commit3","path/to/file8.json"));
+ row = result.getRight().select("cumulativeSize").collectAsList().get((int)
result.getRight().count() - 1);
+ assertEquals("commit4#path/to/file2.json", result.getKey().toString());
+ rows = result.getRight().collectAsList();
+ assertEquals(3, rows.size());
+ assertEquals(200L, row.get(0));
+ }
+
+ @Test
+ void testLastObjectInCommit() {
+ List<Triple<String, Long, String>> filePathSizeAndCommitTime = new
ArrayList<>();
+ // Add file paths and sizes to the list
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L,
"commit1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L,
"commit1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L,
"commit1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L,
"commit2"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 150L,
"commit2"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file8.json", 100L,
"commit3"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file6.json", 250L,
"commit3"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file7.json", 50L,
"commit3"));
+ Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+ QueryInfo queryInfo = new QueryInfo(
+ QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1",
+ "commit3", "_hoodie_commit_time",
+ "s3.object.key", "s3.object.size");
+ Pair<CloudObjectIncrCheckpoint, Dataset<Row>> result =
IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+ inputDs, 1500L, queryInfo, new
CloudObjectIncrCheckpoint("commit3","path/to/file8.json"));
+ assertEquals("commit3#path/to/file8.json", result.getKey().toString());
+ assertTrue(result.getRight().isEmpty());
+ }
+}
\ No newline at end of file