This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a9597339a0 [FLINK-36494][table-common] Remove deprecated method Catalog#getTableFactory (#25948)
3a9597339a0 is described below

commit 3a9597339a085cf2ff47b4475bfc08814f0c206c
Author: Xuyang <xyzhong...@163.com>
AuthorDate: Mon Jan 13 09:50:35 2025 +0800

    [FLINK-36494][table-common] Remove deprecated method Catalog#getTableFactory (#25948)
---
 .../table/api/internal/TableEnvironmentImpl.java   |   1 -
 .../flink/table/factories/TableFactoryUtil.java    |  43 +--------
 .../table/operations/TruncateTableOperation.java   |   1 -
 .../org/apache/flink/table/catalog/Catalog.java    |  15 ---
 .../planner/operations/DeletePushDownUtils.java    |   8 +-
 .../operations/SqlNodeToOperationConversion.java   |   3 +-
 .../planner/plan/FlinkCalciteCatalogReader.java    |   1 -
 .../table/planner/delegation/PlannerBase.scala     |   2 -
 .../plan/schema/LegacyCatalogSourceTable.scala     |   1 -
 .../operations/DeletePushDownUtilsTest.java        |   4 +-
 .../planner/plan/common/TableFactoryTest.scala     | 101 ---------------------
 11 files changed, 6 insertions(+), 174 deletions(-)
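
For catalog implementers that still override the removed method, the
migration path named in the deleted Javadoc is Catalog#getFactory() from the
FLIP-95 factory stack. A minimal sketch of the new-style override follows;
MyDynamicTableFactory is a hypothetical placeholder for a user-provided
DynamicTableFactory implementation, not a class touched by this patch:

    import java.util.Optional;

    import org.apache.flink.table.catalog.GenericInMemoryCatalog;
    import org.apache.flink.table.factories.Factory;

    public class MyCatalog extends GenericInMemoryCatalog {

        public MyCatalog(String name) {
            super(name);
        }

        // Replacement hook for the removed getTableFactory(): return a
        // factory from the new stack instead of a legacy TableFactory.
        @Override
        public Optional<Factory> getFactory() {
            // Hypothetical user-provided DynamicTableFactory implementation.
            return Optional.of(new MyDynamicTableFactory());
        }
    }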

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 3b205333963..4b60d880777 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -1016,7 +1016,6 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
             ResolvedCatalogTable catalogTable) {
         if (tableConfig.get(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED)) {
             if (!TableFactoryUtil.isLegacyConnectorOptions(
-                    catalog,
                     tableConfig,
                     isStreamingMode,
                     createTableOperation.getTableIdentifier(),
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
index 613d0822bd2..c27adbd12d4 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
@@ -23,7 +23,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -39,12 +38,9 @@ import org.apache.flink.table.legacy.factories.TableSourceFactory;
 import org.apache.flink.table.legacy.sinks.TableSink;
 import org.apache.flink.table.legacy.sources.TableSource;
 
-import javax.annotation.Nullable;
-
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
 /** Utility for dealing with {@link TableFactory} using the {@link TableFactoryService}. */
@@ -70,7 +66,6 @@ public class TableFactoryUtil {
      */
     @SuppressWarnings("unchecked")
     public static <T> TableSource<T> findAndCreateTableSource(
-            @Nullable Catalog catalog,
             ObjectIdentifier objectIdentifier,
             CatalogTable catalogTable,
             ReadableConfig configuration,
@@ -78,20 +73,7 @@ public class TableFactoryUtil {
         TableSourceFactory.Context context =
                 new TableSourceFactoryContextImpl(
                         objectIdentifier, catalogTable, configuration, isTemporary);
-        Optional<TableFactory> factoryOptional =
-                catalog == null ? Optional.empty() : catalog.getTableFactory();
-        if (factoryOptional.isPresent()) {
-            TableFactory factory = factoryOptional.get();
-            if (factory instanceof TableSourceFactory) {
-                return ((TableSourceFactory<T>) factory).createTableSource(context);
-            } else {
-                throw new ValidationException(
-                        "Cannot query a sink-only table. "
-                                + "TableFactory provided by catalog must implement TableSourceFactory");
-            }
-        } else {
-            return findAndCreateTableSource(context);
-        }
+        return findAndCreateTableSource(context);
     }
 
     /** Returns a table sink matching the context. */
@@ -113,7 +95,6 @@ public class TableFactoryUtil {
      */
     @SuppressWarnings("unchecked")
     public static <T> TableSink<T> findAndCreateTableSink(
-            @Nullable Catalog catalog,
             ObjectIdentifier objectIdentifier,
             CatalogTable catalogTable,
             ReadableConfig configuration,
@@ -126,30 +107,11 @@ public class TableFactoryUtil {
                         configuration,
                         !isStreamingMode,
                         isTemporary);
-        if (catalog == null) {
-            return findAndCreateTableSink(context);
-        } else {
-            return createTableSinkForCatalogTable(catalog, context)
-                    .orElseGet(() -> findAndCreateTableSink(context));
-        }
-    }
-
-    /**
-     * Creates a table sink for a {@link CatalogTable} using table factory associated with the
-     * catalog.
-     */
-    public static Optional<TableSink> createTableSinkForCatalogTable(
-            Catalog catalog, TableSinkFactory.Context context) {
-        TableFactory tableFactory = catalog.getTableFactory().orElse(null);
-        if (tableFactory instanceof TableSinkFactory) {
-            return Optional.ofNullable(((TableSinkFactory) tableFactory).createTableSink(context));
-        }
-        return Optional.empty();
+        return findAndCreateTableSink(context);
     }
 
     /** Checks whether the {@link CatalogTable} uses legacy connector sink options. */
     public static boolean isLegacyConnectorOptions(
-            @Nullable Catalog catalog,
             ReadableConfig configuration,
             boolean isStreamingMode,
             ObjectIdentifier objectIdentifier,
@@ -165,7 +127,6 @@ public class TableFactoryUtil {
                 // try to create legacy table source using the options,
                 // some legacy factories may use the 'type' key
                 TableFactoryUtil.findAndCreateTableSink(
-                        catalog,
                         objectIdentifier,
                         catalogTable,
                         configuration,
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TruncateTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TruncateTableOperation.java
index 64c070851be..186f704b511 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TruncateTableOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TruncateTableOperation.java
@@ -67,7 +67,6 @@ public class TruncateTableOperation implements ExecutableOperation {
         }
 
         if (TableFactoryUtil.isLegacyConnectorOptions(
-                catalogManager.getCatalog(objectIdentifier.getCatalogName()).orElse(null),
                 ctx.getTableConfig(),
                 ctx.isStreamingMode(),
                 tableIdentifier,
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index 84edc3679e0..f0dcc90bbc0 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -43,7 +43,6 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.factories.FunctionDefinitionFactory;
-import org.apache.flink.table.legacy.factories.TableFactory;
 import org.apache.flink.table.procedures.Procedure;
 
 import javax.annotation.Nullable;
@@ -87,20 +86,6 @@ public interface Catalog {
         return Optional.empty();
     }
 
-    /**
-     * Get an optional {@link TableFactory} instance that's responsible for generating table-related
-     * instances stored in this catalog, instances such as source/sink.
-     *
-     * @return an optional TableFactory instance
-     * @deprecated Use {@link #getFactory()} for the new factory stack. The new factory stack uses
-     *     the new table sources and sinks defined in FLIP-95 and a slightly different discovery
-     *     mechanism.
-     */
-    @Deprecated
-    default Optional<TableFactory> getTableFactory() {
-        return Optional.empty();
-    }
-
     /**
     * Get an optional {@link FunctionDefinitionFactory} instance that's responsible for
      * instantiating function definitions.
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java
index 6c4f72e6b0d..a7323b3fa69 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.planner.operations;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -68,9 +67,7 @@ public class DeletePushDownUtils {
      * can't get the {@link DynamicTableSink}.
      */
     public static Optional<DynamicTableSink> getDynamicTableSink(
-            ContextResolvedTable contextResolvedTable,
-            LogicalTableModify tableModify,
-            CatalogManager catalogManager) {
+            ContextResolvedTable contextResolvedTable, LogicalTableModify tableModify) {
         final FlinkContext context = ShortcutUtils.unwrapContext(tableModify.getCluster());
 
         CatalogBaseTable catalogBaseTable = contextResolvedTable.getTable();
@@ -83,9 +80,6 @@ public class DeletePushDownUtils {
             // only consider the CatalogTable that doesn't use legacy connector sink option
             if (!contextResolvedTable.isAnonymous()
                     && !TableFactoryUtil.isLegacyConnectorOptions(
-                            catalogManager
-                                    .getCatalog(objectIdentifier.getCatalogName())
-                                    .orElse(null),
                             context.getTableConfig(),
                             !context.isBatchMode(),
                             objectIdentifier,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
index 04602b91c15..2947aa504f5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
@@ -1223,8 +1223,7 @@ public class SqlNodeToOperationConversion {
                         catalogManager.qualifyIdentifier(unresolvedTableIdentifier));
         // try push down delete
         Optional<DynamicTableSink> optionalDynamicTableSink =
-                DeletePushDownUtils.getDynamicTableSink(
-                        contextResolvedTable, tableModify, catalogManager);
+                DeletePushDownUtils.getDynamicTableSink(contextResolvedTable, tableModify);
         if (optionalDynamicTableSink.isPresent()) {
             DynamicTableSink dynamicTableSink = optionalDynamicTableSink.get();
             // if the table sink supports delete push down
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReader.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReader.java
index 35733aedd89..b8770d2ed90 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReader.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReader.java
@@ -235,7 +235,6 @@ public class FlinkCalciteCatalogReader extends CalciteCatalogReader {
                         TableSchemaUtils.removeTimeAttributeFromResolvedSchema(
                                 originTable.getResolvedSchema());
                 TableFactoryUtil.findAndCreateTableSource(
-                        schemaTable.getContextResolvedTable().getCatalog().orElse(null),
                         schemaTable.getContextResolvedTable().getIdentifier(),
                         new ResolvedCatalogTable(
                                 CatalogTable.of(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index cea31d7f594..2254776c9a1 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -470,7 +470,6 @@ abstract class PlannerBase(
         if (
           !contextResolvedTable.isAnonymous &&
           TableFactoryUtil.isLegacyConnectorOptions(
-            catalogManager.getCatalog(objectIdentifier.getCatalogName).orElse(null),
             tableConfig,
             isStreamingMode,
             objectIdentifier,
@@ -479,7 +478,6 @@ abstract class PlannerBase(
           )
         ) {
           val tableSink = TableFactoryUtil.findAndCreateTableSink(
-            catalog.orNull,
             objectIdentifier,
             tableToFind,
             getTableConfig,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala
index 8537dd47f89..b811f21cd27 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala
@@ -180,7 +180,6 @@ class LegacyCatalogSourceTable[T](
       TableSchemaUtils.removeTimeAttributeFromResolvedSchema(
         schemaTable.getContextResolvedTable.getResolvedSchema)
     val tableSource = TableFactoryUtil.findAndCreateTableSource(
-      schemaTable.getContextResolvedTable.getCatalog.orElse(null),
       identifier,
       new ResolvedCatalogTable(
         CatalogTable.of(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java
index 09870eb9735..7aca5b69f3b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java
@@ -99,7 +99,7 @@ public class DeletePushDownUtilsTest {
                         tableId, catalog, catalogManager.resolveCatalogTable(catalogTable));
         LogicalTableModify tableModify = getTableModifyFromSql("DELETE FROM t");
         Optional<DynamicTableSink> optionalDynamicTableSink =
-                DeletePushDownUtils.getDynamicTableSink(resolvedTable, tableModify, catalogManager);
+                DeletePushDownUtils.getDynamicTableSink(resolvedTable, tableModify);
         // verify we can get the dynamic table sink
         assertThat(optionalDynamicTableSink).isPresent();
         assertThat(optionalDynamicTableSink.get())
@@ -115,7 +115,7 @@ public class DeletePushDownUtilsTest {
                         tableId, catalog, catalogManager.resolveCatalogTable(catalogTable));
         tableModify = getTableModifyFromSql("DELETE FROM t1");
         optionalDynamicTableSink =
-                DeletePushDownUtils.getDynamicTableSink(resolvedTable, tableModify, catalogManager);
+                DeletePushDownUtils.getDynamicTableSink(resolvedTable, tableModify);
         // verify it should be empty since it's not an instance of DynamicTableSink but is legacy
         // TableSink
         assertThat(optionalDynamicTableSink).isEmpty();
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala
deleted file mode 100644
index bd2125e975a..00000000000
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.common
-
-import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectIdentifier}
-import org.apache.flink.table.legacy.factories.TableFactory
-import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory
-import org.apache.flink.table.planner.plan.utils.TestContextTableFactory
-import org.apache.flink.table.planner.utils.TableTestBase
-import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters}
-
-import org.assertj.core.api.Assertions.assertThat
-import org.junit.jupiter.api.{BeforeEach, TestTemplate}
-import org.junit.jupiter.api.extension.ExtendWith
-
-import java.util.Optional
-
-@ExtendWith(Array(classOf[ParameterizedTestExtension]))
-class TableFactoryTest(isBatch: Boolean) extends TableTestBase {
-
-  private val util = if (isBatch) batchTestUtil() else streamTestUtil()
-
-  @BeforeEach
-  def before(): Unit = {
-    // we should clean the data to avoid serialization exception due to dirty data
-    TestCollectionTableFactory.reset()
-  }
-
-  @TestTemplate
-  def testTableSourceSinkFactory(): Unit = {
-    val factory = new TestContextTableFactory(
-      ObjectIdentifier.of("cat", "default", "t1"),
-      ObjectIdentifier.of("cat", "default", "t2"),
-      isBatch)
-    util.tableEnv.getConfig.set(TestContextTableFactory.REQUIRED_KEY, Boolean.box(true))
-    util.tableEnv.registerCatalog(
-      "cat",
-      new GenericInMemoryCatalog("default") {
-        override def getTableFactory: Optional[TableFactory] = Optional.of(factory)
-      })
-    util.tableEnv.useCatalog("cat")
-
-    val sourceDDL =
-      """
-        |create table t1(
-        |  a int,
-        |  b varchar,
-        |  c as a + 1
-        |) with (
-        |  'connector.type' = 'filesystem',
-        |  'connector.path' = '/to/my/path1',
-        |  'format.type' = 'csv'
-        |)
-      """.stripMargin
-    val sinkDDL =
-      """
-        |create table t2(
-        |  a int,
-        |  b as c - 1,
-        |  c int
-        |) with (
-        |  'connector.type' = 'filesystem',
-        |  'connector.path' = '/to/my/path2',
-        |  'format.type' = 'csv'
-        |)
-      """.stripMargin
-    val query =
-      """
-        |insert into t2
-        |select t1.a, t1.c from t1
-      """.stripMargin
-    util.tableEnv.executeSql(sourceDDL)
-    util.tableEnv.executeSql(sinkDDL)
-
-    util.tableEnv.explainSql(query)
-    assertThat(factory.hasInvokedSource).isTrue
-    assertThat(factory.hasInvokedSink).isTrue
-  }
-}
-
-object TableFactoryTest {
-  @Parameters(name = "isBatch: {0}")
-  def parameters(): java.util.Collection[Boolean] = {
-    java.util.Arrays.asList(true, false)
-  }
-}
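
Call sites migrate by dropping the leading Catalog (or null) argument;
discovery now always goes through TableFactoryService. A hedged sketch of the
resulting call shape, assuming the remaining parameter order matches the
hunks above (identifier, table, configuration, then the boolean flags):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.catalog.CatalogTable;
    import org.apache.flink.table.catalog.ObjectIdentifier;
    import org.apache.flink.table.factories.TableFactoryUtil;
    import org.apache.flink.table.legacy.sinks.TableSink;
    import org.apache.flink.table.legacy.sources.TableSource;

    class LegacyFactoryCallSite {
        // identifier and table stand in for values a caller already holds.
        static void discover(ObjectIdentifier identifier, CatalogTable table) {
            Configuration conf = new Configuration();
            // No Catalog parameter anymore; source lookup falls straight
            // through to findAndCreateTableSource(context).
            TableSource<?> source =
                    TableFactoryUtil.findAndCreateTableSource(
                            identifier, table, conf, /* isTemporary */ false);
            TableSink<?> sink =
                    TableFactoryUtil.findAndCreateTableSink(
                            identifier, table, conf,
                            /* isStreamingMode */ true, /* isTemporary */ false);
        }
    }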
