This is an automated email from the ASF dual-hosted git repository. forwardxu pushed a commit to branch release-0.12.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 00c3443cb4112fbd94281d52465cccfbc00516c5 Author: superche <[email protected]> AuthorDate: Fri Nov 18 16:24:19 2022 +0800 optimize schema settings --- .../src/main/java/org/apache/hudi/util/HoodiePipeline.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java index f95367c836..0e7e262aeb 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java @@ -18,6 +18,8 @@ package org.apache.hudi.util; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.utils.EncodingUtils; import org.apache.hudi.adapter.Utils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.HoodieTableFactory; @@ -125,6 +127,18 @@ public class HoodiePipeline { return this; } + public Builder schema(Schema schema) { + for (Schema.UnresolvedColumn column : schema.getColumns()) { + column(column.toString()); + } + + if (schema.getPrimaryKey().isPresent()) { + pk(schema.getPrimaryKey().get().getColumnNames().stream().map(EncodingUtils::escapeIdentifier).collect(Collectors.joining(", "))); + } + + return this; + } + /** * Add a config option. */
