This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 92eab8d6868 [HUDI-7816] Provide SourceProfileSupplier option into the
SnapshotLoadQuerySplitter (#11368)
92eab8d6868 is described below
commit 92eab8d68685db4bc8b7764ff8fd290da8f4ac81
Author: Matthew Wong <[email protected]>
AuthorDate: Sat Jun 1 22:18:35 2024 -0700
[HUDI-7816] Provide SourceProfileSupplier option into the
SnapshotLoadQuerySplitter (#11368)
---
.../apache/hudi/utilities/sources/HoodieIncrSource.java | 2 +-
.../utilities/sources/SnapshotLoadQuerySplitter.java | 16 ++++++++++++----
.../hudi/utilities/sources/helpers/QueryRunner.java | 2 +-
.../sources/helpers/TestSnapshotQuerySplitterImpl.java | 3 ++-
4 files changed, 16 insertions(+), 7 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 2480cbd650f..6c2eede1ba9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -221,7 +221,7 @@ public class HoodieIncrSource extends RowSource {
.option(DataSourceReadOptions.QUERY_TYPE().key(),
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
.load(srcPath);
if (snapshotLoadQuerySplitter.isPresent()) {
- queryInfo =
snapshotLoadQuerySplitter.get().getNextCheckpoint(snapshot, queryInfo);
+ queryInfo =
snapshotLoadQuerySplitter.get().getNextCheckpoint(snapshot, queryInfo,
sourceProfileSupplier);
}
source = snapshot
// add filtering so that only interested records are returned.
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java
index ca299122ec7..f0fd1fed904 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java
@@ -18,10 +18,14 @@
package org.apache.hudi.utilities.sources;
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
+import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -30,6 +34,7 @@ import static org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter.Config
/**
* Abstract splitter responsible for managing the snapshot load query
operations.
*/
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
public abstract class SnapshotLoadQuerySplitter {
/**
@@ -61,20 +66,23 @@ public abstract class SnapshotLoadQuerySplitter {
*
* @param df The dataset to process.
* @param beginCheckpointStr The starting checkpoint string.
+ * @param sourceProfileSupplier An Option of a SourceProfileSupplier to use
in load splitting implementation
* @return The next checkpoint as an Option.
*/
- public abstract Option<String> getNextCheckpoint(Dataset<Row> df, String
beginCheckpointStr);
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public abstract Option<String> getNextCheckpoint(Dataset<Row> df, String
beginCheckpointStr, Option<SourceProfileSupplier> sourceProfileSupplier);
/**
- * Retrieves the next checkpoint based on query information.
+ * Retrieves the next checkpoint based on query information and a
SourceProfileSupplier.
*
* @param df The dataset to process.
* @param queryInfo The query information object.
+ * @param sourceProfileSupplier An Option of a SourceProfileSupplier to use
in load splitting implementation
* @return Updated query information with the next checkpoint, in case of
empty checkpoint,
* returning endPoint same as queryInfo.getEndInstant().
*/
- public QueryInfo getNextCheckpoint(Dataset<Row> df, QueryInfo queryInfo) {
- return getNextCheckpoint(df, queryInfo.getStartInstant())
+ public QueryInfo getNextCheckpoint(Dataset<Row> df, QueryInfo queryInfo,
Option<SourceProfileSupplier> sourceProfileSupplier) {
+ return getNextCheckpoint(df, queryInfo.getStartInstant(),
sourceProfileSupplier)
.map(checkpoint -> queryInfo.withUpdatedEndInstant(checkpoint))
.orElse(queryInfo);
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
index 2f0a8bf488e..e2571bd54f8 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
@@ -100,7 +100,7 @@ public class QueryRunner {
Dataset<Row> snapshot = sparkSession.read().format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE().key(),
queryInfo.getQueryType()).load(sourcePath);
QueryInfo snapshotQueryInfo = snapshotLoadQuerySplitterOption
- .map(snapshotLoadQuerySplitter ->
snapshotLoadQuerySplitter.getNextCheckpoint(snapshot, queryInfo))
+ .map(snapshotLoadQuerySplitter ->
snapshotLoadQuerySplitter.getNextCheckpoint(snapshot, queryInfo,
Option.empty()))
.orElse(queryInfo);
return Pair.of(snapshotQueryInfo, applySnapshotQueryFilters(snapshot,
snapshotQueryInfo));
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestSnapshotQuerySplitterImpl.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestSnapshotQuerySplitterImpl.java
index 4ba79e8978a..16c07d15f3a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestSnapshotQuerySplitterImpl.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestSnapshotQuerySplitterImpl.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter;
+import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.List;
@@ -43,7 +44,7 @@ public class TestSnapshotQuerySplitterImpl extends SnapshotLoadQuerySplitter {
}
@Override
- public Option<String> getNextCheckpoint(Dataset<Row> df, String
beginCheckpointStr) {
+ public Option<String> getNextCheckpoint(Dataset<Row> df, String
beginCheckpointStr, Option<SourceProfileSupplier> sourceProfileSupplierOption) {
List<Row> row =
df.filter(col(COMMIT_TIME_METADATA_FIELD).gt(lit(beginCheckpointStr)))
.orderBy(col(COMMIT_TIME_METADATA_FIELD)).limit(1).collectAsList();
return Option.ofNullable(row.size() > 0 ?
row.get(0).getAs(COMMIT_TIME_METADATA_FIELD) : null);