This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fb887f47f [flink] Avoid deprecated usage on TableSchema, DataType and DescriptorProperties (#4611)
6fb887f47f is described below

commit 6fb887f47f2e79f6b3142f094b20b6d7a3f86846
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Mon Dec 2 21:11:23 2024 +0800

    [flink] Avoid deprecated usage on TableSchema, DataType and DescriptorProperties (#4611)
---
 .../org/apache/paimon/flink/DataCatalogTable.java  | 115 ++++++++++++------
 .../java/org/apache/paimon/flink/FlinkCatalog.java |  55 +++++----
 .../apache/paimon/flink/FlinkGenericCatalog.java   |   6 -
 .../apache/paimon/flink/SystemCatalogTable.java    |  12 +-
 .../flink/utils/FlinkCatalogPropertiesUtil.java    | 102 +++++-----------
 .../flink/utils/FlinkDescriptorProperties.java     |  99 ++++++++++++++++
 .../flink/FlinkCatalogPropertiesUtilTest.java      | 130 +++++++++++++++------
 .../org/apache/paimon/flink/FlinkCatalogTest.java  |   9 +-
 8 files changed, 342 insertions(+), 186 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java
index 019d7bd689..e141581b47 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java
@@ -23,33 +23,55 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.types.DataField;
 
 import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
-/** A {@link CatalogTableImpl} to wrap {@link FileStoreTable}. */
-public class DataCatalogTable extends CatalogTableImpl {
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A {@link CatalogTable} to wrap {@link FileStoreTable}. */
+public class DataCatalogTable implements CatalogTable {
+    // Schema of the table (column names and types)
+    private final Schema schema;
+
+    // Partition keys if this is a partitioned table. It's an empty set if the table is not
+    // partitioned
+    private final List<String> partitionKeys;
+
+    // Properties of the table
+    private final Map<String, String> options;
+
+    // Comment of the table
+    private final String comment;
 
     private final Table table;
     private final Map<String, String> nonPhysicalColumnComments;
 
     public DataCatalogTable(
             Table table,
-            TableSchema tableSchema,
+            Schema resolvedSchema,
             List<String> partitionKeys,
-            Map<String, String> properties,
+            Map<String, String> options,
             String comment,
             Map<String, String> nonPhysicalColumnComments) {
-        super(tableSchema, partitionKeys, properties, comment);
+        this.schema = resolvedSchema;
+        this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys cannot 
be null");
+        this.options = checkNotNull(options, "options cannot be null");
+
+        checkArgument(
+                options.entrySet().stream()
+                        .allMatch(e -> e.getKey() != null && e.getValue() != 
null),
+                "properties cannot have null keys or values");
+
+        this.comment = comment;
+
         this.table = table;
         this.nonPhysicalColumnComments = nonPhysicalColumnComments;
     }
@@ -66,32 +88,30 @@ public class DataCatalogTable extends CatalogTableImpl {
                         .filter(dataField -> dataField.description() != null)
                         .collect(Collectors.toMap(DataField::name, 
DataField::description));
 
-        return toSchema(getSchema(), columnComments);
+        return toSchema(schema, columnComments);
     }
 
-    /** Copied from {@link TableSchema#toSchema(Map)} to support versions 
lower than 1.17. */
-    private Schema toSchema(TableSchema tableSchema, Map<String, String> 
comments) {
+    private Schema toSchema(Schema tableSchema, Map<String, String> comments) {
         final Schema.Builder builder = Schema.newBuilder();
-
         tableSchema
-                .getTableColumns()
+                .getColumns()
                 .forEach(
                         column -> {
-                            if (column instanceof TableColumn.PhysicalColumn) {
-                                final TableColumn.PhysicalColumn c =
-                                        (TableColumn.PhysicalColumn) column;
-                                builder.column(c.getName(), c.getType());
-                            } else if (column instanceof 
TableColumn.MetadataColumn) {
-                                final TableColumn.MetadataColumn c =
-                                        (TableColumn.MetadataColumn) column;
+                            if (column instanceof 
Schema.UnresolvedPhysicalColumn) {
+                                final Schema.UnresolvedPhysicalColumn c =
+                                        (Schema.UnresolvedPhysicalColumn) 
column;
+                                builder.column(c.getName(), c.getDataType());
+                            } else if (column instanceof 
Schema.UnresolvedMetadataColumn) {
+                                final Schema.UnresolvedMetadataColumn c =
+                                        (Schema.UnresolvedMetadataColumn) 
column;
                                 builder.columnByMetadata(
                                         c.getName(),
-                                        c.getType(),
-                                        c.getMetadataAlias().orElse(null),
+                                        c.getDataType(),
+                                        c.getMetadataKey(),
                                         c.isVirtual());
-                            } else if (column instanceof 
TableColumn.ComputedColumn) {
-                                final TableColumn.ComputedColumn c =
-                                        (TableColumn.ComputedColumn) column;
+                            } else if (column instanceof 
Schema.UnresolvedComputedColumn) {
+                                final Schema.UnresolvedComputedColumn c =
+                                        (Schema.UnresolvedComputedColumn) 
column;
                                 builder.columnByExpression(c.getName(), 
c.getExpression());
                             } else {
                                 throw new IllegalArgumentException(
@@ -104,19 +124,16 @@ public class DataCatalogTable extends CatalogTableImpl {
                                 
builder.withComment(nonPhysicalColumnComments.get(colName));
                             }
                         });
-
         tableSchema
                 .getWatermarkSpecs()
                 .forEach(
                         spec ->
                                 builder.watermark(
-                                        spec.getRowtimeAttribute(), 
spec.getWatermarkExpr()));
-
+                                        spec.getColumnName(), 
spec.getWatermarkExpression()));
         if (tableSchema.getPrimaryKey().isPresent()) {
-            UniqueConstraint primaryKey = tableSchema.getPrimaryKey().get();
-            builder.primaryKeyNamed(primaryKey.getName(), 
primaryKey.getColumns());
+            Schema.UnresolvedPrimaryKey primaryKey = 
tableSchema.getPrimaryKey().get();
+            builder.primaryKeyNamed(primaryKey.getConstraintName(), 
primaryKey.getColumnNames());
         }
-
         return builder.build();
     }
 
@@ -124,7 +141,7 @@ public class DataCatalogTable extends CatalogTableImpl {
     public CatalogBaseTable copy() {
         return new DataCatalogTable(
                 table,
-                getSchema().copy(),
+                schema,
                 new ArrayList<>(getPartitionKeys()),
                 new HashMap<>(getOptions()),
                 getComment(),
@@ -135,10 +152,40 @@ public class DataCatalogTable extends CatalogTableImpl {
     public CatalogTable copy(Map<String, String> options) {
         return new DataCatalogTable(
                 table,
-                getSchema(),
+                schema,
                 getPartitionKeys(),
                 options,
                 getComment(),
                 nonPhysicalColumnComments);
     }
+
+    @Override
+    public Optional<String> getDescription() {
+        return Optional.of(getComment());
+    }
+
+    @Override
+    public Optional<String> getDetailedDescription() {
+        return Optional.of("This is a catalog table in an im-memory catalog");
+    }
+
+    @Override
+    public boolean isPartitioned() {
+        return !partitionKeys.isEmpty();
+    }
+
+    @Override
+    public List<String> getPartitionKeys() {
+        return partitionKeys;
+    }
+
+    @Override
+    public Map<String, String> getOptions() {
+        return options;
+    }
+
+    @Override
+    public String getComment() {
+        return comment != null ? comment : "";
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index c67e79c1c0..3a7f9790cc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -24,6 +24,7 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.procedure.ProcedureUtil;
 import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
+import org.apache.paimon.flink.utils.FlinkDescriptorProperties;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.operation.FileStoreCommit;
@@ -46,7 +47,6 @@ import org.apache.paimon.utils.StringUtils;
 import org.apache.paimon.view.View;
 import org.apache.paimon.view.ViewImpl;
 
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
@@ -96,7 +96,6 @@ import 
org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
-import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.procedures.Procedure;
@@ -121,13 +120,6 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.table.descriptors.DescriptorProperties.COMMENT;
-import static org.apache.flink.table.descriptors.DescriptorProperties.NAME;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 import static 
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
 import static org.apache.flink.table.utils.EncodingUtils.decodeBase64ToBytes;
@@ -152,11 +144,18 @@ import static 
org.apache.paimon.flink.LogicalTypeConversion.toDataType;
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
 import static org.apache.paimon.flink.log.LogStoreRegister.registerLogSystem;
 import static org.apache.paimon.flink.log.LogStoreRegister.unRegisterLogSystem;
+import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.SCHEMA;
 import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
 import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn;
 import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec;
 import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.nonPhysicalColumnsCount;
 import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.serializeNewWatermarkSpec;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.COMMENT;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.NAME;
+import static 
org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK;
+import static 
org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_ROWTIME;
+import static 
org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
+import static 
org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_EXPR;
 import static 
org.apache.paimon.flink.utils.TableStatsUtil.createTableColumnStats;
 import static org.apache.paimon.flink.utils.TableStatsUtil.createTableStats;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -1008,18 +1007,18 @@ public class FlinkCatalog extends AbstractCatalog {
         }
         // materialized table is not resolved at this time.
         if (!table1IsMaterialized) {
-            org.apache.flink.table.api.TableSchema ts1 = ct1.getSchema();
-            org.apache.flink.table.api.TableSchema ts2 = ct2.getSchema();
+            org.apache.flink.table.api.Schema ts1 = ct1.getUnresolvedSchema();
+            org.apache.flink.table.api.Schema ts2 = ct2.getUnresolvedSchema();
             boolean pkEquality = false;
 
             if (ts1.getPrimaryKey().isPresent() && 
ts2.getPrimaryKey().isPresent()) {
                 pkEquality =
                         Objects.equals(
-                                        ts1.getPrimaryKey().get().getType(),
-                                        ts2.getPrimaryKey().get().getType())
+                                        
ts1.getPrimaryKey().get().getConstraintName(),
+                                        
ts2.getPrimaryKey().get().getConstraintName())
                                 && Objects.equals(
-                                        ts1.getPrimaryKey().get().getColumns(),
-                                        
ts2.getPrimaryKey().get().getColumns());
+                                        
ts1.getPrimaryKey().get().getColumnNames(),
+                                        
ts2.getPrimaryKey().get().getColumnNames());
             } else if (!ts1.getPrimaryKey().isPresent() && 
!ts2.getPrimaryKey().isPresent()) {
                 pkEquality = true;
             }
@@ -1063,7 +1062,8 @@ public class FlinkCatalog extends AbstractCatalog {
     private CatalogBaseTable toCatalogTable(Table table) {
         Map<String, String> newOptions = new HashMap<>(table.options());
 
-        TableSchema.Builder builder = TableSchema.builder();
+        org.apache.flink.table.api.Schema.Builder builder =
+                org.apache.flink.table.api.Schema.newBuilder();
         Map<String, String> nonPhysicalColumnComments = new HashMap<>();
 
         // add columns
@@ -1078,10 +1078,10 @@ public class FlinkCatalog extends AbstractCatalog {
             if (optionalName == null || 
physicalColumns.contains(optionalName)) {
                 // build physical column from table row field
                 RowType.RowField field = 
physicalRowFields.get(physicalColumnIndex++);
-                builder.field(field.getName(), 
fromLogicalToDataType(field.getType()));
+                builder.column(field.getName(), 
fromLogicalToDataType(field.getType()));
             } else {
                 // build non-physical column from options
-                builder.add(deserializeNonPhysicalColumn(newOptions, i));
+                deserializeNonPhysicalColumn(newOptions, i, builder);
                 if (newOptions.containsKey(compoundKey(SCHEMA, i, COMMENT))) {
                     nonPhysicalColumnComments.put(
                             optionalName, newOptions.get(compoundKey(SCHEMA, 
i, COMMENT)));
@@ -1093,22 +1093,18 @@ public class FlinkCatalog extends AbstractCatalog {
         // extract watermark information
         if (newOptions.keySet().stream()
                 .anyMatch(key -> key.startsWith(compoundKey(SCHEMA, 
WATERMARK)))) {
-            builder.watermark(deserializeWatermarkSpec(newOptions));
+            deserializeWatermarkSpec(newOptions, builder);
         }
 
         // add primary keys
         if (table.primaryKeys().size() > 0) {
-            builder.primaryKey(
-                    
table.primaryKeys().stream().collect(Collectors.joining("_", "PK_", "")),
-                    table.primaryKeys().toArray(new String[0]));
+            builder.primaryKey(table.primaryKeys());
         }
 
-        TableSchema schema = builder.build();
+        org.apache.flink.table.api.Schema schema = builder.build();
 
         // remove schema from options
-        DescriptorProperties removeProperties = new 
DescriptorProperties(false);
-        removeProperties.putTableSchema(SCHEMA, schema);
-        removeProperties.asMap().keySet().forEach(newOptions::remove);
+        FlinkDescriptorProperties.removeSchemaKeys(SCHEMA, schema, newOptions);
 
         Options options = Options.fromMap(newOptions);
         if (TableType.MATERIALIZED_TABLE == options.get(CoreOptions.TYPE)) {
@@ -1124,7 +1120,10 @@ public class FlinkCatalog extends AbstractCatalog {
     }
 
     private CatalogMaterializedTable buildMaterializedTable(
-            Table table, Map<String, String> newOptions, TableSchema schema, 
Options options) {
+            Table table,
+            Map<String, String> newOptions,
+            org.apache.flink.table.api.Schema schema,
+            Options options) {
         String definitionQuery = 
options.get(MATERIALIZED_TABLE_DEFINITION_QUERY);
         IntervalFreshness freshness =
                 IntervalFreshness.of(
@@ -1148,7 +1147,7 @@ public class FlinkCatalog extends AbstractCatalog {
         // remove materialized table related options
         allMaterializedTableAttributes().forEach(newOptions::remove);
         return CatalogMaterializedTable.newBuilder()
-                .schema(schema.toSchema())
+                .schema(schema)
                 .comment(table.comment().orElse(""))
                 .partitionKeys(table.partitionKeys())
                 .options(newOptions)
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
index 37bed2d048..75af5917bb 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
@@ -48,7 +48,6 @@ import 
org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.factories.FunctionDefinitionFactory;
-import org.apache.flink.table.factories.TableFactory;
 import org.apache.flink.table.procedures.Procedure;
 
 import java.util.List;
@@ -86,11 +85,6 @@ public class FlinkGenericCatalog extends AbstractCatalog {
                 new FlinkGenericTableFactory(paimon.getFactory().get(), 
flink.getFactory().get()));
     }
 
-    @Override
-    public Optional<TableFactory> getTableFactory() {
-        return flink.getTableFactory();
-    }
-
     @Override
     public Optional<FunctionDefinitionFactory> getFunctionDefinitionFactory() {
         return flink.getFunctionDefinitionFactory();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
index d5d843d91b..f88a808713 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
@@ -22,7 +22,6 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.system.AuditLogTable;
 
 import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.WatermarkSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.types.utils.TypeConversions;
 
@@ -32,11 +31,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
+import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.SCHEMA;
 import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
 import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec;
+import static 
org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK;
 
 /** A {@link CatalogTable} to represent system table. */
 public class SystemCatalogTable implements CatalogTable {
@@ -60,11 +59,8 @@ public class SystemCatalogTable implements CatalogTable {
             Map<String, String> newOptions = new HashMap<>(table.options());
             if (newOptions.keySet().stream()
                     .anyMatch(key -> key.startsWith(compoundKey(SCHEMA, 
WATERMARK)))) {
-                WatermarkSpec watermarkSpec = 
deserializeWatermarkSpec(newOptions);
-                return builder.watermark(
-                                watermarkSpec.getRowtimeAttribute(),
-                                watermarkSpec.getWatermarkExpr())
-                        .build();
+                deserializeWatermarkSpec(newOptions, builder);
+                return builder.build();
             }
         }
         return builder.build();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java
index b0f99a6e89..fa84a1ca07 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java
@@ -20,8 +20,7 @@ package org.apache.paimon.flink.utils;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet;
 
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.WatermarkSpec;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.types.DataType;
@@ -36,48 +35,23 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.flink.table.descriptors.DescriptorProperties.COMMENT;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.DATA_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
-import static org.apache.flink.table.descriptors.DescriptorProperties.METADATA;
-import static org.apache.flink.table.descriptors.DescriptorProperties.NAME;
-import static org.apache.flink.table.descriptors.DescriptorProperties.VIRTUAL;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.COMMENT;
+import static 
org.apache.paimon.flink.utils.FlinkDescriptorProperties.DATA_TYPE;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.EXPR;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.METADATA;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.NAME;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.VIRTUAL;
+import static 
org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK;
+import static 
org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_ROWTIME;
+import static 
org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
+import static 
org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_EXPR;
 
 /**
  * Utilities for ser/deserializing non-physical columns and watermark into/from a map of string
  * properties.
  */
 public class FlinkCatalogPropertiesUtil {
-
-    public static Map<String, String> serializeNonPhysicalColumns(
-            Map<String, Integer> indexMap, List<TableColumn> 
nonPhysicalColumns) {
-        Map<String, String> serialized = new HashMap<>();
-        for (TableColumn c : nonPhysicalColumns) {
-            int index = indexMap.get(c.getName());
-            serialized.put(compoundKey(SCHEMA, index, NAME), c.getName());
-            serialized.put(
-                    compoundKey(SCHEMA, index, DATA_TYPE),
-                    c.getType().getLogicalType().asSerializableString());
-            if (c instanceof TableColumn.ComputedColumn) {
-                TableColumn.ComputedColumn computedColumn = 
(TableColumn.ComputedColumn) c;
-                serialized.put(compoundKey(SCHEMA, index, EXPR), 
computedColumn.getExpression());
-            } else {
-                TableColumn.MetadataColumn metadataColumn = 
(TableColumn.MetadataColumn) c;
-                serialized.put(
-                        compoundKey(SCHEMA, index, METADATA),
-                        
metadataColumn.getMetadataAlias().orElse(metadataColumn.getName()));
-                serialized.put(
-                        compoundKey(SCHEMA, index, VIRTUAL),
-                        Boolean.toString(metadataColumn.isVirtual()));
-            }
-        }
-        return serialized;
-    }
+    public static final String SCHEMA = "schema";
 
     /** Serialize non-physical columns of new api. */
     public static Map<String, String> 
serializeNonPhysicalNewColumns(ResolvedSchema schema) {
@@ -119,22 +93,6 @@ public class FlinkCatalogPropertiesUtil {
         return serialized;
     }
 
-    public static Map<String, String> serializeWatermarkSpec(WatermarkSpec 
watermarkSpec) {
-        Map<String, String> serializedWatermarkSpec = new HashMap<>();
-        String watermarkPrefix = compoundKey(SCHEMA, WATERMARK, 0);
-        serializedWatermarkSpec.put(
-                compoundKey(watermarkPrefix, WATERMARK_ROWTIME),
-                watermarkSpec.getRowtimeAttribute());
-        serializedWatermarkSpec.put(
-                compoundKey(watermarkPrefix, WATERMARK_STRATEGY_EXPR),
-                watermarkSpec.getWatermarkExpr());
-        serializedWatermarkSpec.put(
-                compoundKey(watermarkPrefix, WATERMARK_STRATEGY_DATA_TYPE),
-                
watermarkSpec.getWatermarkExprOutputType().getLogicalType().asSerializableString());
-
-        return serializedWatermarkSpec;
-    }
-
     public static Map<String, String> serializeNewWatermarkSpec(
             org.apache.flink.table.catalog.WatermarkSpec watermarkSpec) {
         Map<String, String> serializedWatermarkSpec = new HashMap<>();
@@ -219,7 +177,8 @@ public class FlinkCatalogPropertiesUtil {
                 && 
SCHEMA_COLUMN_NAME_SUFFIX.matcher(key.substring(SCHEMA.length() + 1)).matches();
     }
 
-    public static TableColumn deserializeNonPhysicalColumn(Map<String, String> 
options, int index) {
+    public static void deserializeNonPhysicalColumn(
+            Map<String, String> options, int index, Schema.Builder builder) {
         String nameKey = compoundKey(SCHEMA, index, NAME);
         String dataTypeKey = compoundKey(SCHEMA, index, DATA_TYPE);
         String exprKey = compoundKey(SCHEMA, index, EXPR);
@@ -227,45 +186,42 @@ public class FlinkCatalogPropertiesUtil {
         String virtualKey = compoundKey(SCHEMA, index, VIRTUAL);
 
         String name = options.get(nameKey);
-        DataType dataType =
-                TypeConversions.fromLogicalToDataType(
-                        LogicalTypeParser.parse(options.get(dataTypeKey)));
 
-        TableColumn column;
         if (options.containsKey(exprKey)) {
-            column = TableColumn.computed(name, dataType, 
options.get(exprKey));
+            final String expr = options.get(exprKey);
+            builder.columnByExpression(name, expr);
         } else if (options.containsKey(metadataKey)) {
             String metadataAlias = options.get(metadataKey);
             boolean isVirtual = Boolean.parseBoolean(options.get(virtualKey));
-            column =
-                    metadataAlias.equals(name)
-                            ? TableColumn.metadata(name, dataType, isVirtual)
-                            : TableColumn.metadata(name, dataType, 
metadataAlias, isVirtual);
+            DataType dataType =
+                    TypeConversions.fromLogicalToDataType(
+                            LogicalTypeParser.parse(
+                                    options.get(dataTypeKey),
+                                    
Thread.currentThread().getContextClassLoader()));
+            if (metadataAlias.equals(name)) {
+                builder.columnByMetadata(name, dataType, isVirtual);
+            } else {
+                builder.columnByMetadata(name, dataType, metadataAlias, 
isVirtual);
+            }
         } else {
             throw new RuntimeException(
                     String.format(
                             "Failed to build non-physical column. Current 
index is %s, options are %s",
                             index, options));
         }
-
-        return column;
     }
 
-    public static WatermarkSpec deserializeWatermarkSpec(Map<String, String> 
options) {
+    public static void deserializeWatermarkSpec(
+            Map<String, String> options, Schema.Builder builder) {
         String watermarkPrefixKey = compoundKey(SCHEMA, WATERMARK);
 
         String rowtimeKey = compoundKey(watermarkPrefixKey, 0, 
WATERMARK_ROWTIME);
         String exprKey = compoundKey(watermarkPrefixKey, 0, 
WATERMARK_STRATEGY_EXPR);
-        String dataTypeKey = compoundKey(watermarkPrefixKey, 0, 
WATERMARK_STRATEGY_DATA_TYPE);
 
         String rowtimeAttribute = options.get(rowtimeKey);
         String watermarkExpressionString = options.get(exprKey);
-        DataType watermarkExprOutputType =
-                TypeConversions.fromLogicalToDataType(
-                        LogicalTypeParser.parse(options.get(dataTypeKey)));
 
-        return new WatermarkSpec(
-                rowtimeAttribute, watermarkExpressionString, 
watermarkExprOutputType);
+        builder.watermark(rowtimeAttribute, watermarkExpressionString);
     }
 
     public static String compoundKey(Object... components) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkDescriptorProperties.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkDescriptorProperties.java
new file mode 100644
index 0000000000..edc73ca7bf
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkDescriptorProperties.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.table.api.Schema;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class for having a unified string-based representation of Table API 
related classes such
+ * as Schema, TypeInformation, etc.
+ *
+ * <p>Note to implementers: Please try to reuse key names as much as possible. 
Key-names should be
+ * hierarchical and lower case. Use "-" instead of dots or camel case. E.g.,
+ * connector.schema.start-from = from-earliest. Try not to use the higher 
level in a key-name. E.g.,
+ * instead of connector.kafka.kafka-version use connector.kafka.version.
+ *
+ * <p>Properties with key normalization enabled contain only lower-case keys.
+ */
+public class FlinkDescriptorProperties {
+
+    public static final String NAME = "name";
+
+    public static final String DATA_TYPE = "data-type";
+
+    public static final String EXPR = "expr";
+
+    public static final String METADATA = "metadata";
+
+    public static final String VIRTUAL = "virtual";
+
+    public static final String WATERMARK = "watermark";
+
+    public static final String WATERMARK_ROWTIME = "rowtime";
+
+    public static final String WATERMARK_STRATEGY = "strategy";
+
+    public static final String WATERMARK_STRATEGY_EXPR = WATERMARK_STRATEGY + 
'.' + EXPR;
+
+    public static final String WATERMARK_STRATEGY_DATA_TYPE = 
WATERMARK_STRATEGY + '.' + DATA_TYPE;
+
+    public static final String PRIMARY_KEY_NAME = "primary-key.name";
+
+    public static final String PRIMARY_KEY_COLUMNS = "primary-key.columns";
+
+    public static final String COMMENT = "comment";
+
+    public static void removeSchemaKeys(String key, Schema schema, Map<String, 
String> options) {
+        checkNotNull(key);
+        checkNotNull(schema);
+
+        List<String> subKeys = Arrays.asList(NAME, DATA_TYPE, EXPR, METADATA, 
VIRTUAL);
+        for (int idx = 0; idx < schema.getColumns().size(); idx++) {
+            for (String subKey : subKeys) {
+                options.remove(key + '.' + idx + '.' + subKey);
+            }
+        }
+
+        if (!schema.getWatermarkSpecs().isEmpty()) {
+            subKeys =
+                    Arrays.asList(
+                            WATERMARK_ROWTIME,
+                            WATERMARK_STRATEGY_EXPR,
+                            WATERMARK_STRATEGY_DATA_TYPE);
+            for (int idx = 0; idx < schema.getWatermarkSpecs().size(); idx++) {
+                for (String subKey : subKeys) {
+                    options.remove(key + '.' + WATERMARK + '.' + idx + '.' + 
subKey);
+                }
+            }
+        }
+
+        schema.getPrimaryKey()
+                .ifPresent(
+                        pk -> {
+                            options.remove(key + '.' + PRIMARY_KEY_NAME);
+                            options.remove(key + '.' + PRIMARY_KEY_COLUMNS);
+                        });
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java
index 9268a236b6..e32150b1fe 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java
@@ -21,27 +21,35 @@ package org.apache.paimon.flink;
 import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
 
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.WatermarkSpec;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.WatermarkSpec;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.types.DataType;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.DATA_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
-import static org.apache.flink.table.descriptors.DescriptorProperties.METADATA;
-import static org.apache.flink.table.descriptors.DescriptorProperties.NAME;
-import static org.apache.flink.table.descriptors.DescriptorProperties.VIRTUAL;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.SCHEMA;
 import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
+import static 
org.apache.paimon.flink.utils.FlinkDescriptorProperties.DATA_TYPE;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.EXPR;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.METADATA;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.NAME;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.VIRTUAL;
+import static 
org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK;
+import static 
org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_ROWTIME;
+import static 
org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
+import static 
org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_EXPR;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link FlinkCatalogPropertiesUtil}. */
@@ -49,18 +57,27 @@ public class FlinkCatalogPropertiesUtilTest {
 
     @Test
     public void testSerDeNonPhysicalColumns() {
-        Map<String, Integer> indexMap = new HashMap<>();
-        indexMap.put("comp", 2);
-        indexMap.put("meta1", 3);
-        indexMap.put("meta2", 5);
-        List<TableColumn> columns = new ArrayList<>();
-        columns.add(TableColumn.computed("comp", DataTypes.INT(), "`k` * 2"));
-        columns.add(TableColumn.metadata("meta1", DataTypes.VARCHAR(10)));
-        columns.add(TableColumn.metadata("meta2", 
DataTypes.BIGINT().notNull(), "price", true));
+        List<Schema.UnresolvedColumn> columns = new ArrayList<>();
+        columns.add(new Schema.UnresolvedComputedColumn("comp", new 
SqlCallExpression("`k` * 2")));
+        columns.add(
+                new Schema.UnresolvedMetadataColumn("meta1", 
DataTypes.VARCHAR(10), null, false));
+        columns.add(
+                new Schema.UnresolvedMetadataColumn(
+                        "meta2", DataTypes.BIGINT().notNull(), "price", true, 
null));
+
+        List<Column> resolvedColumns = new ArrayList<>();
+        resolvedColumns.add(Column.physical("phy1", DataTypes.INT()));
+        resolvedColumns.add(Column.physical("phy2", DataTypes.INT()));
+        resolvedColumns.add(
+                Column.computed("comp", new TestResolvedExpression("`k` * 2", 
DataTypes.INT())));
+        resolvedColumns.add(Column.metadata("meta1", DataTypes.VARCHAR(10), 
null, false));
+        resolvedColumns.add(Column.physical("phy3", DataTypes.INT()));
+        resolvedColumns.add(Column.metadata("meta2", 
DataTypes.BIGINT().notNull(), "price", true));
 
         // validate serialization
         Map<String, String> serialized =
-                
FlinkCatalogPropertiesUtil.serializeNonPhysicalColumns(indexMap, columns);
+                FlinkCatalogPropertiesUtil.serializeNonPhysicalNewColumns(
+                        new ResolvedSchema(resolvedColumns, 
Collections.emptyList(), null));
 
         Map<String, String> expected = new HashMap<>();
         expected.put(compoundKey(SCHEMA, 2, NAME), "comp");
@@ -80,27 +97,26 @@ public class FlinkCatalogPropertiesUtilTest {
         assertThat(serialized).containsExactlyInAnyOrderEntriesOf(expected);
 
         // validate deserialization
-        List<TableColumn> deserialized = new ArrayList<>();
-        
deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized,
 2));
-        
deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized,
 3));
-        
deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized,
 5));
+        Schema.Builder builder = Schema.newBuilder();
+        FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 2, 
builder);
+        FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 3, 
builder);
+        FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 5, 
builder);
 
-        assertThat(deserialized).isEqualTo(columns);
-
-        // validate that
+        assertThat(builder.build().getColumns())
+                .containsExactly(columns.toArray(new 
Schema.UnresolvedColumn[0]));
     }
 
     @Test
     public void testSerDeWatermarkSpec() {
         WatermarkSpec watermarkSpec =
-                new WatermarkSpec(
+                WatermarkSpec.of(
                         "test_time",
-                        "`test_time` - INTERVAL '0.001' SECOND",
-                        DataTypes.TIMESTAMP(3));
+                        new TestResolvedExpression(
+                                "`test_time` - INTERVAL '0.001' SECOND", 
DataTypes.TIMESTAMP(3)));
 
         // validate serialization
         Map<String, String> serialized =
-                
FlinkCatalogPropertiesUtil.serializeWatermarkSpec(watermarkSpec);
+                
FlinkCatalogPropertiesUtil.serializeNewWatermarkSpec(watermarkSpec);
 
         Map<String, String> expected = new HashMap<>();
         String watermarkPrefix = compoundKey(SCHEMA, WATERMARK, 0);
@@ -113,9 +129,13 @@ public class FlinkCatalogPropertiesUtilTest {
         assertThat(serialized).containsExactlyInAnyOrderEntriesOf(expected);
 
         // validate deserialization
-        WatermarkSpec deserialized =
-                
FlinkCatalogPropertiesUtil.deserializeWatermarkSpec(serialized);
-        assertThat(deserialized).isEqualTo(watermarkSpec);
+        Schema.Builder builder = Schema.newBuilder();
+        FlinkCatalogPropertiesUtil.deserializeWatermarkSpec(serialized, 
builder);
+        assertThat(builder.build().getWatermarkSpecs()).hasSize(1);
+        Schema.UnresolvedWatermarkSpec actual = 
builder.build().getWatermarkSpecs().get(0);
+        
assertThat(actual.getColumnName()).isEqualTo(watermarkSpec.getRowtimeAttribute());
+        assertThat(actual.getWatermarkExpression().asSummaryString())
+                
.isEqualTo(watermarkSpec.getWatermarkExpression().asSummaryString());
     }
 
     @Test
@@ -150,4 +170,44 @@ public class FlinkCatalogPropertiesUtilTest {
                                 oldStyleOptions, Arrays.asList("phy1", 
"phy2")))
                 .isEqualTo(3);
     }
+
+    private static class TestResolvedExpression implements ResolvedExpression {
+        private final String name;
+        private final DataType outputDataType;
+
+        private TestResolvedExpression(String name, DataType outputDataType) {
+            this.name = name;
+            this.outputDataType = outputDataType;
+        }
+
+        @Override
+        public DataType getOutputDataType() {
+            return outputDataType;
+        }
+
+        @Override
+        public List<ResolvedExpression> getResolvedChildren() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public String asSummaryString() {
+            return new SqlCallExpression(name).asSummaryString();
+        }
+
+        @Override
+        public String asSerializableString() {
+            return name;
+        }
+
+        @Override
+        public List<Expression> getChildren() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public <R> R accept(ExpressionVisitor<R> expressionVisitor) {
+            return null;
+        }
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index 27a8951097..e4286eb181 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -850,7 +850,7 @@ public class FlinkCatalogTest {
         assertThat(t2.getComment()).isEqualTo(t1.getComment());
         assertThat(t2.getOptions()).isEqualTo(t1.getOptions());
         if (t1.getTableKind() == CatalogBaseTable.TableKind.TABLE) {
-            assertThat(t2.getSchema()).isEqualTo(t1.getSchema());
+            
assertThat(t2.getUnresolvedSchema()).isEqualTo(t1.getUnresolvedSchema());
             assertThat(((CatalogTable) (t2)).getPartitionKeys())
                     .isEqualTo(((CatalogTable) (t1)).getPartitionKeys());
             assertThat(((CatalogTable) (t2)).isPartitioned())
@@ -864,7 +864,12 @@ public class FlinkCatalogTest {
                                             t2.getUnresolvedSchema()
                                                     .resolve(new 
TestSchemaResolver()))
                                     .build())
-                    .isEqualTo(t1.getSchema().toSchema());
+                    .isEqualTo(
+                            Schema.newBuilder()
+                                    .fromResolvedSchema(
+                                            t1.getUnresolvedSchema()
+                                                    .resolve(new 
TestSchemaResolver()))
+                                    .build());
             
assertThat(mt2.getPartitionKeys()).isEqualTo(mt1.getPartitionKeys());
             assertThat(mt2.isPartitioned()).isEqualTo(mt1.isPartitioned());
             // validate definition query


Reply via email to