This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git

The following commit(s) were added to refs/heads/master by this push:
     new 0b57483  [HUDI-2015] Fix flink operator uid to allow multiple pipelines in one job (#3091)
0b57483 is described below

commit 0b57483a8e41742689a1362aa94aabb94a1361b3
Author: Danny Chan <yuzhao....@gmail.com>
AuthorDate: Thu Jun 17 09:08:19 2021 +0800

    [HUDI-2015] Fix flink operator uid to allow multiple pipelines in one job (#3091)
---
 .../src/main/java/org/apache/hudi/table/HoodieTableSink.java   | 8 ++++----
 .../src/main/java/org/apache/hudi/table/HoodieTableSource.java | 8 ++------
 2 files changed, 6 insertions(+), 10 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 4fbcbb5..c478893 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -93,17 +93,17 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
             "bucket_assigner",
             TypeInformation.of(HoodieRecord.class),
             new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
-            .uid("uid_bucket_assigner")
+            .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
             // shuffle by fileId(bucket id)
             .keyBy(record -> record.getCurrentLocation().getFileId())
             .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
-            .uid("uid_hoodie_stream_write")
+            .name("uid_hoodie_stream_write")
             .setParallelism(numWriteTasks);
         if (StreamerUtil.needsScheduleCompaction(conf)) {
           return pipeline.transform("compact_plan_generate",
               TypeInformation.of(CompactionPlanEvent.class),
               new CompactionPlanOperator(conf))
-              .uid("uid_compact_plan_generate")
+              .name("uid_compact_plan_generate")
              .setParallelism(1) // plan generate must be singleton
              .keyBy(event -> event.getOperation().hashCode())
              .transform("compact_task",
@@ -116,7 +116,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
         } else {
           return pipeline.addSink(new CleanFunction<>(conf))
               .setParallelism(1)
-              .name("clean_commits").uid("uid_clean_commits");
+              .name("clean_commits");
         }
       };
     }

diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 50420f9..55ec46a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -181,17 +181,13 @@ public class HoodieTableSource implements
         OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
         SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, "streaming_source")
             .setParallelism(1)
-            .uid("uid_streaming_source")
             .transform("split_reader", typeInfo, factory)
-            .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS))
-            .uid("uid_split_reader");
+            .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
         return new DataStreamSource<>(source);
       } else {
         InputFormatSourceFunction<RowData> func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo);
         DataStreamSource<RowData> source = execEnv.addSource(func, asSummaryString(), typeInfo);
-        return source.name("bounded_source")
-            .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS))
-            .uid("uid_bounded_source");
+        return source.name("bounded_source").setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
       }
     }
   };
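For context on the change above: Flink requires operator uids to be unique within a single job graph (uids are used to map operator state on savepoint restore), so two Hudi pipelines built in one job with the same hard-coded uid fail at job-graph generation. Below is a minimal standalone sketch, not Hudi code: the table names "t1"/"t2", the String elements, and the class name are hypothetical, chosen only to illustrate the collision the table-name suffix avoids.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class UidCollisionSketch {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // First pipeline, for a hypothetical table "t1". Before this commit both
        // pipelines would set the constant uid "uid_bucket_assigner", and Flink
        // would reject the job with "Hash collision on user-specified ID".
        env.fromElements("a", "b")
            .map(String::toUpperCase)
            .uid("uid_bucket_assigner_t1") // table-name suffix keeps the uid unique
            .print();

        // Second, independent pipeline in the same job, for table "t2".
        env.fromElements("c", "d")
            .map(String::toUpperCase)
            .uid("uid_bucket_assigner_t2") // unique again, so no collision
            .print();

        env.execute("two_pipelines_one_job");
      }
    }

This also explains why several .uid(...) calls in the diff are demoted to .name(...) or dropped: in Flink, name() only sets a display label and need not be unique, so it is safe for operators that appear once per pipeline, while uid() participates in state mapping and must stay unique across the whole job.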