This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 4263ad3ed43 [FLINK-36484] [table-api] Remove deprecated methods StreamTableEnvironment#registerFunction (#25529) 4263ad3ed43 is described below commit 4263ad3ed4319a16e09d68a989fd9fc40bfb39aa Author: Ammu <amm...@gmail.com> AuthorDate: Fri Jan 3 08:26:04 2025 +0530 [FLINK-36484] [table-api] Remove deprecated methods StreamTableEnvironment#registerFunction (#25529) * feat: remove deprecated function * fix: remove-deprecated-methods * fix: remove-deprecated-methods --------- Co-authored-by: Ammu Parvathy <ammu.parva...@ibm.com> --- .../api/bridge/java/StreamTableEnvironment.java | 54 ------------------ .../java/internal/StreamTableEnvironmentImpl.java | 37 ------------ .../api/bridge/scala/StreamTableEnvironment.scala | 66 ---------------------- .../internal/StreamTableEnvironmentImpl.scala | 40 ------------- 4 files changed, 197 deletions(-) diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java index 86610a431fb..05bf6fabc82 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java @@ -37,10 +37,6 @@ import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.functions.AggregateFunction; -import org.apache.flink.table.functions.TableAggregateFunction; -import org.apache.flink.table.functions.TableFunction; -import org.apache.flink.table.functions.UserDefinedFunction; import org.apache.flink.table.types.AbstractDataType; import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row; @@ -122,56 +118,6 @@ public interface StreamTableEnvironment extends TableEnvironment { return StreamTableEnvironmentImpl.create(executionEnvironment, settings); } - /** - * Registers a {@link TableFunction} under a unique name in the TableEnvironment's catalog. - * Registered functions can be referenced in Table API and SQL queries. - * - * @param name The name under which the function is registered. - * @param tableFunction The TableFunction to register. - * @param <T> The type of the output row. - * @deprecated Use {@link #createTemporarySystemFunction(String, UserDefinedFunction)} instead. - * Please note that the new method also uses the new type system and reflective extraction - * logic. It might be necessary to update the function implementation as well. See the - * documentation of {@link TableFunction} for more information on the new function design. - */ - @Deprecated - <T> void registerFunction(String name, TableFunction<T> tableFunction); - - /** - * Registers an {@link AggregateFunction} under a unique name in the TableEnvironment's catalog. - * Registered functions can be referenced in Table API and SQL queries. - * - * @param name The name under which the function is registered. - * @param aggregateFunction The AggregateFunction to register. - * @param <T> The type of the output value. - * @param <ACC> The type of aggregate accumulator. - * @deprecated Use {@link #createTemporarySystemFunction(String, UserDefinedFunction)} instead. - * Please note that the new method also uses the new type system and reflective extraction - * logic. It might be necessary to update the function implementation as well. See the - * documentation of {@link AggregateFunction} for more information on the new function - * design. - */ - @Deprecated - <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction); - - /** - * Registers an {@link TableAggregateFunction} under a unique name in the TableEnvironment's - * catalog. Registered functions can only be referenced in Table API. - * - * @param name The name under which the function is registered. - * @param tableAggregateFunction The TableAggregateFunction to register. - * @param <T> The type of the output value. - * @param <ACC> The type of aggregate accumulator. - * @deprecated Use {@link #createTemporarySystemFunction(String, UserDefinedFunction)} instead. - * Please note that the new method also uses the new type system and reflective extraction - * logic. It might be necessary to update the function implementation as well. See the - * documentation of {@link TableAggregateFunction} for more information on the new function - * design. - */ - @Deprecated - <T, ACC> void registerFunction( - String name, TableAggregateFunction<T, ACC> tableAggregateFunction); - /** * Converts the given {@link DataStream} into a {@link Table}. * diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java index 89aa1238d22..3b1591f8dee 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java @@ -48,10 +48,6 @@ import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.factories.CatalogStoreFactory; import org.apache.flink.table.factories.PlannerFactoryUtil; import org.apache.flink.table.factories.TableFactoryUtil; -import org.apache.flink.table.functions.AggregateFunction; -import org.apache.flink.table.functions.TableAggregateFunction; -import org.apache.flink.table.functions.TableFunction; -import org.apache.flink.table.functions.UserDefinedFunctionHelper; import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.operations.ExternalQueryOperation; import org.apache.flink.table.operations.OutputConversionModifyOperation; @@ -173,39 +169,6 @@ public final class StreamTableEnvironmentImpl extends AbstractStreamTableEnviron settings.isStreamingMode()); } - @Override - public <T> void registerFunction(String name, TableFunction<T> tableFunction) { - TypeInformation<T> typeInfo = - UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction); - - functionCatalog.registerTempSystemTableFunction(name, tableFunction, typeInfo); - } - - @Override - public <T, ACC> void registerFunction( - String name, AggregateFunction<T, ACC> aggregateFunction) { - TypeInformation<T> typeInfo = - UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(aggregateFunction); - TypeInformation<ACC> accTypeInfo = - UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(aggregateFunction); - - functionCatalog.registerTempSystemAggregateFunction( - name, aggregateFunction, typeInfo, accTypeInfo); - } - - @Override - public <T, ACC> void registerFunction( - String name, TableAggregateFunction<T, ACC> tableAggregateFunction) { - TypeInformation<T> typeInfo = - UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(tableAggregateFunction); - TypeInformation<ACC> accTypeInfo = - UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction( - tableAggregateFunction); - - functionCatalog.registerTempSystemAggregateFunction( - name, tableAggregateFunction, typeInfo, accTypeInfo); - } - @Override public <T> Table fromDataStream(DataStream<T> dataStream) { return fromStreamInternal(dataStream, null, null, ChangelogMode.insertOnly()); diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala index 2bed5ca258a..1238dc6da77 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala @@ -60,72 +60,6 @@ import org.apache.flink.types.{Row, RowKind} @PublicEvolving trait StreamTableEnvironment extends TableEnvironment { - /** - * Registers a [[TableFunction]] under a unique name in the TableEnvironment's catalog. Registered - * functions can be referenced in SQL queries. - * - * @param name - * The name under which the function is registered. - * @param tf - * The TableFunction to register - * - * @deprecated - * Use [[createTemporarySystemFunction(String, UserDefinedFunction)]] instead. Please note that - * the new method also uses the new type system and reflective extraction logic. It might be - * necessary to update the function implementation as well. See the documentation of - * [[TableFunction]] for more information on the new function design. - */ - @deprecated - def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit - - /** - * Registers an [[AggregateFunction]] under a unique name in the TableEnvironment's catalog. - * Registered functions can be referenced in Table API and SQL queries. - * - * @param name - * The name under which the function is registered. - * @param f - * The AggregateFunction to register. - * @tparam T - * The type of the output value. - * @tparam ACC - * The type of aggregate accumulator. - * - * @deprecated - * Use [[createTemporarySystemFunction(String, UserDefinedFunction)]] instead. Please note that - * the new method also uses the new type system and reflective extraction logic. It might be - * necessary to update the function implementation as well. See the documentation of - * [[AggregateFunction]] for more information on the new function design. - */ - @deprecated - def registerFunction[T: TypeInformation, ACC: TypeInformation]( - name: String, - f: AggregateFunction[T, ACC]): Unit - - /** - * Registers an [[TableAggregateFunction]] under a unique name in the TableEnvironment's catalog. - * Registered functions can only be referenced in Table API. - * - * @param name - * The name under which the function is registered. - * @param f - * The TableAggregateFunction to register. - * @tparam T - * The type of the output value. - * @tparam ACC - * The type of aggregate accumulator. - * - * @deprecated - * Use [[createTemporarySystemFunction(String, UserDefinedFunction)]] instead. Please note that - * the new method also uses the new type system and reflective extraction logic. It might be - * necessary to update the function implementation as well. See the documentation of - * [[TableAggregateFunction]] for more information on the new function design. - */ - @deprecated - def registerFunction[T: TypeInformation, ACC: TypeInformation]( - name: String, - f: TableAggregateFunction[T, ACC]): Unit - /** * Converts the given [[DataStream]] into a [[Table]]. * diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala index 5c3e2c556c4..1225360a8e7 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala @@ -224,46 +224,6 @@ class StreamTableEnvironmentImpl( toStreamInternal(table, modifyOperation) } - override def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = { - val typeInfo = UserDefinedFunctionHelper - .getReturnTypeOfTableFunction(tf, implicitly[TypeInformation[T]]) - functionCatalog.registerTempSystemTableFunction( - name, - tf, - typeInfo - ) - } - - override def registerFunction[T: TypeInformation, ACC: TypeInformation]( - name: String, - f: AggregateFunction[T, ACC]): Unit = { - val typeInfo = UserDefinedFunctionHelper - .getReturnTypeOfAggregateFunction(f, implicitly[TypeInformation[T]]) - val accTypeInfo = UserDefinedFunctionHelper - .getAccumulatorTypeOfAggregateFunction(f, implicitly[TypeInformation[ACC]]) - functionCatalog.registerTempSystemAggregateFunction( - name, - f, - typeInfo, - accTypeInfo - ) - } - - override def registerFunction[T: TypeInformation, ACC: TypeInformation]( - name: String, - f: TableAggregateFunction[T, ACC]): Unit = { - val typeInfo = UserDefinedFunctionHelper - .getReturnTypeOfAggregateFunction(f, implicitly[TypeInformation[T]]) - val accTypeInfo = UserDefinedFunctionHelper - .getAccumulatorTypeOfAggregateFunction(f, implicitly[TypeInformation[ACC]]) - functionCatalog.registerTempSystemAggregateFunction( - name, - f, - typeInfo, - accTypeInfo - ) - } - override def createTemporaryView[T]( path: String, dataStream: DataStream[T],