This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 530b794eb [flink][schema] Use builder to build schema for flink ddl 
(#1601)
530b794eb is described below

commit 530b794eb2fb66e782d30e49e36b965a6fb9fe9c
Author: Shammon FY <[email protected]>
AuthorDate: Mon Jul 24 11:43:35 2023 +0800

    [flink][schema] Use builder to build schema for flink ddl (#1601)
---
 .../java/org/apache/paimon/flink/FlinkCatalog.java | 48 +++++++++++-----------
 .../apache/paimon/flink/LogicalTypeConversion.java |  8 ----
 .../org/apache/paimon/flink/FlinkCatalogTest.java  | 11 ++++-
 3 files changed, 33 insertions(+), 34 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index b5d58b51a..a629112c1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -25,10 +25,8 @@ import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.utils.Preconditions;
 
-import org.apache.flink.table.api.Schema.UnresolvedColumn;
 import org.apache.flink.table.api.TableColumn;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.WatermarkSpec;
@@ -557,40 +555,40 @@ public class FlinkCatalog extends AbstractCatalog {
     public static Schema fromCatalogTable(CatalogTable catalogTable) {
         TableSchema schema = catalogTable.getSchema();
         RowType rowType = (RowType) 
schema.toPhysicalRowDataType().getLogicalType();
-        List<String> primaryKeys = new ArrayList<>();
-        if (schema.getPrimaryKey().isPresent()) {
-            primaryKeys = schema.getPrimaryKey().get().getColumns();
-        }
 
         Map<String, String> options = new HashMap<>(catalogTable.getOptions());
-
         // Serialize virtual columns and watermark to the options
         // This is what Flink SQL needs, the storage itself does not need them
         options.putAll(columnOptions(schema));
 
-        return new Schema(
-                addColumnComments(toDataType(rowType).getFields(), 
getColumnComments(catalogTable)),
-                catalogTable.getPartitionKeys(),
-                primaryKeys,
-                options,
-                catalogTable.getComment());
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder()
+                        .comment(catalogTable.getComment())
+                        .options(options)
+                        .primaryKey(
+                                schema.getPrimaryKey()
+                                        .map(pk -> pk.getColumns())
+                                        .orElse(Collections.emptyList()))
+                        .partitionKeys(catalogTable.getPartitionKeys());
+        Map<String, String> columnComments = getColumnComments(catalogTable);
+        rowType.getFields()
+                .forEach(
+                        field ->
+                                schemaBuilder.column(
+                                        field.getName(),
+                                        toDataType(field.getType()),
+                                        columnComments.get(field.getName())));
+
+        return schemaBuilder.build();
     }
 
     private static Map<String, String> getColumnComments(CatalogTable 
catalogTable) {
         return catalogTable.getUnresolvedSchema().getColumns().stream()
                 .filter(c -> c.getComment().isPresent())
-                .collect(Collectors.toMap(UnresolvedColumn::getName, c -> 
c.getComment().get()));
-    }
-
-    private static List<DataField> addColumnComments(
-            List<DataField> fields, Map<String, String> columnComments) {
-        return fields.stream()
-                .map(
-                        field -> {
-                            String comment = columnComments.get(field.name());
-                            return comment == null ? field : 
field.newDescription(comment);
-                        })
-                .collect(Collectors.toList());
+                .collect(
+                        Collectors.toMap(
+                                
org.apache.flink.table.api.Schema.UnresolvedColumn::getName,
+                                c -> c.getComment().get()));
     }
 
     /** Only reserve necessary options. */
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java
index 93f155561..c792171ee 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java
@@ -18,13 +18,11 @@
 
 package org.apache.paimon.flink;
 
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
 
 import org.apache.flink.table.types.logical.LogicalType;
 
-import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /** Conversion between {@link LogicalType} and {@link DataType}. */
@@ -51,10 +49,4 @@ public class LogicalTypeConversion {
             LogicalType logicalType, AtomicInteger currentHighestFieldId) {
         return logicalType.accept(new 
LogicalTypeToDataType(currentHighestFieldId));
     }
-
-    public static org.apache.flink.table.types.logical.RowType toRowType(
-            List<DataField> dataFields) {
-        return (org.apache.flink.table.types.logical.RowType)
-                DataTypeToLogicalType.INSTANCE.visit(new RowType(false, 
dataFields));
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index efdcddd3e..c182bc5d8 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -96,7 +96,16 @@ public class FlinkCatalogTest {
                 Arrays.asList(
                         Column.physical("first", DataTypes.STRING()),
                         Column.physical("second", DataTypes.INT()),
-                        Column.physical("third", DataTypes.STRING())),
+                        Column.physical("third", DataTypes.STRING()),
+                        Column.physical(
+                                "four",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("f1", 
DataTypes.STRING()),
+                                        DataTypes.FIELD("f2", DataTypes.INT()),
+                                        DataTypes.FIELD(
+                                                "f3",
+                                                DataTypes.MAP(
+                                                        DataTypes.STRING(), 
DataTypes.INT()))))),
                 Collections.emptyList(),
                 null);
     }

Reply via email to