This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 60497e7dfddf27b25b80b913ef259010e97569aa
Author: xuyang <xyzhong...@163.com>
AuthorDate: Thu Jan 9 10:29:49 2025 +0800

    [FLINK-36497][table] Remove deprecated method CatalogTable#toProperties
    
    Co-authored-by: shiwei10 <shiwe...@staff.weibo.com>
---
 docs/content.zh/docs/dev/table/catalogs.md         |   4 +-
 docs/content/docs/dev/table/catalogs.md            |   4 +-
 flink-python/pyflink/table/catalog.py              |   8 +-
 .../table/api/internal/TableEnvironmentImpl.java   |   6 +-
 .../flink/table/catalog/CatalogTableImpl.java      | 118 ---------------------
 .../flink/table/catalog/ConnectorCatalogTable.java |   7 --
 .../flink/table/factories/TableFactoryUtil.java    |   7 +-
 .../table/operations/ddl/CreateTableOperation.java |   8 +-
 .../flink/table/catalog/CatalogTableImpTest.java   |  97 -----------------
 .../flink/table/catalog/CatalogPropertiesUtil.java |   2 +-
 .../apache/flink/table/catalog/CatalogTable.java   |  13 ---
 .../flink/table/catalog/DefaultCatalogTable.java   |   6 --
 .../flink/table/catalog/ResolvedCatalogTable.java  |   1 -
 .../table/legacy/factories/TableSinkFactory.java   |   3 +-
 .../table/legacy/factories/TableSourceFactory.java |   3 +-
 .../table/planner/connectors/DynamicSinkUtils.java |   3 +-
 .../operations/SqlCreateTableConverter.java        |   4 +-
 .../table/planner/delegation/PlannerBase.scala     |   2 +-
 .../planner/catalog/JavaCatalogTableTest.java      |   5 -
 .../operations/SqlDdlToOperationConverterTest.java |   8 +-
 .../table/planner/utils/testTableSourceSinks.scala |   4 +-
 21 files changed, 35 insertions(+), 278 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/catalogs.md b/docs/content.zh/docs/dev/table/catalogs.md
index 0c68ca85cd7..dfbad59df7b 100644
--- a/docs/content.zh/docs/dev/table/catalogs.md
+++ b/docs/content.zh/docs/dev/table/catalogs.md
@@ -361,13 +361,13 @@ catalog.list_databases()
 {{< tab "Java/Scala" >}}
 ```java
 // create table
-catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
+catalog.createTable(new ObjectPath("mydb", "mytable"), CatalogTable.newBuilder()...build(), false);
 
 // drop table
 catalog.dropTable(new ObjectPath("mydb", "mytable"), false);
 
 // alter table
-catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
+catalog.alterTable(new ObjectPath("mydb", "mytable"), CatalogTable.newBuilder()...build(), false);
 
 // rename table
 catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");
diff --git a/docs/content/docs/dev/table/catalogs.md b/docs/content/docs/dev/table/catalogs.md
index 98e324b34f5..851d4f2884a 100644
--- a/docs/content/docs/dev/table/catalogs.md
+++ b/docs/content/docs/dev/table/catalogs.md
@@ -365,13 +365,13 @@ catalog.list_databases()
 {{< tab "Java/Scala" >}}
 ```java
 // create table
-catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
+catalog.createTable(new ObjectPath("mydb", "mytable"), CatalogTable.newBuilder()...build(), false);
 
 // drop table
 catalog.dropTable(new ObjectPath("mydb", "mytable"), false);
 
 // alter table
-catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
+catalog.alterTable(new ObjectPath("mydb", "mytable"), CatalogTable.newBuilder()...build(), false);
 
 // rename table
 catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");
diff --git a/flink-python/pyflink/table/catalog.py b/flink-python/pyflink/table/catalog.py
index f6069a60a1b..4046650025b 100644
--- a/flink-python/pyflink/table/catalog.py
+++ b/flink-python/pyflink/table/catalog.py
@@ -830,8 +830,12 @@ class CatalogBaseTable(object):
 
         gateway = get_gateway()
         return CatalogBaseTable(
-            gateway.jvm.org.apache.flink.table.catalog.CatalogTableImpl(
-                schema._j_table_schema, partition_keys, properties, comment))
+            gateway.jvm.org.apache.flink.table.catalog.CatalogTable.newBuilder()
+            .schema(schema._j_table_schema.toSchema())
+            .comment(comment)
+            .partitionKeys(partition_keys)
+            .options(properties)
+            .build())
 
     @staticmethod
     def create_view(
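
The docs snippets above elide the builder calls with `...`, and the pyflink change mirrors the same fluent API. A minimal Java sketch of a complete invocation, assuming only the builder methods that appear in this commit (`schema`, `comment`, `partitionKeys`, `options`, `build`); the column names and connector option below are illustrative placeholders, not part of the change:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;

import java.util.Collections;

// Hypothetical table definition; every value below is a placeholder.
CatalogTable table =
        CatalogTable.newBuilder()
                .schema(
                        Schema.newBuilder()
                                .column("id", DataTypes.INT())
                                .column("name", DataTypes.STRING())
                                .build())
                .comment("an example table")
                .partitionKeys(Collections.singletonList("id"))
                .options(Collections.singletonMap("connector", "datagen"))
                .build();

catalog.createTable(new ObjectPath("mydb", "mytable"), table, false);
```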
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 4b60d880777..3c807a8a777 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -950,8 +950,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
         }
         Catalog catalog =
                 catalogManager.getCatalogOrThrowException(tableIdentifier.getCatalogName());
-        ResolvedCatalogTable catalogTable =
-                catalogManager.resolveCatalogTable(createTableOperation.getCatalogTable());
+        ResolvedCatalogTable catalogTable = createTableOperation.getCatalogTable();
         Optional<DynamicTableSink> stagingDynamicTableSink =
                 getSupportsStagingDynamicTableSink(createTableOperation, catalog, catalogTable);
         if (stagingDynamicTableSink.isPresent()) {
@@ -985,8 +984,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
         ObjectIdentifier tableIdentifier = createTableOperation.getTableIdentifier();
         Catalog catalog =
                 catalogManager.getCatalogOrThrowException(tableIdentifier.getCatalogName());
-        ResolvedCatalogTable catalogTable =
-                catalogManager.resolveCatalogTable(createTableOperation.getCatalogTable());
+        ResolvedCatalogTable catalogTable = createTableOperation.getCatalogTable();
         Optional<DynamicTableSink> stagingDynamicTableSink =
                 getSupportsStagingDynamicTableSink(createTableOperation, catalog, catalogTable);
         if (stagingDynamicTableSink.isPresent()) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
deleted file mode 100644
index 61c3ef06cab..00000000000
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.legacy.api.TableSchema;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import static org.apache.flink.table.legacy.descriptors.Schema.SCHEMA;
-
-/**
- * A catalog table implementation.
- *
- * @deprecated Use {@link CatalogTable#newBuilder()} or a custom implementation instead. Don't
- *     implement against this internal class. It can lead to unintended side effects if code checks
- *     against this class instead of the common interface.
- */
-@Deprecated
-@Internal
-public class CatalogTableImpl extends AbstractCatalogTable {
-
-    public CatalogTableImpl(
-            TableSchema tableSchema, Map<String, String> properties, String comment) {
-        this(tableSchema, new ArrayList<>(), properties, comment);
-    }
-
-    public CatalogTableImpl(
-            TableSchema tableSchema,
-            List<String> partitionKeys,
-            Map<String, String> properties,
-            String comment) {
-        super(tableSchema, partitionKeys, properties, comment);
-    }
-
-    @Override
-    public CatalogBaseTable copy() {
-        return new CatalogTableImpl(
-                getSchema().copy(),
-                new ArrayList<>(getPartitionKeys()),
-                new HashMap<>(getOptions()),
-                getComment());
-    }
-
-    @Override
-    public Optional<String> getDescription() {
-        return Optional.of(getComment());
-    }
-
-    @Override
-    public Optional<String> getDetailedDescription() {
-        return Optional.of("This is a catalog table in an im-memory catalog");
-    }
-
-    @Override
-    public Map<String, String> toProperties() {
-        DescriptorProperties descriptor = new DescriptorProperties(false);
-
-        descriptor.putTableSchema(SCHEMA, getSchema());
-        descriptor.putPartitionKeys(getPartitionKeys());
-
-        Map<String, String> properties = new HashMap<>(getOptions());
-
-        descriptor.putProperties(properties);
-
-        return descriptor.asMap();
-    }
-
-    @Override
-    public CatalogTable copy(Map<String, String> options) {
-        return new CatalogTableImpl(getSchema(), getPartitionKeys(), options, getComment());
-    }
-
-    /** Construct a {@link CatalogTableImpl} from complete properties that contains table schema. */
-    public static CatalogTableImpl fromProperties(Map<String, String> properties) {
-        DescriptorProperties descriptorProperties = new DescriptorProperties(false);
-        descriptorProperties.putProperties(properties);
-        TableSchema tableSchema = descriptorProperties.getTableSchema(SCHEMA);
-        List<String> partitionKeys = descriptorProperties.getPartitionKeys();
-        return new CatalogTableImpl(
-                tableSchema,
-                partitionKeys,
-                removeRedundant(properties, tableSchema, partitionKeys),
-                "");
-    }
-
-    /** Construct catalog table properties from {@link #toProperties()}. */
-    public static Map<String, String> removeRedundant(
-            Map<String, String> properties, TableSchema schema, List<String> partitionKeys) {
-        Map<String, String> ret = new HashMap<>(properties);
-        DescriptorProperties descriptorProperties = new DescriptorProperties(false);
-        descriptorProperties.putTableSchema(SCHEMA, schema);
-        descriptorProperties.putPartitionKeys(partitionKeys);
-        descriptorProperties.asMap().keySet().forEach(ret::remove);
-        return ret;
-    }
-}
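
For code that still constructs the removed `CatalogTableImpl`, a rough migration sketch, assuming a legacy `TableSchema` is at hand; the `toSchema()` bridge is the same one the pyflink change above relies on:

```java
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.legacy.api.TableSchema;

import java.util.List;
import java.util.Map;

// Before (removed by this commit):
//   CatalogTable table = new CatalogTableImpl(tableSchema, partitionKeys, options, comment);

// After: bridge the legacy TableSchema via toSchema() and use the builder.
static CatalogTable migrate(
        TableSchema tableSchema,
        List<String> partitionKeys,
        Map<String, String> options,
        String comment) {
    return CatalogTable.newBuilder()
            .schema(tableSchema.toSchema())
            .comment(comment)
            .partitionKeys(partitionKeys)
            .options(options)
            .build();
}
```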
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
index 07e48035c0a..352a0dd3c95 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
@@ -93,13 +93,6 @@ public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable {
         return isBatch;
     }
 
-    @Override
-    public Map<String, String> toProperties() {
-        // This effectively makes sure the table cannot be persisted in a catalog.
-        throw new UnsupportedOperationException(
-                "ConnectorCatalogTable cannot be converted to properties");
-    }
-
     @Override
     public CatalogTable copy(Map<String, String> options) {
         throw new UnsupportedOperationException(
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
index c27adbd12d4..3b16348a5a6 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CommonCatalogOptions;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.listener.CatalogModificationListener;
 import org.apache.flink.table.catalog.listener.CatalogModificationListenerFactory;
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
@@ -52,7 +53,8 @@ public class TableFactoryUtil {
     public static <T> TableSource<T> findAndCreateTableSource(TableSourceFactory.Context context) {
         try {
             return TableFactoryService.find(
-                            TableSourceFactory.class, context.getTable().toProperties())
+                            TableSourceFactory.class,
+                            ((ResolvedCatalogTable) context.getTable()).toProperties())
                     .createTableSource(context);
         } catch (Throwable t) {
             throw new TableException("findAndCreateTableSource failed.", t);
@@ -81,7 +83,8 @@ public class TableFactoryUtil {
     public static <T> TableSink<T> findAndCreateTableSink(TableSinkFactory.Context context) {
         try {
             return TableFactoryService.find(
-                            TableSinkFactory.class, context.getTable().toProperties())
+                            TableSinkFactory.class,
+                            ((ResolvedCatalogTable) context.getTable()).toProperties())
                     .createTableSink(context);
         } catch (Throwable t) {
             throw new TableException("findAndCreateTableSink failed.", t);
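
With `toProperties()` gone from the `CatalogTable` interface, these legacy factory paths now cast to `ResolvedCatalogTable` first, which assumes the context always carries a resolved table at this point. A sketch (illustrative only, not part of the commit) that makes the assumption explicit:

```java
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;

import java.util.Map;

// An unresolved table would otherwise surface as a ClassCastException;
// checking first yields a clearer error message.
static Map<String, String> toProperties(CatalogTable table) {
    if (!(table instanceof ResolvedCatalogTable)) {
        throw new TableException(
                "Only a resolved catalog table can be serialized to properties.");
    }
    return ((ResolvedCatalogTable) table).toProperties();
}
```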
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableOperation.java
index 1e0641e711c..2d3f91710e3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableOperation.java
@@ -21,8 +21,8 @@ package org.apache.flink.table.operations.ddl;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.internal.TableResultImpl;
 import org.apache.flink.table.api.internal.TableResultInternal;
-import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.OperationUtils;
 
@@ -34,13 +34,13 @@ import java.util.Map;
 @Internal
 public class CreateTableOperation implements CreateOperation {
     private final ObjectIdentifier tableIdentifier;
-    private final CatalogTable catalogTable;
+    private final ResolvedCatalogTable catalogTable;
     private final boolean ignoreIfExists;
     private final boolean isTemporary;
 
     public CreateTableOperation(
             ObjectIdentifier tableIdentifier,
-            CatalogTable catalogTable,
+            ResolvedCatalogTable catalogTable,
             boolean ignoreIfExists,
             boolean isTemporary) {
         this.tableIdentifier = tableIdentifier;
@@ -49,7 +49,7 @@ public class CreateTableOperation implements CreateOperation {
         this.isTemporary = isTemporary;
     }
 
-    public CatalogTable getCatalogTable() {
+    public ResolvedCatalogTable getCatalogTable() {
         return catalogTable;
     }
 
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTableImpTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTableImpTest.java
deleted file mode 100644
index 1310e5d292e..00000000000
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTableImpTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog;
-
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.legacy.api.TableSchema;
-import org.apache.flink.table.legacy.descriptors.Schema;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link CatalogTableImpl}. */
-class CatalogTableImpTest {
-    private static final String TEST = "test";
-
-    @Test
-    void testToProperties() {
-        TableSchema schema = createTableSchema();
-        Map<String, String> prop = createProperties();
-        CatalogTable table = new CatalogTableImpl(schema, createPartitionKeys(), prop, TEST);
-
-        DescriptorProperties descriptorProperties = new DescriptorProperties(false);
-        descriptorProperties.putProperties(table.toProperties());
-
-        assertThat(descriptorProperties.getTableSchema(Schema.SCHEMA)).isEqualTo(schema);
-    }
-
-    @Test
-    void testFromProperties() {
-        TableSchema schema = createTableSchema();
-        Map<String, String> prop = createProperties();
-        CatalogTable table = new CatalogTableImpl(schema, createPartitionKeys(), prop, TEST);
-
-        CatalogTableImpl tableFromProperties =
-                CatalogTableImpl.fromProperties(table.toProperties());
-
-        assertThat(table.getOptions()).isEqualTo(tableFromProperties.getOptions());
-        assertThat(table.getPartitionKeys()).isEqualTo(tableFromProperties.getPartitionKeys());
-        assertThat(table.getSchema()).isEqualTo(tableFromProperties.getSchema());
-    }
-
-    @Test
-    void testNullComment() {
-        TableSchema schema = createTableSchema();
-        Map<String, String> prop = createProperties();
-        CatalogTable table = new CatalogTableImpl(schema, createPartitionKeys(), prop, null);
-
-        assertThat(table.getComment()).isEmpty();
-        assertThat(table.getDescription()).isEqualTo(Optional.of(""));
-    }
-
-    private static Map<String, String> createProperties() {
-        return new HashMap<String, String>() {
-            {
-                put("k", "v");
-                put("K1", "V1"); // for test case-sensitive
-            }
-        };
-    }
-
-    private static TableSchema createTableSchema() {
-        return TableSchema.builder()
-                .field("first", DataTypes.STRING())
-                .field("second", DataTypes.INT())
-                .field("third", DataTypes.DOUBLE())
-                .field("Fourth", DataTypes.BOOLEAN()) // for test case-sensitive
-                .build();
-    }
-
-    private static List<String> createPartitionKeys() {
-        return Arrays.asList("second", "third");
-    }
-}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
index fe2807ce8ee..088870a6db1 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
@@ -236,7 +236,7 @@ public final class CatalogPropertiesUtil {
                     .schema(schema)
                     .comment(comment)
                     .partitionKeys(partitionKeys)
-                    .options(properties)
+                    .options(options)
                     .snapshot(snapshot)
                     .build();
         } catch (Exception e) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
index 11712d03c18..5777a1755be 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
@@ -98,19 +98,6 @@ public interface CatalogTable extends CatalogBaseTable {
      */
     CatalogTable copy(Map<String, String> options);
 
-    /**
-     * Serializes this instance into a map of string-based properties.
-     *
-     * <p>Compared to the pure table options in {@link #getOptions()}, the map includes schema,
-     * partitioning, and other characteristics in a serialized form.
-     *
-     * @deprecated Only a {@link ResolvedCatalogTable} is serializable to properties.
-     */
-    @Deprecated
-    default Map<String, String> toProperties() {
-        return Collections.emptyMap();
-    }
-
     /** Return the snapshot specified for the table. Return Optional.empty() if not specified. */
     default Optional<Long> getSnapshot() {
         return Optional.empty();
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java
index d2952c48030..b06e6acee26 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java
@@ -127,12 +127,6 @@ public class DefaultCatalogTable implements CatalogTable {
         return Optional.empty();
     }
 
-    @Override
-    public Map<String, String> toProperties() {
-        throw new UnsupportedOperationException(
-                "Only a resolved catalog table can be serialized into a map of string properties.");
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogTable.java
index e52f6aa5a13..7f6aba71209 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogTable.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogTable.java
@@ -71,7 +71,6 @@ public final class ResolvedCatalogTable
      * symmetric. The framework will resolve functions and perform other validation tasks. A catalog
      * implementation must not deal with this during a read operation.
      */
-    @Override
     public Map<String, String> toProperties() {
         return CatalogPropertiesUtil.serializeCatalogTable(this);
     }
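
With the interface default removed, `ResolvedCatalogTable#toProperties` becomes the single serialization entry point. A sketch of the intended round trip, assuming the existing `deserializeCatalogTable` counterpart in `CatalogPropertiesUtil`:

```java
import org.apache.flink.table.catalog.CatalogPropertiesUtil;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;

import java.util.Map;

// Serialize: only possible after resolution.
ResolvedCatalogTable resolved = catalogManager.resolveCatalogTable(table);
Map<String, String> props = resolved.toProperties();

// Deserialize: yields an unresolved CatalogTable again; the framework
// re-resolves it on read, as the javadoc above notes.
CatalogTable restored = CatalogPropertiesUtil.deserializeCatalogTable(props);
```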
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/legacy/factories/TableSinkFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/legacy/factories/TableSinkFactory.java
index e3294a551d5..9a22bd50e25 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/legacy/factories/TableSinkFactory.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/legacy/factories/TableSinkFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.legacy.sinks.TableSink;
 
@@ -65,7 +66,7 @@ public interface TableSinkFactory<T> extends TableFactory {
      */
     @Deprecated
     default TableSink<T> createTableSink(ObjectPath tablePath, CatalogTable table) {
-        return createTableSink(table.toProperties());
+        return createTableSink(((ResolvedCatalogTable) table).toProperties());
     }
 
     /**
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/legacy/factories/TableSourceFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/legacy/factories/TableSourceFactory.java
index 73073f17258..b85fb6eddd5 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/legacy/factories/TableSourceFactory.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/legacy/factories/TableSourceFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.legacy.sources.TableSource;
 
@@ -65,7 +66,7 @@ public interface TableSourceFactory<T> extends TableFactory {
      */
     @Deprecated
     default TableSource<T> createTableSource(ObjectPath tablePath, CatalogTable table) {
-        return createTableSource(table.toProperties());
+        return createTableSource(((ResolvedCatalogTable) table).toProperties());
     }
 
     /**
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index 285ec39bc52..5af511c83f3 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -224,8 +224,7 @@ public final class DynamicSinkUtils {
             Map<String, String> staticPartitions,
             boolean isOverwrite,
             DynamicTableSink sink) {
-        final ResolvedCatalogTable catalogTable =
-                (ResolvedCatalogTable) createTableOperation.getCatalogTable();
+        final ResolvedCatalogTable catalogTable = createTableOperation.getCatalogTable();
 
         final ObjectIdentifier identifier = createTableOperation.getTableIdentifier();
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
index b45e93f7b85..56129869fc8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
@@ -84,7 +84,7 @@ class SqlCreateTableConverter {
 
     /** Convert the {@link SqlCreateTable} node. */
     Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
-        CatalogTable catalogTable = createCatalogTable(sqlCreateTable);
+        ResolvedCatalogTable catalogTable = createCatalogTable(sqlCreateTable);
 
         UnresolvedIdentifier unresolvedIdentifier =
                 UnresolvedIdentifier.of(sqlCreateTable.fullTableName());
@@ -188,7 +188,7 @@ class SqlCreateTableConverter {
         return catalogManager.resolveCatalogTable(catalogTable);
     }
 
-    private CatalogTable createCatalogTable(SqlCreateTable sqlCreateTable) {
+    private ResolvedCatalogTable createCatalogTable(SqlCreateTable sqlCreateTable) {
 
         final Schema sourceTableSchema;
         final Optional<TableDistribution> sourceTableDistribution;
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 2254776c9a1..19f50472cb6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -528,7 +528,7 @@ abstract class PlannerBase(
       staticPartitions: JMap[String, String],
       isOverwrite: Boolean): RelNode = {
     val input = createRelBuilder.queryOperation(queryOperation).build()
-    val table = catalogManager.resolveCatalogTable(createTableOperation.getCatalogTable)
+    val table = createTableOperation.getCatalogTable
 
     val identifier = createTableOperation.getTableIdentifier
     val catalog = toScala(catalogManager.getCatalog(identifier.getCatalogName))
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java
index ea36fe66fea..d4c8d1f1aa6 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java
@@ -285,11 +285,6 @@ class JavaCatalogTableTest extends TableTestBase {
             return this;
         }
 
-        @Override
-        public Map<String, String> toProperties() {
-            return Collections.emptyMap();
-        }
-
         @Override
         public Map<String, String> getOptions() {
             Map<String, String> map = new HashMap<>();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index 8a359d60aed..16c10a4b229 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -263,7 +263,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion
         Operation operation = parse(sql, planner, parser);
         assertThat(operation).isInstanceOf(CreateTableOperation.class);
         CreateTableOperation op = (CreateTableOperation) operation;
-        CatalogTable catalogTable = op.getCatalogTable();
+        ResolvedCatalogTable catalogTable = op.getCatalogTable();
         assertThat(catalogTable.getPartitionKeys()).hasSameElementsAs(Arrays.asList("a", "d"));
         assertThat(catalogTable.getSchema().getFieldNames())
                 .isEqualTo(new String[] {"a", "b", "c", "d"});
@@ -275,9 +275,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion
                             DataTypes.INT(),
                             DataTypes.VARCHAR(Integer.MAX_VALUE)
                         });
-        assertThat(catalogTable).isInstanceOf(ResolvedCatalogTable.class);
-        ResolvedCatalogTable resolvedCatalogTable = (ResolvedCatalogTable) catalogTable;
-        resolvedCatalogTable
+        catalogTable
                 .getResolvedSchema()
                 .getColumn(0)
                 .ifPresent(
@@ -418,7 +416,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion
                 SqlNodeToOperationConversion.convert(planner, catalogManager, node).get();
         assertThat(operation).isInstanceOf(CreateTableOperation.class);
         CreateTableOperation op = (CreateTableOperation) operation;
-        CatalogTable catalogTable = op.getCatalogTable();
+        ResolvedCatalogTable catalogTable = op.getCatalogTable();
         Map<String, String> properties = catalogTable.toProperties();
         Map<String, String> expected = new HashMap<>();
         expected.put("schema.0.name", "a");
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
index 5bcbf0b9a54..48f61d221a7 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
@@ -462,11 +462,11 @@ class TestOptionsTableFactory extends TableSourceFactory[Row] with TableSinkFact
   }
 
   override def createTableSource(context: TableSourceFactory.Context): TableSource[Row] = {
-    createPropertiesSource(context.getTable.toProperties)
+    createPropertiesSource(context.getTable.asInstanceOf[ResolvedCatalogTable].toProperties)
   }
 
   override def createTableSink(context: TableSinkFactory.Context): TableSink[Row] = {
-    createPropertiesSink(context.getTable.toProperties)
+    createPropertiesSink(context.getTable.asInstanceOf[ResolvedCatalogTable].toProperties)
   }
 }
 
