This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b57483  [HUDI-2015] Fix flink operator uid to allow multiple pipelines in one job (#3091)
0b57483 is described below

commit 0b57483a8e41742689a1362aa94aabb94a1361b3
Author: Danny Chan <yuzhao....@gmail.com>
AuthorDate: Thu Jun 17 09:08:19 2021 +0800

    [HUDI-2015] Fix flink operator uid to allow multiple pipelines in one job (#3091)
---
 .../src/main/java/org/apache/hudi/table/HoodieTableSink.java      | 8 ++++----
 .../src/main/java/org/apache/hudi/table/HoodieTableSource.java    | 8 ++------
 2 files changed, 6 insertions(+), 10 deletions(-)
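
For context, Flink requires operator uids to be unique within one job graph, so a fixed uid such as "uid_bucket_assigner" makes job-graph generation fail with a user-specified ID collision as soon as a second Hudi write pipeline is built in the same job. The sketch below is not part of this commit; the table names "t1"/"t2" and the filter stage are hypothetical placeholders. It only illustrates the idea behind the patch: derive the uid from the table name so each pipeline registers distinct ids.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class UidPerTableSketch {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Two pipelines in one job: a static uid would register the same
        // user-specified id twice and the job graph would be rejected.
        for (String tableName : new String[] {"t1", "t2"}) {
          env.fromElements("a", "b", "c")
              .filter(value -> !value.isEmpty())
              // Suffixing the uid with the table name keeps it unique per pipeline.
              .uid("uid_bucket_assigner_" + tableName)
              .name("bucket_assigner")
              .print();
        }

        env.execute("two_hudi_pipelines_one_job");
      }
    }

The HoodieTableSource hunk below takes the other option: it drops the fixed uids entirely and lets Flink auto-generate operator ids, which likewise avoids the collision.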

diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 4fbcbb5..c478893 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -93,17 +93,17 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
               "bucket_assigner",
               TypeInformation.of(HoodieRecord.class),
               new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
-          .uid("uid_bucket_assigner")
+          .uid("uid_bucket_assigner_" + 
conf.getString(FlinkOptions.TABLE_NAME))
           // shuffle by fileId(bucket id)
           .keyBy(record -> record.getCurrentLocation().getFileId())
           .transform("hoodie_stream_write", TypeInformation.of(Object.class), 
operatorFactory)
-          .uid("uid_hoodie_stream_write")
+          .name("uid_hoodie_stream_write")
           .setParallelism(numWriteTasks);
       if (StreamerUtil.needsScheduleCompaction(conf)) {
         return pipeline.transform("compact_plan_generate",
             TypeInformation.of(CompactionPlanEvent.class),
             new CompactionPlanOperator(conf))
-            .uid("uid_compact_plan_generate")
+            .name("uid_compact_plan_generate")
             .setParallelism(1) // plan generate must be singleton
             .keyBy(event -> event.getOperation().hashCode())
             .transform("compact_task",
@@ -116,7 +116,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
       } else {
         return pipeline.addSink(new CleanFunction<>(conf))
             .setParallelism(1)
-            .name("clean_commits").uid("uid_clean_commits");
+            .name("clean_commits");
       }
     };
   }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 50420f9..55ec46a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -181,17 +181,13 @@ public class HoodieTableSource implements
           OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
           SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, "streaming_source")
               .setParallelism(1)
-              .uid("uid_streaming_source")
               .transform("split_reader", typeInfo, factory)
-              .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS))
-              .uid("uid_split_reader");
+              .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
           return new DataStreamSource<>(source);
         } else {
           InputFormatSourceFunction<RowData> func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo);
           DataStreamSource<RowData> source = execEnv.addSource(func, asSummaryString(), typeInfo);
-          return source.name("bounded_source")
-              .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS))
-              .uid("uid_bounded_source");
+          return source.name("bounded_source").setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
         }
       }
     };
