This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5fadd162826 [MINOR] Add database name to flink operator uid and
operator name (#8961)
5fadd162826 is described below
commit 5fadd16282670f5c2e40b533d5ef33462e2ce4d8
Author: StreamingFlames <[email protected]>
AuthorDate: Fri Jun 16 11:18:16 2023 +0800
[MINOR] Add database name to flink operator uid and operator name (#8961)
* add database name to operator uid and operator name
* Set database name automatically
* use . instead of # as separator
---------
Co-authored-by: fuqijun <[email protected]>
---
.../src/main/java/org/apache/hudi/sink/utils/Pipelines.java | 11 +++++++++--
.../main/java/org/apache/hudi/table/HoodieTableFactory.java | 2 ++
.../java/org/apache/hudi/table/TestHoodieTableFactory.java | 3 +++
3 files changed, 14 insertions(+), 2 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 28ded1f1454..4cedb45b82c 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink.utils;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.CleanFunction;
@@ -447,11 +448,17 @@ public class Pipelines {
}
public static String opName(String operatorN, Configuration conf) {
- return operatorN + ": " + conf.getString(FlinkOptions.TABLE_NAME);
+ return operatorN + ": " + getTablePath(conf);
}
public static String opUID(String operatorN, Configuration conf) {
- return "uid_" + operatorN + "_" + conf.getString(FlinkOptions.TABLE_NAME);
+ return "uid_" + operatorN + "_" + getTablePath(conf);
+ }
+
+ public static String getTablePath(Configuration conf) {
+ String databaseName = conf.getString(FlinkOptions.DATABASE_NAME);
+ return StringUtils.isNullOrEmpty(databaseName) ?
conf.getString(FlinkOptions.TABLE_NAME)
+ : databaseName + "." + conf.getString(FlinkOptions.TABLE_NAME);
}
/**
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 5694df0dfd8..d528c325b29 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -246,6 +246,8 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
ResolvedSchema schema) {
// table name
conf.setString(FlinkOptions.TABLE_NAME.key(), tablePath.getObjectName());
+ // database name
+ conf.setString(FlinkOptions.DATABASE_NAME.key(),
tablePath.getDatabaseName());
// hoodie key about options
setupHoodieKeyOptions(conf, table);
// compaction options
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 03d39aeb99a..c6522cf32d1 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -666,6 +666,9 @@ public class TestHoodieTableFactory {
(HoodieTableSink) new
HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf));
final Configuration conf1 = tableSink1.getConf();
assertThat(conf1.get(FlinkOptions.PRE_COMBINE), is(true));
+ // check setup database name and table name automatically
+ assertThat(conf1.get(FlinkOptions.TABLE_NAME), is("t1"));
+ assertThat(conf1.get(FlinkOptions.DATABASE_NAME), is("db1"));
// set up operation as 'insert'
this.conf.setString(FlinkOptions.OPERATION, "insert");