This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new d4aba1a6f7f [HUDI-8068] Hook up source partitions to s3 incr source (#11752)
d4aba1a6f7f is described below
commit d4aba1a6f7f8c4b782b8c33e688f1b3c0816a9d5
Author: Rajesh Mahindra <[email protected]>
AuthorDate: Fri Sep 20 12:56:27 2024 -0700
[HUDI-8068] Hook up source partitions to s3 incr source (#11752)
Co-authored-by: rmahindra123 <[email protected]>
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../sources/helpers/CloudDataFetcher.java | 13 +++++++++--
.../sources/TestGcsEventsHoodieIncrSource.java | 20 +++++++++--------
.../sources/TestS3EventsHoodieIncrSource.java | 25 ++++++++++++----------
3 files changed, 36 insertions(+), 22 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
index 7fd656adb7e..5f14e73a0fd 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
@@ -117,18 +117,23 @@ public class CloudDataFetcher implements Serializable {
     long bytesPerPartition = props.containsKey(SOURCE_MAX_BYTES_PER_PARTITION.key()) ? props.getLong(SOURCE_MAX_BYTES_PER_PARTITION.key()) :
         props.getLong(PARQUET_MAX_FILE_SIZE.key(), Long.parseLong(PARQUET_MAX_FILE_SIZE.defaultValue()));
+    int numSourcePartitions = 0;
     if (isSourceProfileSupplierAvailable) {
       long bytesPerPartitionFromProfile = (long) sourceProfileSupplier.get().getSourceProfile().getSourceSpecificContext();
       if (bytesPerPartitionFromProfile > 0) {
         LOG.debug("Using bytesPerPartition from source profile bytesPerPartitionFromConfig {} bytesPerPartitionFromProfile {}",
             bytesPerPartition, bytesPerPartitionFromProfile);
         bytesPerPartition = bytesPerPartitionFromProfile;
       }
+      numSourcePartitions = sourceProfileSupplier.get().getSourceProfile().getSourcePartitions();
     }
-    Option<Dataset<Row>> datasetOption = getCloudObjectDataDF(cloudObjectMetadata, schemaProvider, bytesPerPartition);
+    Option<Dataset<Row>> datasetOption = getCloudObjectDataDF(cloudObjectMetadata, schemaProvider, bytesPerPartition, numSourcePartitions);
     return Pair.of(datasetOption, checkPointAndDataset.getLeft().toString());
   }
-  private Option<Dataset<Row>> getCloudObjectDataDF(List<CloudObjectMetadata> cloudObjectMetadata, Option<SchemaProvider> schemaProviderOption, long bytesPerPartition) {
+  private Option<Dataset<Row>> getCloudObjectDataDF(List<CloudObjectMetadata> cloudObjectMetadata,
+                                                    Option<SchemaProvider> schemaProviderOption,
+                                                    long bytesPerPartition,
+                                                    int numSourcePartitions) {
     long totalSize = 0;
     for (CloudObjectMetadata o : cloudObjectMetadata) {
       totalSize += o.getSize();
@@ -137,6 +142,10 @@ public class CloudDataFetcher implements Serializable {
     double totalSizeWithHoodieMetaFields = totalSize * 1.1;
     metrics.updateStreamerSourceBytesToBeIngestedInSyncRound(totalSize);
     int numPartitions = (int) Math.max(Math.ceil(totalSizeWithHoodieMetaFields / bytesPerPartition), 1);
+    // If the number of source partitions is configured to be greater, then use it instead.
+    if (numPartitions < numSourcePartitions) {
+      numPartitions = numSourcePartitions;
+    }
     metrics.updateStreamerSourceParallelism(numPartitions);
     return cloudObjectsSelectorCommon.loadAsDataset(sparkSession, cloudObjectMetadata, getFileFormat(props), schemaProviderOption, numPartitions);
   }
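The net effect of the hunk above is that the partition count supplied by the SourceProfile now acts as a lower bound on the read parallelism CloudDataFetcher derives from data size. A minimal sketch of that selection, using simplified names rather than the exact CloudDataFetcher signature:

    // Sketch only; choosePartitions is a hypothetical name, not a Hudi API.
    static int choosePartitions(long totalSizeBytes, long bytesPerPartition, int numSourcePartitions) {
      // 1.1x accounts for the Hoodie meta fields added on ingest, mirroring the patched code.
      double totalSizeWithHoodieMetaFields = totalSizeBytes * 1.1;
      int numPartitions = (int) Math.max(Math.ceil(totalSizeWithHoodieMetaFields / bytesPerPartition), 1);
      // If the configured number of source partitions is greater, use it instead.
      return Math.max(numPartitions, numSourcePartitions);
    }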
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index 41ab16d7bfd..67149d1e57f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -238,11 +238,11 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     List<Long> bytesPerPartition = Arrays.asList(10L, 100L, -1L);
     setMockQueryRunner(inputDs);
-    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(100L, bytesPerPartition.get(0)));
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(100L, 0, bytesPerPartition.get(0)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L, "1#path/to/file1" + extension, typedProperties);
-    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(100L, bytesPerPartition.get(1)));
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(100L, 0, bytesPerPartition.get(1)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1" + extension), 100L, "1#path/to/file2" + extension, typedProperties);
-    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(1000L, bytesPerPartition.get(2)));
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(1000L, 0, bytesPerPartition.get(2)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2" + extension), 1000L, "2#path/to/file5" + extension, typedProperties);
     // Verify the partitions being passed in getCloudObjectDataDF are correct.
     List<Integer> numPartitions = Arrays.asList(12, 2, 1);
@@ -280,18 +280,20 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);
     List<Long> bytesPerPartition = Arrays.asList(10L, 20L, -1L, 1000L * 1000L * 1000L);
+    // If the computed number of partitions based on bytes is less than this value, it should use this value for num partitions.
+    int sourcePartitions = 2;
     //1. snapshot query, read all records
-    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50000L, bytesPerPartition.get(0)));
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50000L, sourcePartitions, bytesPerPartition.get(0)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, exptected1, typedProperties);
     //2. incremental query, as commit is present in timeline
-    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(10L, bytesPerPartition.get(1)));
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(10L, sourcePartitions, bytesPerPartition.get(1)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(exptected1), 10L, exptected2, typedProperties);
     //3. snapshot query with source limit less than first commit size
-    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50L, bytesPerPartition.get(2)));
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50L, sourcePartitions, bytesPerPartition.get(2)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected3, typedProperties);
     typedProperties.setProperty("hoodie.streamer.source.cloud.data.ignore.relpath.prefix", "path/to");
     //4. As snapshotQuery will return 1 -> same would be return as nextCheckpoint (dataset is empty due to ignore prefix).
-    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50L, bytesPerPartition.get(3)));
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50L, sourcePartitions, bytesPerPartition.get(3)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, typedProperties);
     // Verify the partitions being passed in getCloudObjectDataDF are correct.
     ArgumentCaptor<Integer> argumentCaptor = ArgumentCaptor.forClass(Integer.class);
@@ -300,9 +302,9 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     verify(metrics, atLeastOnce()).updateStreamerSourceParallelism(argumentCaptorForMetrics.capture());
     List<Integer> numPartitions;
     if (snapshotCheckPoint.equals("1") || snapshotCheckPoint.equals("2")) {
-      numPartitions = Arrays.asList(12, 3, 1);
+      numPartitions = Arrays.asList(12, 3, sourcePartitions);
     } else {
-      numPartitions = Arrays.asList(23, 1);
+      numPartitions = Arrays.asList(23, sourcePartitions);
     }
     Assertions.assertEquals(numPartitions, argumentCaptor.getAllValues());
     Assertions.assertEquals(numPartitions, argumentCaptorForMetrics.getAllValues());
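To make the updated expectations concrete: in the final read of this test the partition count computed from bytes comes out to 1 (the pre-patch expectation), which is below the configured sourcePartitions of 2, so the new lower bound applies and 2 is recorded instead. A small worked example with those values:

    // Values taken from the test expectations above; illustrative only.
    int fromBytes = 1;                                          // partitions derived from data size in the final read
    int sourcePartitions = 2;                                   // configured via TestSourceProfile
    int numPartitions = Math.max(fromBytes, sourcePartitions);  // 2, the newly expected parallelism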
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
index 2a011cd9812..1b1c43da158 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -399,7 +399,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
     setMockQueryRunner(inputDs);
-    SourceProfile<Long> sourceProfile = new TestSourceProfile(50L, 10L);
+    SourceProfile<Long> sourceProfile = new TestSourceProfile(50L, 0, 10L);
     when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())).thenReturn(Option.empty());
     if (useSourceProfile) {
       when(sourceProfileSupplier.getSourceProfile()).thenReturn(sourceProfile);
@@ -438,7 +438,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     setMockQueryRunner(inputDs);
     when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())).thenReturn(Option.empty());
-    SourceProfile<Long> sourceProfile = new TestSourceProfile(50L, 10L);
+    SourceProfile<Long> sourceProfile = new TestSourceProfile(50L, 0, 10L);
     if (useSourceProfile) {
       when(sourceProfileSupplier.getSourceProfile()).thenReturn(sourceProfile);
     } else {
@@ -484,18 +484,20 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", "path/to/skip");
     List<Long> bytesPerPartition = Arrays.asList(10L, 20L, -1L, 1000L * 1000L * 1000L);
+    // If the computed number of partitions based on bytes is less than this value, it should use this value for num partitions.
+    int sourcePartitions = 2;
     //1. snapshot query, read all records
-    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50000L, bytesPerPartition.get(0)));
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50000L, sourcePartitions, bytesPerPartition.get(0)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, exptected1, typedProperties);
     //2. incremental query, as commit is present in timeline
-    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(10L, bytesPerPartition.get(1)));
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(10L, sourcePartitions, bytesPerPartition.get(1)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(exptected1), 10L, exptected2, typedProperties);
     //3. snapshot query with source limit less than first commit size
-    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50L, bytesPerPartition.get(2)));
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50L, sourcePartitions, bytesPerPartition.get(2)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected3, typedProperties);
     typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", "path/to");
     //4. As snapshotQuery will return 1 -> same would be return as nextCheckpoint (dataset is empty due to ignore prefix).
-    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50L, bytesPerPartition.get(3)));
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50L, sourcePartitions, bytesPerPartition.get(3)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, typedProperties);
     // Verify the partitions being passed in getCloudObjectDataDF are correct.
     ArgumentCaptor<Integer> argumentCaptor = ArgumentCaptor.forClass(Integer.class);
@@ -504,9 +506,9 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     verify(metrics, atLeastOnce()).updateStreamerSourceParallelism(argumentCaptorForMetrics.capture());
     List<Integer> numPartitions;
     if (snapshotCheckPoint.equals("1") || snapshotCheckPoint.equals("2")) {
-      numPartitions = Arrays.asList(12, 3, 1);
+      numPartitions = Arrays.asList(12, 3, sourcePartitions);
     } else {
-      numPartitions = Arrays.asList(23, 1);
+      numPartitions = Arrays.asList(23, sourcePartitions);
     }
     Assertions.assertEquals(numPartitions, argumentCaptor.getAllValues());
     Assertions.assertEquals(numPartitions, argumentCaptorForMetrics.getAllValues());
@@ -566,12 +568,13 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
   }
   static class TestSourceProfile implements SourceProfile<Long> {
-
     private final long maxSourceBytes;
+    private final int sourcePartitions;
     private final long bytesPerPartition;
-    public TestSourceProfile(long maxSourceBytes, long bytesPerPartition) {
+    public TestSourceProfile(long maxSourceBytes, int sourcePartitions, long bytesPerPartition) {
       this.maxSourceBytes = maxSourceBytes;
+      this.sourcePartitions = sourcePartitions;
       this.bytesPerPartition = bytesPerPartition;
     }
@@ -582,7 +585,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     @Override
     public int getSourcePartitions() {
-      throw new UnsupportedOperationException("getSourcePartitions is not required for S3 source profile");
+      return sourcePartitions;
     }
     @Override