This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d4aba1a6f7f [HUDI-8068] Hook up source partitions to s3 incr source (#11752)
d4aba1a6f7f is described below

commit d4aba1a6f7f8c4b782b8c33e688f1b3c0816a9d5
Author: Rajesh Mahindra <[email protected]>
AuthorDate: Fri Sep 20 12:56:27 2024 -0700

    [HUDI-8068] Hook up source partitions to s3 incr source (#11752)
    
    Co-authored-by: rmahindra123 <[email protected]>
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../sources/helpers/CloudDataFetcher.java          | 13 +++++++++--
 .../sources/TestGcsEventsHoodieIncrSource.java     | 20 +++++++++--------
 .../sources/TestS3EventsHoodieIncrSource.java      | 25 ++++++++++++----------
 3 files changed, 36 insertions(+), 22 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
index 7fd656adb7e..5f14e73a0fd 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
@@ -117,18 +117,23 @@ public class CloudDataFetcher implements Serializable {
 
     long bytesPerPartition = props.containsKey(SOURCE_MAX_BYTES_PER_PARTITION.key()) ? props.getLong(SOURCE_MAX_BYTES_PER_PARTITION.key()) :
         props.getLong(PARQUET_MAX_FILE_SIZE.key(), Long.parseLong(PARQUET_MAX_FILE_SIZE.defaultValue()));
+    int numSourcePartitions = 0;
     if (isSourceProfileSupplierAvailable) {
       long bytesPerPartitionFromProfile = (long) sourceProfileSupplier.get().getSourceProfile().getSourceSpecificContext();
       if (bytesPerPartitionFromProfile > 0) {
         LOG.debug("Using bytesPerPartition from source profile 
bytesPerPartitionFromConfig {} bytesPerPartitionFromProfile {}", 
bytesPerPartition, bytesPerPartitionFromProfile);
         bytesPerPartition = bytesPerPartitionFromProfile;
       }
+      numSourcePartitions = sourceProfileSupplier.get().getSourceProfile().getSourcePartitions();
     }
-    Option<Dataset<Row>> datasetOption = getCloudObjectDataDF(cloudObjectMetadata, schemaProvider, bytesPerPartition);
+    Option<Dataset<Row>> datasetOption = getCloudObjectDataDF(cloudObjectMetadata, schemaProvider, bytesPerPartition, numSourcePartitions);
     return Pair.of(datasetOption, checkPointAndDataset.getLeft().toString());
   }
 
-  private Option<Dataset<Row>> getCloudObjectDataDF(List<CloudObjectMetadata> cloudObjectMetadata, Option<SchemaProvider> schemaProviderOption, long bytesPerPartition) {
+  private Option<Dataset<Row>> getCloudObjectDataDF(List<CloudObjectMetadata> cloudObjectMetadata,
+                                                    Option<SchemaProvider> schemaProviderOption,
+                                                    long bytesPerPartition,
+                                                    int numSourcePartitions) {
     long totalSize = 0;
     for (CloudObjectMetadata o : cloudObjectMetadata) {
       totalSize += o.getSize();
@@ -137,6 +142,10 @@ public class CloudDataFetcher implements Serializable {
     double totalSizeWithHoodieMetaFields = totalSize * 1.1;
     metrics.updateStreamerSourceBytesToBeIngestedInSyncRound(totalSize);
     int numPartitions = (int) Math.max(Math.ceil(totalSizeWithHoodieMetaFields / bytesPerPartition), 1);
+    // If the number of source partitions is configured to be greater, then use it instead.
+    if (numPartitions < numSourcePartitions) {
+      numPartitions = numSourcePartitions;
+    }
     metrics.updateStreamerSourceParallelism(numPartitions);
     return cloudObjectsSelectorCommon.loadAsDataset(sparkSession, cloudObjectMetadata, getFileFormat(props), schemaProviderOption, numPartitions);
   }
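
For context on the hunk above: the fetcher now takes the larger of the bytes-based partition estimate and the partition count reported by the source profile. A minimal, self-contained sketch of that selection (illustrative only; the helper name below is not part of the Hudi codebase):

    // Mirrors the selection added to CloudDataFetcher: a bytes-based estimate,
    // padded ~10% for Hudi meta fields, with the profile's partition count as a floor.
    static int resolveNumPartitions(long totalSizeBytes, long bytesPerPartition, int numSourcePartitions) {
      double totalSizeWithHoodieMetaFields = totalSizeBytes * 1.1;
      int numPartitions = (int) Math.max(Math.ceil(totalSizeWithHoodieMetaFields / bytesPerPartition), 1);
      return Math.max(numPartitions, numSourcePartitions);
    }

For example, 100 bytes of input at 10 bytes per partition gives ceil(110 / 10) = 11, which outweighs a profile value of 2; with a very large bytesPerPartition the estimate drops to 1, so the profile's 2 is used instead. This is the case the tests below exercise with sourcePartitions = 2.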
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index 41ab16d7bfd..67149d1e57f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -238,11 +238,11 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     List<Long> bytesPerPartition = Arrays.asList(10L, 100L, -1L);
     setMockQueryRunner(inputDs);
 
-    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(100L, bytesPerPartition.get(0)));
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(100L, 0, bytesPerPartition.get(0)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L, "1#path/to/file1" + extension, typedProperties);
-    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(100L, bytesPerPartition.get(1)));
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(100L, 0, bytesPerPartition.get(1)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1" + extension), 100L, "1#path/to/file2" + extension, typedProperties);
-    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(1000L, bytesPerPartition.get(2)));
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(1000L, 0, bytesPerPartition.get(2)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2" + extension), 1000L, "2#path/to/file5" + extension, typedProperties);
     // Verify the partitions being passed in getCloudObjectDataDF are correct.
     List<Integer> numPartitions = Arrays.asList(12, 2, 1);
@@ -280,18 +280,20 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);
     List<Long> bytesPerPartition = Arrays.asList(10L, 20L, -1L, 1000L * 1000L * 1000L);
 
+    // If the computed number of partitions based on bytes is less than this value, it should use this value for num partitions.
+    int sourcePartitions = 2;
     //1. snapshot query, read all records
-    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50000L, bytesPerPartition.get(0)));
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50000L, sourcePartitions, bytesPerPartition.get(0)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, exptected1, typedProperties);
     //2. incremental query, as commit is present in timeline
-    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(10L, bytesPerPartition.get(1)));
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(10L, sourcePartitions, bytesPerPartition.get(1)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(exptected1), 10L, exptected2, typedProperties);
     //3. snapshot query with source limit less than first commit size
-    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50L, bytesPerPartition.get(2)));
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50L, sourcePartitions, bytesPerPartition.get(2)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected3, typedProperties);
     typedProperties.setProperty("hoodie.streamer.source.cloud.data.ignore.relpath.prefix", "path/to");
     //4. As snapshotQuery will return 1 -> same would be return as nextCheckpoint (dataset is empty due to ignore prefix).
-    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50L, bytesPerPartition.get(3)));
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50L, sourcePartitions, bytesPerPartition.get(3)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, typedProperties);
     // Verify the partitions being passed in getCloudObjectDataDF are correct.
     ArgumentCaptor<Integer> argumentCaptor = ArgumentCaptor.forClass(Integer.class);
@@ -300,9 +302,9 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     verify(metrics, atLeastOnce()).updateStreamerSourceParallelism(argumentCaptorForMetrics.capture());
     List<Integer> numPartitions;
     if (snapshotCheckPoint.equals("1") || snapshotCheckPoint.equals("2")) {
-      numPartitions = Arrays.asList(12, 3, 1);
+      numPartitions = Arrays.asList(12, 3, sourcePartitions);
     } else {
-      numPartitions = Arrays.asList(23, 1);
+      numPartitions = Arrays.asList(23, sourcePartitions);
     }
     Assertions.assertEquals(numPartitions, argumentCaptor.getAllValues());
     Assertions.assertEquals(numPartitions, argumentCaptorForMetrics.getAllValues());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
index 2a011cd9812..1b1c43da158 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -399,7 +399,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
     setMockQueryRunner(inputDs);
-    SourceProfile<Long> sourceProfile = new TestSourceProfile(50L, 10L);
+    SourceProfile<Long> sourceProfile = new TestSourceProfile(50L, 0, 10L);
     when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())).thenReturn(Option.empty());
     if (useSourceProfile) {
       when(sourceProfileSupplier.getSourceProfile()).thenReturn(sourceProfile);
@@ -438,7 +438,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
 
     setMockQueryRunner(inputDs);
     when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())).thenReturn(Option.empty());
-    SourceProfile<Long> sourceProfile = new TestSourceProfile(50L, 10L);
+    SourceProfile<Long> sourceProfile = new TestSourceProfile(50L, 0, 10L);
     if (useSourceProfile) {
       when(sourceProfileSupplier.getSourceProfile()).thenReturn(sourceProfile);
     } else {
@@ -484,18 +484,20 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", "path/to/skip");
     List<Long> bytesPerPartition = Arrays.asList(10L, 20L, -1L, 1000L * 1000L * 1000L);
 
+    // If the computed number of partitions based on bytes is less than this value, it should use this value for num partitions.
+    int sourcePartitions = 2;
     //1. snapshot query, read all records
-    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50000L, bytesPerPartition.get(0)));
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50000L, sourcePartitions, bytesPerPartition.get(0)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, exptected1, typedProperties);
     //2. incremental query, as commit is present in timeline
-    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(10L, bytesPerPartition.get(1)));
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(10L, sourcePartitions, bytesPerPartition.get(1)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(exptected1), 10L, exptected2, typedProperties);
     //3. snapshot query with source limit less than first commit size
-    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50L, bytesPerPartition.get(2)));
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50L, sourcePartitions, bytesPerPartition.get(2)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected3, typedProperties);
     typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", "path/to");
     //4. As snapshotQuery will return 1 -> same would be return as nextCheckpoint (dataset is empty due to ignore prefix).
-    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50L, bytesPerPartition.get(3)));
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50L, sourcePartitions, bytesPerPartition.get(3)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, typedProperties);
     // Verify the partitions being passed in getCloudObjectDataDF are correct.
     ArgumentCaptor<Integer> argumentCaptor = ArgumentCaptor.forClass(Integer.class);
@@ -504,9 +506,9 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     verify(metrics, atLeastOnce()).updateStreamerSourceParallelism(argumentCaptorForMetrics.capture());
     List<Integer> numPartitions;
     if (snapshotCheckPoint.equals("1") || snapshotCheckPoint.equals("2")) {
-      numPartitions = Arrays.asList(12, 3, 1);
+      numPartitions = Arrays.asList(12, 3, sourcePartitions);
     } else {
-      numPartitions = Arrays.asList(23, 1);
+      numPartitions = Arrays.asList(23, sourcePartitions);
     }
     Assertions.assertEquals(numPartitions, argumentCaptor.getAllValues());
     Assertions.assertEquals(numPartitions, argumentCaptorForMetrics.getAllValues());
@@ -566,12 +568,13 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
   }
 
   static class TestSourceProfile implements SourceProfile<Long> {
-
     private final long maxSourceBytes;
+    private final int sourcePartitions;
     private final long bytesPerPartition;
 
-    public TestSourceProfile(long maxSourceBytes, long bytesPerPartition) {
+    public TestSourceProfile(long maxSourceBytes, int sourcePartitions, long bytesPerPartition) {
       this.maxSourceBytes = maxSourceBytes;
+      this.sourcePartitions = sourcePartitions;
       this.bytesPerPartition = bytesPerPartition;
     }
 
@@ -582,7 +585,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
 
     @Override
     public int getSourcePartitions() {
-      throw new UnsupportedOperationException("getSourcePartitions is not required for S3 source profile");
+      return sourcePartitions;
     }
 
     @Override

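As the last hunk shows, getSourcePartitions() on the test profile no longer throws, so the S3/GCS incremental sources can consume a partition hint from a source profile. A minimal sketch of a profile that supplies such a hint, modeled on the TestSourceProfile above (the class name is hypothetical, and getMaxSourceBytes() is assumed here; only getSourcePartitions() and getSourceSpecificContext() appear in the diff):

    // Illustrative only; not a Hudi-provided class.
    class FixedCloudSourceProfile implements SourceProfile<Long> {
      private final long maxSourceBytes;    // per-sync byte budget
      private final int sourcePartitions;   // lower bound for read parallelism
      private final long bytesPerPartition; // source-specific context consumed by CloudDataFetcher

      FixedCloudSourceProfile(long maxSourceBytes, int sourcePartitions, long bytesPerPartition) {
        this.maxSourceBytes = maxSourceBytes;
        this.sourcePartitions = sourcePartitions;
        this.bytesPerPartition = bytesPerPartition;
      }

      @Override
      public long getMaxSourceBytes() {
        return maxSourceBytes;
      }

      @Override
      public int getSourcePartitions() {
        return sourcePartitions;
      }

      @Override
      public Long getSourceSpecificContext() {
        return bytesPerPartition;
      }
    }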