This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7df115c016cab326627e4c12b2a4c449c1794e95 Author: lincoln.lil <[email protected]> AuthorDate: Thu Sep 8 18:11:15 2022 +0800 [FLINK-28569][table-planner] Move non-deterministic test functions to userDefinedScalarFunctions This closes #20791 --- .../utils/userDefinedScalarFunctions.scala | 53 +++++++++++++++++- .../plan/stream/sql/NonDeterministicDagTest.scala | 64 ++-------------------- 2 files changed, 58 insertions(+), 59 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/userDefinedScalarFunctions.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/userDefinedScalarFunctions.scala index d45a71ba40c..e1468bead66 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/userDefinedScalarFunctions.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/userDefinedScalarFunctions.scala @@ -20,7 +20,8 @@ package org.apache.flink.table.planner.expressions.utils import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.annotation.DataTypeHint import org.apache.flink.table.api.Types -import org.apache.flink.table.functions.{FunctionContext, ScalarFunction} +import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, ScalarFunction, TableFunction} +import org.apache.flink.table.planner.utils.CountAccumulator import org.apache.flink.types.Row import org.apache.commons.lang3.StringUtils @@ -232,3 +233,53 @@ class SplitUDF(deterministic: Boolean) extends ScalarFunction { override def isDeterministic: Boolean = deterministic } + +@SerialVersionUID(1L) +class TestNonDeterministicUdf extends ScalarFunction { + val random = new Random() + + def eval(id: JLong): JLong = { + id + random.nextInt() + } + + def eval(id: Int): Int = { + id + random.nextInt() + } + + def eval(id: String): String = { + s"$id-${random.nextInt()}" + } + + override def isDeterministic: Boolean = false +} + +@SerialVersionUID(1L) +class TestNonDeterministicUdtf extends TableFunction[String] { + + val random = new Random() + + def eval(id: Int): Unit = { + collect(s"${id + random.nextInt()}") + } + + def eval(id: String): Unit = { + id.split(",").foreach(str => collect(s"$str#${random.nextInt()}")) + } + + override def isDeterministic: Boolean = false +} + +class TestNonDeterministicUdaf extends AggregateFunction[JLong, CountAccumulator] { + + val random = new Random() + + def accumulate(acc: CountAccumulator, in: JLong): Unit = { + acc.f0 += (in + random.nextInt()) + } + + override def getValue(acc: CountAccumulator): JLong = acc.f0 + + override def createAccumulator(): CountAccumulator = new CountAccumulator + + override def isDeterministic: Boolean = false +} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala index c86bb1c07e4..dbd44247dd0 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala @@ -26,10 +26,10 @@ import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfi import org.apache.flink.table.api.config.OptimizerConfigOptions.NonDeterministicUpdateStrategy import org.apache.flink.table.api.internal.TableEnvironmentInternal import org.apache.flink.table.data.RowData -import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction} -import org.apache.flink.table.planner.{JBoolean, JLong} +import org.apache.flink.table.planner.JBoolean +import org.apache.flink.table.planner.expressions.utils.{TestNonDeterministicUdaf, TestNonDeterministicUdf, TestNonDeterministicUdtf} import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.StringSplit -import org.apache.flink.table.planner.utils.{CountAccumulator, StreamTableTestUtil, TableTestBase} +import org.apache.flink.table.planner.utils.{StreamTableTestUtil, TableTestBase} import org.apache.flink.table.runtime.typeutils.InternalTypeInfo import org.apache.flink.table.sinks.UpsertStreamTableSink import org.apache.flink.table.types.DataType @@ -42,8 +42,6 @@ import org.junit.runners.Parameterized import java.util -import scala.util.Random - @RunWith(classOf[Parameterized]) class NonDeterministicDagTest(nonDeterministicUpdateStrategy: NonDeterministicUpdateStrategy) extends TableTestBase { @@ -212,9 +210,9 @@ class NonDeterministicDagTest(nonDeterministicUpdateStrategy: NonDeterministicUp |)""".stripMargin) // custom ND function - util.tableEnv.createTemporaryFunction("ndFunc", TestNonDeterministicUdf) - util.tableEnv.createTemporaryFunction("ndTableFunc", TestNonDeterministicUdtf) - util.tableEnv.createTemporaryFunction("ndAggFunc", TestTestNonDeterministicUdaf) + util.tableEnv.createTemporaryFunction("ndFunc", new TestNonDeterministicUdf) + util.tableEnv.createTemporaryFunction("ndTableFunc", new TestNonDeterministicUdtf) + util.tableEnv.createTemporaryFunction("ndAggFunc", new TestNonDeterministicUdaf) // deterministic table function util.tableEnv.createTemporaryFunction("str_split", new StringSplit()) } @@ -1561,56 +1559,6 @@ class NonDeterministicDagTest(nonDeterministicUpdateStrategy: NonDeterministicUp util.verifyExecPlan(stmtSet) } - @SerialVersionUID(1L) - object TestNonDeterministicUdf extends ScalarFunction { - val random = new Random() - - def eval(id: JLong): JLong = { - id + random.nextInt() - } - - def eval(id: Int): Int = { - id + random.nextInt() - } - - def eval(id: String): String = { - s"$id-${random.nextInt()}" - } - - override def isDeterministic: Boolean = false - } - - @SerialVersionUID(1L) - object TestNonDeterministicUdtf extends TableFunction[String] { - - val random = new Random() - - def eval(id: Int): Unit = { - collect(s"${id + random.nextInt()}") - } - - def eval(id: String): Unit = { - id.split(",").foreach(str => collect(s"$str#${random.nextInt()}")) - } - - override def isDeterministic: Boolean = false - } - - object TestTestNonDeterministicUdaf extends AggregateFunction[JLong, CountAccumulator] { - - val random = new Random() - - def accumulate(acc: CountAccumulator, in: JLong): Unit = { - acc.f0 += (in + random.nextInt()) - } - - override def getValue(acc: CountAccumulator): JLong = acc.f0 - - override def createAccumulator(): CountAccumulator = new CountAccumulator - - override def isDeterministic: Boolean = false - } - /** * This upsert test sink does support getting primary key from table schema. We defined a similar * test sink here not using existing {@link TestingUpsertTableSink} in {@link StreamTestSink}
