This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 530b794eb [flink][schema] Use builder to build schema for flink ddl (#1601)
530b794eb is described below
commit 530b794eb2fb66e782d30e49e36b965a6fb9fe9c
Author: Shammon FY <[email protected]>
AuthorDate: Mon Jul 24 11:43:35 2023 +0800
[flink][schema] Use builder to build schema for flink ddl (#1601)
---
.../java/org/apache/paimon/flink/FlinkCatalog.java | 48 +++++++++++-----------
.../apache/paimon/flink/LogicalTypeConversion.java | 8 ----
.../org/apache/paimon/flink/FlinkCatalogTest.java | 11 ++++-
3 files changed, 33 insertions(+), 34 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index b5d58b51a..a629112c1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -25,10 +25,8 @@ import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.Preconditions;
-import org.apache.flink.table.api.Schema.UnresolvedColumn;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.WatermarkSpec;
@@ -557,40 +555,40 @@ public class FlinkCatalog extends AbstractCatalog {
public static Schema fromCatalogTable(CatalogTable catalogTable) {
TableSchema schema = catalogTable.getSchema();
RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
- List<String> primaryKeys = new ArrayList<>();
- if (schema.getPrimaryKey().isPresent()) {
- primaryKeys = schema.getPrimaryKey().get().getColumns();
- }
Map<String, String> options = new HashMap<>(catalogTable.getOptions());
-
// Serialize virtual columns and watermark to the options
// This is what Flink SQL needs, the storage itself does not need them
options.putAll(columnOptions(schema));
- return new Schema(
- addColumnComments(toDataType(rowType).getFields(), getColumnComments(catalogTable)),
- catalogTable.getPartitionKeys(),
- primaryKeys,
- options,
- catalogTable.getComment());
+ Schema.Builder schemaBuilder =
+ Schema.newBuilder()
+ .comment(catalogTable.getComment())
+ .options(options)
+ .primaryKey(
+ schema.getPrimaryKey()
+ .map(pk -> pk.getColumns())
+ .orElse(Collections.emptyList()))
+ .partitionKeys(catalogTable.getPartitionKeys());
+ Map<String, String> columnComments = getColumnComments(catalogTable);
+ rowType.getFields()
+ .forEach(
+ field ->
+ schemaBuilder.column(
+ field.getName(),
+ toDataType(field.getType()),
+ columnComments.get(field.getName())));
+
+ return schemaBuilder.build();
}
private static Map<String, String> getColumnComments(CatalogTable catalogTable) {
return catalogTable.getUnresolvedSchema().getColumns().stream()
.filter(c -> c.getComment().isPresent())
- .collect(Collectors.toMap(UnresolvedColumn::getName, c -> c.getComment().get()));
- }
-
- private static List<DataField> addColumnComments(
- List<DataField> fields, Map<String, String> columnComments) {
- return fields.stream()
- .map(
- field -> {
- String comment = columnComments.get(field.name());
- return comment == null ? field : field.newDescription(comment);
- })
- .collect(Collectors.toList());
+ .collect(
+ Collectors.toMap(
+ org.apache.flink.table.api.Schema.UnresolvedColumn::getName,
+ c -> c.getComment().get()));
}
/** Only reserve necessary options. */
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java
index 93f155561..c792171ee 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java
@@ -18,13 +18,11 @@
package org.apache.paimon.flink;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.flink.table.types.logical.LogicalType;
-import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/** Conversion between {@link LogicalType} and {@link DataType}. */
@@ -51,10 +49,4 @@ public class LogicalTypeConversion {
LogicalType logicalType, AtomicInteger currentHighestFieldId) {
return logicalType.accept(new LogicalTypeToDataType(currentHighestFieldId));
}
-
- public static org.apache.flink.table.types.logical.RowType toRowType(
- List<DataField> dataFields) {
- return (org.apache.flink.table.types.logical.RowType)
- DataTypeToLogicalType.INSTANCE.visit(new RowType(false, dataFields));
- }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index efdcddd3e..c182bc5d8 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -96,7 +96,16 @@ public class FlinkCatalogTest {
Arrays.asList(
Column.physical("first", DataTypes.STRING()),
Column.physical("second", DataTypes.INT()),
- Column.physical("third", DataTypes.STRING())),
+ Column.physical("third", DataTypes.STRING()),
+ Column.physical(
+ "four",
+ DataTypes.ROW(
+ DataTypes.FIELD("f1", DataTypes.STRING()),
+ DataTypes.FIELD("f2", DataTypes.INT()),
+ DataTypes.FIELD(
+ "f3",
+ DataTypes.MAP(
+ DataTypes.STRING(), DataTypes.INT()))))),
Collections.emptyList(),
null);
}