This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-0.11.1-rc2-prep in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 0be352e0ec40abe5cc6cddadeff9a12872e86ed2 Author: yanenze <[email protected]> AuthorDate: Fri Jun 10 05:48:20 2022 +0800 [HUDI-4139]improvement for flink write operator name to identify tables easily (#5744) Co-authored-by: yanenze <[email protected]> --- .../main/java/org/apache/hudi/sink/utils/Pipelines.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java index 91ac2beadc..c969c10ed1 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java @@ -114,7 +114,7 @@ public class Pipelines { conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L); } return dataStream - .transform("bucket_bulk_insert", TypeInformation.of(Object.class), operatorFactory) + .transform(writeOpIdentifier("bucket_bulk_insert", conf), TypeInformation.of(Object.class), operatorFactory) .uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME)) .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)) .addSink(DummySink.INSTANCE) @@ -146,7 +146,7 @@ public class Pipelines { } } return dataStream - .transform("hoodie_bulk_insert_write", + .transform(writeOpIdentifier("hoodie_bulk_insert_write", conf), TypeInformation.of(Object.class), operatorFactory) // follow the parallelism of upstream operators to avoid shuffle @@ -190,7 +190,7 @@ public class Pipelines { WriteOperatorFactory<RowData> operatorFactory = AppendWriteOperator.getFactory(conf, rowType); return dataStream - .transform("hoodie_append_write", TypeInformation.of(Object.class), operatorFactory) + .transform(writeOpIdentifier("hoodie_append_write", conf), TypeInformation.of(Object.class), operatorFactory) .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME)) .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)) .addSink(DummySink.INSTANCE) @@ -322,7 +322,7 @@ public class Pipelines { String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD); BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields); return dataStream.partitionCustom(partitioner, HoodieRecord::getKey) - .transform("bucket_write", TypeInformation.of(Object.class), operatorFactory) + .transform(writeOpIdentifier("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory) .uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME)) .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); } else { @@ -338,7 +338,7 @@ public class Pipelines { .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism)) // shuffle by fileId(bucket id) .keyBy(record -> record.getCurrentLocation().getFileId()) - .transform("stream_write", TypeInformation.of(Object.class), operatorFactory) + .transform(writeOpIdentifier("stream_write", conf), TypeInformation.of(Object.class), operatorFactory) .uid("uid_stream_write" + conf.getString(FlinkOptions.TABLE_NAME)) .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); } @@ -385,6 +385,10 @@ public class Pipelines { .name("clean_commits"); } + public static String writeOpIdentifier(String operatorN, Configuration conf) { + return operatorN + ": " + conf.getString(FlinkOptions.TABLE_NAME); + } + /** * Dummy sink that does nothing. */
