This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fadd162826 [MINOR] Add database name to flink operator uid and operator name (#8961)
5fadd162826 is described below

commit 5fadd16282670f5c2e40b533d5ef33462e2ce4d8
Author: StreamingFlames <[email protected]>
AuthorDate: Fri Jun 16 11:18:16 2023 +0800

    [MINOR] Add database name to flink operator uid and operator name (#8961)
    
    * add database name to operator uid and operator name
    * Set database name automatically
    * use . instead of # as separator
    
    ---------
    
    Co-authored-by: fuqijun <[email protected]>
---
 .../src/main/java/org/apache/hudi/sink/utils/Pipelines.java   | 11 +++++++++--
 .../main/java/org/apache/hudi/table/HoodieTableFactory.java   |  2 ++
 .../java/org/apache/hudi/table/TestHoodieTableFactory.java    |  3 +++
 3 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 28ded1f1454..4cedb45b82c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink.utils;
 import org.apache.hudi.common.model.ClusteringOperation;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.sink.CleanFunction;
@@ -447,11 +448,17 @@ public class Pipelines {
   }
 
   public static String opName(String operatorN, Configuration conf) {
-    return operatorN + ": " + conf.getString(FlinkOptions.TABLE_NAME);
+    return operatorN + ": " + getTablePath(conf);
   }
 
   public static String opUID(String operatorN, Configuration conf) {
-    return "uid_" + operatorN + "_" + conf.getString(FlinkOptions.TABLE_NAME);
+    return "uid_" + operatorN + "_" + getTablePath(conf);
+  }
+
+  public static String getTablePath(Configuration conf) {
+    String databaseName = conf.getString(FlinkOptions.DATABASE_NAME);
+    return StringUtils.isNullOrEmpty(databaseName) ? conf.getString(FlinkOptions.TABLE_NAME)
+        : databaseName + "." + conf.getString(FlinkOptions.TABLE_NAME);
   }
 
   /**
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 5694df0dfd8..d528c325b29 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -246,6 +246,8 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
       ResolvedSchema schema) {
     // table name
     conf.setString(FlinkOptions.TABLE_NAME.key(), tablePath.getObjectName());
+    // database name
+    conf.setString(FlinkOptions.DATABASE_NAME.key(), tablePath.getDatabaseName());
     // hoodie key about options
     setupHoodieKeyOptions(conf, table);
     // compaction options
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 03d39aeb99a..c6522cf32d1 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -666,6 +666,9 @@ public class TestHoodieTableFactory {
         (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf));
     final Configuration conf1 = tableSink1.getConf();
     assertThat(conf1.get(FlinkOptions.PRE_COMBINE), is(true));
+    // check setup database name and table name automatically
+    assertThat(conf1.get(FlinkOptions.TABLE_NAME), is("t1"));
+    assertThat(conf1.get(FlinkOptions.DATABASE_NAME), is("db1"));
 
     // set up operation as 'insert'
     this.conf.setString(FlinkOptions.OPERATION, "insert");

Reply via email to