This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0b57483  [HUDI-2015] Fix flink operator uid to allow multiple pipelines in one job (#3091)
0b57483 is described below
commit 0b57483a8e41742689a1362aa94aabb94a1361b3
Author: Danny Chan <[email protected]>
AuthorDate: Thu Jun 17 09:08:19 2021 +0800
[HUDI-2015] Fix flink operator uid to allow multiple pipelines in one job (#3091)
---
.../src/main/java/org/apache/hudi/table/HoodieTableSink.java | 8 ++++----
.../src/main/java/org/apache/hudi/table/HoodieTableSource.java | 8 ++------
2 files changed, 6 insertions(+), 10 deletions(-)
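Why the uids collide: Flink requires every user-specified operator uid to be unique across the whole job graph, so a hard-coded value such as "uid_bucket_assigner" breaks as soon as a second Hudi pipeline is registered in the same job, and JobGraph translation aborts with a hash-collision error on the duplicated ID. Below is a minimal sketch of the failure mode and of the fix this commit applies. It is not Hudi code: buildPipeline and its map operator are hypothetical stand-ins for the sink builder touched here.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class UidCollisionSketch {

      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Two independent pipelines in one job, built by the same factory code.
        buildPipeline(env, "table_a");
        buildPipeline(env, "table_b");

        // With a fixed uid on both pipelines, translation to a JobGraph fails
        // ("Hash collision on user-specified ID"); with the table-scoped uid
        // below, the job runs.
        env.execute("two-pipelines-one-job");
      }

      // Hypothetical stand-in for Hudi's sink builder; names are illustrative.
      private static void buildPipeline(StreamExecutionEnvironment env, String tableName) {
        env.fromElements("a", "b", "c")
            .map(String::toUpperCase)
            .uid("uid_bucket_assigner_" + tableName) // was: .uid("uid_bucket_assigner")
            .print();
      }
    }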
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 4fbcbb5..c478893 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -93,17 +93,17 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),
new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
- .uid("uid_bucket_assigner")
+ .uid("uid_bucket_assigner_" +
conf.getString(FlinkOptions.TABLE_NAME))
// shuffle by fileId(bucket id)
.keyBy(record -> record.getCurrentLocation().getFileId())
.transform("hoodie_stream_write", TypeInformation.of(Object.class),
operatorFactory)
- .uid("uid_hoodie_stream_write")
+ .name("uid_hoodie_stream_write")
.setParallelism(numWriteTasks);
if (StreamerUtil.needsScheduleCompaction(conf)) {
return pipeline.transform("compact_plan_generate",
TypeInformation.of(CompactionPlanEvent.class),
new CompactionPlanOperator(conf))
- .uid("uid_compact_plan_generate")
+ .name("uid_compact_plan_generate")
.setParallelism(1) // plan generate must be singleton
.keyBy(event -> event.getOperation().hashCode())
.transform("compact_task",
@@ -116,7 +116,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
} else {
return pipeline.addSink(new CleanFunction<>(conf))
.setParallelism(1)
- .name("clean_commits").uid("uid_clean_commits");
+ .name("clean_commits");
}
};
}
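The remaining hunks in this file swap .uid(...) for .name(...). The two calls are not interchangeable: .name() is a purely cosmetic label shown in the web UI and logs and may repeat freely, while .uid() pins the operator identity used to match state on savepoint restore and must be unique job-wide. Dropping the hard-coded uids therefore removes the collision; the trade-off is that these operators fall back to auto-generated, topology-dependent ids for savepoint state matching. A small sketch of the distinction, again with placeholder operators rather than Hudi's:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class NameVsUidSketch {

      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Reusing the same operator *name* across pipelines is legal: the name
        // only affects how the operator is displayed.
        env.fromElements(1, 2, 3).map(i -> i * 2).name("hoodie_stream_write").print();
        env.fromElements(4, 5, 6).map(i -> i * 2).name("hoodie_stream_write").print();

        // Calling .uid("uid_hoodie_stream_write") on both map operators instead
        // would abort JobGraph translation, which is what this commit avoids.
        env.execute("name-vs-uid");
      }
    }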
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 50420f9..55ec46a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -181,17 +181,13 @@ public class HoodieTableSource implements
OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, "streaming_source")
.setParallelism(1)
- .uid("uid_streaming_source")
.transform("split_reader", typeInfo, factory)
- .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS))
- .uid("uid_split_reader");
+ .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
return new DataStreamSource<>(source);
} else {
InputFormatSourceFunction<RowData> func = new
InputFormatSourceFunction<>(getInputFormat(), typeInfo);
DataStreamSource<RowData> source = execEnv.addSource(func,
asSummaryString(), typeInfo);
- return source.name("bounded_source")
- .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS))
- .uid("uid_bounded_source");
+ return source.name("bounded_source").setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
}
}
};
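For context, the end state this commit enables: two Hudi sinks compiled into one Flink job, for example via a statement set. The sketch below is illustrative only; the table names, schemas, paths, and connector options are assumptions, not taken from this commit.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.StatementSet;
    import org.apache.flink.table.api.TableEnvironment;

    public class TwoHudiPipelinesSketch {

      public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical DDL: schemas, paths, and options are illustrative.
        tEnv.executeSql("CREATE TABLE src (id INT, name STRING) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE hudi_a (id INT, name STRING, PRIMARY KEY (id) NOT ENFORCED) "
            + "WITH ('connector' = 'hudi', 'path' = '/tmp/hudi_a')");
        tEnv.executeSql("CREATE TABLE hudi_b (id INT, name STRING, PRIMARY KEY (id) NOT ENFORCED) "
            + "WITH ('connector' = 'hudi', 'path' = '/tmp/hudi_b')");

        // Both INSERTs end up in a single job; before this commit their
        // identical hard-coded operator uids made that job fail to translate.
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql("INSERT INTO hudi_a SELECT id, name FROM src");
        set.addInsertSql("INSERT INTO hudi_b SELECT id, name FROM src");
        set.execute();
      }
    }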