LadyForest commented on code in PR #23751:
URL: https://github.com/apache/flink/pull/23751#discussion_r1399931555
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala:
##########
@@ -22,14 +22,16 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
import org.apache.flink.api.scala._
-import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.{DataTypes, Types}
Review Comment:
Nit: I think we can remove unused imports here
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala:
##########
@@ -63,7 +63,7 @@ class TableScanTest extends TableTestBase {
@Test
def testDDLWithComputedColumn(): Unit = {
// Create table with field as atom expression.
- util.tableEnv.registerFunction("my_udf", Func0)
+ util.tableEnv.createTemporarySystemFunction("my_udf", Func0)
Review Comment:
Nit: I think `util.tableEnv.createTemporarySystemFunction` can be replaced
with `util.addTemporarySystemFunction`
##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml:
##########
@@ -321,12 +321,12 @@
Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRIN
<![CDATA[
LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]])
-+-
LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)],
rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age)])
++-
LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)],
rowType=[RecordType(INTEGER age, VARCHAR(2147483647) name)])
Review Comment:
I think the schema should remain unchanged
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala:
##########
@@ -275,21 +296,44 @@ object UserDefinedFunctionTestUtils {
object MyPojoFunc extends ScalarFunction {
def eval(s: MyPojo): Int = s.f2
- override def getParameterTypes(signature: Array[Class[_]]):
Array[TypeInformation[_]] =
- Array(MyToPojoFunc.getResultType(signature))
+ override def getTypeInference(typeFactory: DataTypeFactory): TypeInference
= {
+ TypeInference.newBuilder
+ .typedArguments(
+ DataTypes.STRUCTURED(
+ classOf[MyPojo],
+ DataTypes.FIELD("f1", DataTypes.INT()),
+ DataTypes.FIELD("f2", DataTypes.INT())))
+ .outputTypeStrategy((call: CallContext) =>
Optional.of(DataTypes.INT().notNull()))
+ .build
+ }
}
@SerialVersionUID(1L)
object MyToPojoFunc extends ScalarFunction {
- def eval(s: Int): MyPojo = new MyPojo(s, s)
- override def getResultType(signature: Array[Class[_]]):
PojoTypeInfo[MyPojo] = {
+ def eval(s: Int) = new MyPojo(s, s)
+
+ /*override def getResultType(signature: Array[Class[_]]):
PojoTypeInfo[MyPojo] = {
Review Comment:
Nit: I think we can remove `getResultType`.
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala:
##########
@@ -235,7 +235,7 @@ class DagOptimizationTest extends TableTestBase {
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
Boolean.box(true))
// test with non-deterministic udf
- util.tableEnv.registerFunction("random_udf", new NonDeterministicUdf())
+ util.tableEnv.createTemporarySystemFunction("random_udf", new
NonDeterministicUdf())
Review Comment:
Nit: I noticed that `util.addTemporarySystemFunction` and
`util.tableEnv.createTemporarySystemFunction` are used interchangeably. What do
you think about unifying these calls on a single method?
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/SumAggFunction.scala:
##########
@@ -20,7 +20,13 @@ package org.apache.flink.table.planner.utils
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
Review Comment:
Nit: the unused imports can be removed
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/CountAggFunction.scala:
##########
@@ -20,8 +20,11 @@ package org.apache.flink.table.planner.utils
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
import org.apache.flink.api.java.typeutils.TupleTypeInfo
-import org.apache.flink.table.api.Types
+import org.apache.flink.table.annotation.DataTypeHint
+import org.apache.flink.table.api.{DataTypes, Types}
Review Comment:
Nit: the unused imports can be removed
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CorrelateITCase.scala:
##########
@@ -23,8 +23,12 @@ import org.apache.flink.api.java.typeutils.{PojoField,
PojoTypeInfo, RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint, HintFlag}
Review Comment:
Nit: remove unused imports
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala:
##########
@@ -429,12 +431,24 @@ abstract class SplittableTableFunction[A, B] extends
TableFunction[Tuple3[String
@SerialVersionUID(1L)
class PojoTableFunc extends TableFunction[PojoUser] {
- def eval(user: String) {
+ def eval(user: String): Unit = {
if (user.contains("#")) {
val splits = user.split("#")
collect(new PojoUser(splits(0), splits(1).toInt))
}
}
+
+ override def getTypeInference(typeFactory: DataTypeFactory): TypeInference =
{
+ TypeInference.newBuilder
+ .typedArguments(DataTypes.STRING())
+ .outputTypeStrategy(
+ TypeStrategies.explicit(
+ DataTypes.STRUCTURED(
+ classOf[PojoUser],
+ DataTypes.FIELD("age", DataTypes.INT()),
Review Comment:
I think the output fields should be ordered ["name", "age"], based on the
`PojoUser` constructor
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]