This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f06ff5b3e0e [HUDI-7090] Set the maxParallelism for singleton operator (#10090)
f06ff5b3e0e is described below
commit f06ff5b3e0ee8bb6e49aad04d3b6054d6c46e272
Author: hehuiyuan <[email protected]>
AuthorDate: Fri Nov 17 09:43:21 2023 +0800
[HUDI-7090] Set the maxParallelism for singleton operator (#10090)
---
.../hudi/sink/clustering/HoodieFlinkClusteringJob.java | 4 +++-
.../org/apache/hudi/sink/compact/HoodieFlinkCompactor.java | 4 +++-
.../main/java/org/apache/hudi/sink/utils/Pipelines.java | 14 +++++++++++---
.../main/java/org/apache/hudi/table/HoodieTableSource.java | 1 +
4 files changed, 18 insertions(+), 5 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
index a453cac6803..0966f6995bd 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -348,7 +348,9 @@ public class HoodieFlinkClusteringJob {
.addSink(new ClusteringCommitSink(conf))
.name("clustering_commit")
.uid("uid_clustering_commit")
- .setParallelism(1);
+ .setParallelism(1)
+ .getTransformation()
+ .setMaxParallelism(1);
env.execute("flink_hudi_clustering_" + clusteringInstant.getTimestamp());
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 57e823ab21c..99dd45d94b4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -298,7 +298,9 @@ public class HoodieFlinkCompactor {
.addSink(new CompactionCommitSink(conf))
.name("compaction_commit")
.uid("uid_compaction_commit")
- .setParallelism(1);
+ .setParallelism(1)
+ .getTransformation()
+ .setMaxParallelism(1);
env.execute("flink_hudi_compaction_" + String.join(",", compactionInstantTimes));
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index e66009aa551..b3acd4cfa11 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -410,10 +410,11 @@ public class Pipelines {
* @return the compaction pipeline
*/
public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf, DataStream<Object> dataStream) {
- return dataStream.transform("compact_plan_generate",
+ DataStreamSink<CompactionCommitEvent> compactionCommitEventDataStream = dataStream.transform("compact_plan_generate",
TypeInformation.of(CompactionPlanEvent.class),
new CompactionPlanOperator(conf))
.setParallelism(1) // plan generate must be singleton
+ .setMaxParallelism(1)
// make the distribution strategy deterministic to avoid concurrent modifications
// on the same bucket files
.keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
@@ -424,6 +425,8 @@ public class Pipelines {
.addSink(new CompactionCommitSink(conf))
.name("compact_commit")
.setParallelism(1); // compaction commit should be singleton
+ compactionCommitEventDataStream.getTransformation().setMaxParallelism(1);
+ return compactionCommitEventDataStream;
}
/**
@@ -452,6 +455,7 @@ public class Pipelines {
TypeInformation.of(ClusteringPlanEvent.class),
new ClusteringPlanOperator(conf))
.setParallelism(1) // plan generate must be singleton
+ .setMaxParallelism(1) // plan generate must be singleton
.keyBy(plan ->
// make the distribution strategy deterministic to avoid concurrent modifications
// on the same bucket files
@@ -465,15 +469,19 @@ public class Pipelines {
ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(),
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
}
- return clusteringStream.addSink(new ClusteringCommitSink(conf))
+ DataStreamSink<ClusteringCommitEvent> clusteringCommitEventDataStream = clusteringStream.addSink(new ClusteringCommitSink(conf))
.name("clustering_commit")
.setParallelism(1); // clustering commit should be singleton
+ clusteringCommitEventDataStream.getTransformation().setMaxParallelism(1);
+ return clusteringCommitEventDataStream;
}
public static DataStreamSink<Object> clean(Configuration conf, DataStream<Object> dataStream) {
- return dataStream.addSink(new CleanFunction<>(conf))
+ DataStreamSink<Object> cleanCommitDataStream = dataStream.addSink(new CleanFunction<>(conf))
.setParallelism(1)
.name("clean_commits");
+ cleanCommitDataStream.getTransformation().setMaxParallelism(1);
+ return cleanCommitDataStream;
}
public static DataStreamSink<Object> dummySink(DataStream<Object> dataStream) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index e4b8db33516..b4ef68a3939 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -207,6 +207,7 @@ public class HoodieTableSource implements
SingleOutputStreamOperator<RowData> source =
execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
.uid(Pipelines.opUID("split_monitor", conf))
.setParallelism(1)
+ .setMaxParallelism(1)
.keyBy(MergeOnReadInputSplit::getFileId)
.transform("split_reader", typeInfo, factory)
.uid(Pipelines.opUID("split_reader", conf))