This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 00c3443cb4112fbd94281d52465cccfbc00516c5
Author: superche <[email protected]>
AuthorDate: Fri Nov 18 16:24:19 2022 +0800

    optimize schema settings
---
 .../src/main/java/org/apache/hudi/util/HoodiePipeline.java | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
index f95367c836..0e7e262aeb 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.util;
 
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.utils.EncodingUtils;
 import org.apache.hudi.adapter.Utils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.HoodieTableFactory;
@@ -125,6 +127,18 @@ public class HoodiePipeline {
       return this;
     }
 
+    public Builder schema(Schema schema) {
+      for (Schema.UnresolvedColumn column : schema.getColumns()) {
+        column(column.toString());
+      }
+
+      if (schema.getPrimaryKey().isPresent()) {
+        
pk(schema.getPrimaryKey().get().getColumnNames().stream().map(EncodingUtils::escapeIdentifier).collect(Collectors.joining(",
 ")));
+      }
+
+      return this;
+    }
+
     /**
      * Add a config option.
      */

Reply via email to