This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 92eab8d6868 [HUDI-7816] Provide SourceProfileSupplier option into the SnapshotLoadQuerySplitter (#11368)
92eab8d6868 is described below

commit 92eab8d68685db4bc8b7764ff8fd290da8f4ac81
Author: Matthew Wong <[email protected]>
AuthorDate: Sat Jun 1 22:18:35 2024 -0700

    [HUDI-7816] Provide SourceProfileSupplier option into the SnapshotLoadQuerySplitter (#11368)
---
 .../apache/hudi/utilities/sources/HoodieIncrSource.java  |  2 +-
 .../utilities/sources/SnapshotLoadQuerySplitter.java     | 16 ++++++++++++----
 .../hudi/utilities/sources/helpers/QueryRunner.java      |  2 +-
 .../sources/helpers/TestSnapshotQuerySplitterImpl.java   |  3 ++-
 4 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 2480cbd650f..6c2eede1ba9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -221,7 +221,7 @@ public class HoodieIncrSource extends RowSource {
           .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
           .load(srcPath);
       if (snapshotLoadQuerySplitter.isPresent()) {
-        queryInfo = snapshotLoadQuerySplitter.get().getNextCheckpoint(snapshot, queryInfo);
+        queryInfo = snapshotLoadQuerySplitter.get().getNextCheckpoint(snapshot, queryInfo, sourceProfileSupplier);
       }
       source = snapshot
           // add filtering so that only interested records are returned.
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java
index ca299122ec7..f0fd1fed904 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java
@@ -18,10 +18,14 @@
 
 package org.apache.hudi.utilities.sources;
 
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.utilities.sources.helpers.QueryInfo;
+import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
@@ -30,6 +34,7 @@ import static org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter.Config
 /**
  * Abstract splitter responsible for managing the snapshot load query operations.
  */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
 public abstract class SnapshotLoadQuerySplitter {
 
   /**
@@ -61,20 +66,23 @@ public abstract class SnapshotLoadQuerySplitter {
    *
    * @param df The dataset to process.
    * @param beginCheckpointStr The starting checkpoint string.
+   * @param sourceProfileSupplier An Option of a SourceProfileSupplier to use in load splitting implementation
    * @return The next checkpoint as an Option.
    */
-  public abstract Option<String> getNextCheckpoint(Dataset<Row> df, String beginCheckpointStr);
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract Option<String> getNextCheckpoint(Dataset<Row> df, String beginCheckpointStr, Option<SourceProfileSupplier> sourceProfileSupplier);
 
   /**
-   * Retrieves the next checkpoint based on query information.
+   * Retrieves the next checkpoint based on query information and a SourceProfileSupplier.
    *
    * @param df The dataset to process.
    * @param queryInfo The query information object.
+   * @param sourceProfileSupplier An Option of a SourceProfileSupplier to use in load splitting implementation
    * @return Updated query information with the next checkpoint, in case of empty checkpoint,
    * returning endPoint same as queryInfo.getEndInstant().
    */
-  public QueryInfo getNextCheckpoint(Dataset<Row> df, QueryInfo queryInfo) {
-    return getNextCheckpoint(df, queryInfo.getStartInstant())
+  public QueryInfo getNextCheckpoint(Dataset<Row> df, QueryInfo queryInfo, Option<SourceProfileSupplier> sourceProfileSupplier) {
+    return getNextCheckpoint(df, queryInfo.getStartInstant(), sourceProfileSupplier)
         .map(checkpoint -> queryInfo.withUpdatedEndInstant(checkpoint))
         .orElse(queryInfo);
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
index 2f0a8bf488e..e2571bd54f8 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
@@ -100,7 +100,7 @@ public class QueryRunner {
     Dataset<Row> snapshot = sparkSession.read().format("org.apache.hudi")
         .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType()).load(sourcePath);
     QueryInfo snapshotQueryInfo = snapshotLoadQuerySplitterOption
-        .map(snapshotLoadQuerySplitter -> snapshotLoadQuerySplitter.getNextCheckpoint(snapshot, queryInfo))
+        .map(snapshotLoadQuerySplitter -> snapshotLoadQuerySplitter.getNextCheckpoint(snapshot, queryInfo, Option.empty()))
         .orElse(queryInfo);
     return Pair.of(snapshotQueryInfo, applySnapshotQueryFilters(snapshot, snapshotQueryInfo));
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestSnapshotQuerySplitterImpl.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestSnapshotQuerySplitterImpl.java
index 4ba79e8978a..16c07d15f3a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestSnapshotQuerySplitterImpl.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestSnapshotQuerySplitterImpl.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter;
+import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import java.util.List;
@@ -43,7 +44,7 @@ public class TestSnapshotQuerySplitterImpl extends SnapshotLoadQuerySplitter {
   }
 
   @Override
-  public Option<String> getNextCheckpoint(Dataset<Row> df, String beginCheckpointStr) {
+  public Option<String> getNextCheckpoint(Dataset<Row> df, String beginCheckpointStr, Option<SourceProfileSupplier> sourceProfileSupplierOption) {
     List<Row> row = df.filter(col(COMMIT_TIME_METADATA_FIELD).gt(lit(beginCheckpointStr)))
         .orderBy(col(COMMIT_TIME_METADATA_FIELD)).limit(1).collectAsList();
     return Option.ofNullable(row.size() > 0 ? row.get(0).getAs(COMMIT_TIME_METADATA_FIELD) : null);

Reply via email to