This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.11.1-rc2-prep
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 0be352e0ec40abe5cc6cddadeff9a12872e86ed2
Author: yanenze <[email protected]>
AuthorDate: Fri Jun 10 05:48:20 2022 +0800

    [HUDI-4139] Improvement for Flink write operator name to identify tables 
easily (#5744)
    
    
    Co-authored-by: yanenze <[email protected]>
---
 .../main/java/org/apache/hudi/sink/utils/Pipelines.java    | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 91ac2beadc..c969c10ed1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -114,7 +114,7 @@ public class Pipelines {
             conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
       }
       return dataStream
-          .transform("bucket_bulk_insert", TypeInformation.of(Object.class), 
operatorFactory)
+          .transform(writeOpIdentifier("bucket_bulk_insert", conf), 
TypeInformation.of(Object.class), operatorFactory)
           .uid("uid_bucket_bulk_insert" + 
conf.getString(FlinkOptions.TABLE_NAME))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
           .addSink(DummySink.INSTANCE)
@@ -146,7 +146,7 @@ public class Pipelines {
       }
     }
     return dataStream
-        .transform("hoodie_bulk_insert_write",
+        .transform(writeOpIdentifier("hoodie_bulk_insert_write", conf),
             TypeInformation.of(Object.class),
             operatorFactory)
         // follow the parallelism of upstream operators to avoid shuffle
@@ -190,7 +190,7 @@ public class Pipelines {
     WriteOperatorFactory<RowData> operatorFactory = 
AppendWriteOperator.getFactory(conf, rowType);
 
     return dataStream
-        .transform("hoodie_append_write", TypeInformation.of(Object.class), 
operatorFactory)
+        .transform(writeOpIdentifier("hoodie_append_write", conf), 
TypeInformation.of(Object.class), operatorFactory)
         .uid("uid_hoodie_stream_write" + 
conf.getString(FlinkOptions.TABLE_NAME))
         .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
         .addSink(DummySink.INSTANCE)
@@ -322,7 +322,7 @@ public class Pipelines {
       String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
       BucketIndexPartitioner<HoodieKey> partitioner = new 
BucketIndexPartitioner<>(bucketNum, indexKeyFields);
       return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
-          .transform("bucket_write", TypeInformation.of(Object.class), 
operatorFactory)
+          .transform(writeOpIdentifier("bucket_write", conf), 
TypeInformation.of(Object.class), operatorFactory)
           .uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     } else {
@@ -338,7 +338,7 @@ public class Pipelines {
           
.setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
           // shuffle by fileId(bucket id)
           .keyBy(record -> record.getCurrentLocation().getFileId())
-          .transform("stream_write", TypeInformation.of(Object.class), 
operatorFactory)
+          .transform(writeOpIdentifier("stream_write", conf), 
TypeInformation.of(Object.class), operatorFactory)
           .uid("uid_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     }
@@ -385,6 +385,10 @@ public class Pipelines {
         .name("clean_commits");
   }
 
+  public static String writeOpIdentifier(String operatorN, Configuration conf) 
{
+    return operatorN + ": " + conf.getString(FlinkOptions.TABLE_NAME);
+  }
+
   /**
    * Dummy sink that does nothing.
    */

Reply via email to