Repository: flink Updated Branches: refs/heads/master 9829ca00d -> 427dfe42e
[FLINK-7410] [table] Use UserDefinedFunction.toString() to display operator names of UDFs. This closes #4624. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/427dfe42 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/427dfe42 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/427dfe42 Branch: refs/heads/master Commit: 427dfe42e2bea891b40e662bc97cdea57cdae3f5 Parents: dccdba1 Author: åé¿ <[email protected]> Authored: Wed Aug 30 19:30:52 2017 +0800 Committer: Fabian Hueske <[email protected]> Committed: Tue Oct 10 23:09:07 2017 +0200 ---------------------------------------------------------------------- .../flink/table/api/TableEnvironment.scala | 7 +++-- .../flink/table/expressions/aggregations.scala | 3 +- .../apache/flink/table/expressions/call.scala | 1 + .../flink/table/functions/ScalarFunction.scala | 2 -- .../flink/table/functions/TableFunction.scala | 2 -- .../table/functions/UserDefinedFunction.scala | 8 ++++- .../table/functions/utils/AggSqlFunction.scala | 14 ++++++++- .../functions/utils/ScalarSqlFunction.scala | 4 +++ .../functions/utils/TableSqlFunction.scala | 3 ++ .../utils/UserDefinedFunctionUtils.scala | 8 +++-- .../flink/table/plan/logical/operators.scala | 1 + .../table/plan/nodes/CommonCorrelate.scala | 20 ++++++++---- .../plan/nodes/dataset/DataSetCorrelate.scala | 20 +++++++++--- .../nodes/datastream/DataStreamCorrelate.scala | 16 ++++++++-- .../plan/rules/logical/LogicalUnnestRule.scala | 1 + .../utils/JavaUserDefinedAggFunctions.java | 5 +++ .../flink/table/api/TableSourceTest.scala | 2 +- .../table/api/batch/sql/CorrelateTest.scala | 30 ++++++++++++------ .../flink/table/api/batch/table/CalcTest.scala | 12 +++---- .../table/api/batch/table/CorrelateTest.scala | 10 +++--- .../table/api/batch/table/GroupWindowTest.scala | 6 ++-- .../table/api/stream/sql/CorrelateTest.scala | 30 ++++++++++++------ .../table/api/stream/table/CorrelateTest.scala | 33 +++++++++++++------- .../api/stream/table/GroupWindowTest.scala | 6 ++-- .../table/api/stream/table/OverWindowTest.scala | 3 +- .../plan/ExpressionReductionRulesTest.scala | 2 +- .../plan/TimeIndicatorConversionTest.scala | 3 +- .../table/runtime/stream/table/CalcITCase.scala | 2 +- 28 files changed, 176 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index dc82a87..54877ba 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -331,7 +331,9 @@ abstract class TableEnvironment(val config: TableConfig) { functionCatalog.registerFunction(name, function.getClass) // register in SQL API - functionCatalog.registerSqlFunction(createScalarSqlFunction(name, function, typeFactory)) + functionCatalog.registerSqlFunction( + createScalarSqlFunction(name, name, function, typeFactory) + ) } /** @@ -355,7 +357,7 @@ abstract class TableEnvironment(val config: TableConfig) { functionCatalog.registerFunction(name, function.getClass) // register in SQL API - val sqlFunction = createTableSqlFunction(name, function, typeInfo, typeFactory) + val sqlFunction = createTableSqlFunction(name, name, function, typeInfo, typeFactory) functionCatalog.registerSqlFunction(sqlFunction) } @@ -384,6 +386,7 @@ abstract class TableEnvironment(val config: TableConfig) { // register in SQL API val sqlFunctions = createAggregateSqlFunction( name, + name, function, resultTypeInfo, accTypeInfo, http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala index c2d1bdf..1ffcb12 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala @@ -261,7 +261,8 @@ case class AggFunctionCall( override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = { val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory] AggSqlFunction( - aggregateFunction.getClass.getSimpleName, + aggregateFunction.functionIdentifier, + aggregateFunction.toString, aggregateFunction, resultType, accTypeInfo, http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala index cad9ccc..8454555 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala @@ -272,6 +272,7 @@ case class ScalarFunctionCall( relBuilder.call( createScalarSqlFunction( scalarFunction.functionIdentifier, + scalarFunction.toString, scalarFunction, typeFactory), parameters.map(_.toRexNode): _*) http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala index 40c60ac..e41b876 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala @@ -56,8 +56,6 @@ abstract class ScalarFunction extends UserDefinedFunction { ScalarFunctionCall(this, params) } - override def toString: String = getClass.getCanonicalName - // ---------------------------------------------------------------------------------------------- /** http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala index b6e801a..ff69954 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala @@ -81,8 +81,6 @@ import org.apache.flink.util.Collector */ abstract class TableFunction[T] extends UserDefinedFunction { - override def toString: String = getClass.getCanonicalName - // ---------------------------------------------------------------------------------------------- /** http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala index b841b31..15bcb17 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala @@ -41,7 +41,7 @@ abstract class UserDefinedFunction extends Serializable { def close(): Unit = {} /** - * @return true iff a call to this function is guaranteed to always return + * @return true if and only if a call to this function is guaranteed to always return * the same result given the same parameters; true is assumed by default * if user's function is not pure functional, like random(), date(), now()... * isDeterministic must return false @@ -52,4 +52,10 @@ abstract class UserDefinedFunction extends Serializable { val md5 = DigestUtils.md5Hex(serialize(this)) getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5) } + + /** + * Returns the name of the UDF that is used for plan explain and logging. + */ + override def toString: String = getClass.getSimpleName + } http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala index bb71d63..f44598b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala @@ -35,6 +35,7 @@ import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ * Calcite wrapper for user-defined aggregate functions. * * @param name function name (used by SQL parser) + * @param displayName name to be displayed in operator name * @param aggregateFunction aggregate function to be called * @param returnType the type information of returned value * @param accType the type information of the accumulator @@ -42,6 +43,7 @@ import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ */ class AggSqlFunction( name: String, + displayName: String, aggregateFunction: AggregateFunction[_, _], val returnType: TypeInformation[_], val accType: TypeInformation[_], @@ -62,19 +64,29 @@ class AggSqlFunction( def getFunction: AggregateFunction[_, _] = aggregateFunction override def isDeterministic: Boolean = aggregateFunction.isDeterministic + + override def toString: String = displayName } object AggSqlFunction { def apply( name: String, + displayName: String, aggregateFunction: AggregateFunction[_, _], returnType: TypeInformation[_], accType: TypeInformation[_], typeFactory: FlinkTypeFactory, requiresOver: Boolean): AggSqlFunction = { - new AggSqlFunction(name, aggregateFunction, returnType, accType, typeFactory, requiresOver) + new AggSqlFunction( + name, + displayName, + aggregateFunction, + returnType, + accType, + typeFactory, + requiresOver) } private[flink] def createOperandTypeInference( http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala index 784bca7..27e093d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala @@ -35,11 +35,13 @@ import scala.collection.JavaConverters._ * Calcite wrapper for user-defined scalar functions. * * @param name function name (used by SQL parser) + * @param displayName name to be displayed in operator name * @param scalarFunction scalar function to be called * @param typeFactory type factory for converting Flink's between Calcite's types */ class ScalarSqlFunction( name: String, + displayName: String, scalarFunction: ScalarFunction, typeFactory: FlinkTypeFactory) extends SqlFunction( @@ -53,6 +55,8 @@ class ScalarSqlFunction( def getScalarFunction = scalarFunction override def isDeterministic: Boolean = scalarFunction.isDeterministic + + override def toString: String = displayName } object ScalarSqlFunction { http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala index 6d9742c..741d15b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala @@ -37,6 +37,7 @@ import org.apache.flink.table.functions.utils.TableSqlFunction._ */ class TableSqlFunction( name: String, + displayName: String, tableFunction: TableFunction[_], rowTypeInfo: TypeInformation[_], typeFactory: FlinkTypeFactory, @@ -66,6 +67,8 @@ class TableSqlFunction( def getPojoFieldMapping: Array[Int] = functionImpl.fieldIndexes override def isDeterministic: Boolean = tableFunction.isDeterministic + + override def toString: String = displayName } object TableSqlFunction { http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala index 6a90569..3cd694a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala @@ -251,10 +251,11 @@ object UserDefinedFunctionUtils { */ def createScalarSqlFunction( name: String, + displayName: String, function: ScalarFunction, typeFactory: FlinkTypeFactory) : SqlFunction = { - new ScalarSqlFunction(name, function, typeFactory) + new ScalarSqlFunction(name, displayName, function, typeFactory) } /** @@ -268,13 +269,14 @@ object UserDefinedFunctionUtils { */ def createTableSqlFunction( name: String, + displayName: String, tableFunction: TableFunction[_], resultType: TypeInformation[_], typeFactory: FlinkTypeFactory) : SqlFunction = { val (fieldNames, fieldIndexes, _) = UserDefinedFunctionUtils.getFieldInfo(resultType) val function = new FlinkTableFunctionImpl(resultType, fieldIndexes, fieldNames) - new TableSqlFunction(name, tableFunction, resultType, typeFactory, function) + new TableSqlFunction(name, displayName, tableFunction, resultType, typeFactory, function) } /** @@ -287,6 +289,7 @@ object UserDefinedFunctionUtils { */ def createAggregateSqlFunction( name: String, + displayName: String, aggFunction: AggregateFunction[_, _], resultType: TypeInformation[_], accTypeInfo: TypeInformation[_], @@ -297,6 +300,7 @@ object UserDefinedFunctionUtils { AggSqlFunction( name, + displayName, aggFunction, resultType, accTypeInfo, http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala index 559d20d..0c8efd7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala @@ -728,6 +728,7 @@ case class LogicalTableFunctionCall( val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory] val sqlFunction = new TableSqlFunction( tableFunction.functionIdentifier, + tableFunction.toString, tableFunction, resultType, typeFactory, http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala index 7c01fde..c53f090 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala @@ -179,21 +179,29 @@ trait CommonCorrelate { } private[flink] def selectToString(rowType: RelDataType): String = { - rowType.getFieldNames.asScala.mkString(",") + rowType.getFieldNames.asScala.mkString(", ") } private[flink] def correlateOpName( + inputType: RelDataType, rexCall: RexCall, sqlFunction: TableSqlFunction, - rowType: RelDataType) + rowType: RelDataType, + expression: (RexNode, List[String], Option[List[RexNode]]) => String) : String = { - s"correlate: ${correlateToString(rexCall, sqlFunction)}, select: ${selectToString(rowType)}" + s"correlate: ${correlateToString(inputType, rexCall, sqlFunction, expression)}," + + s" select: ${selectToString(rowType)}" } - private[flink] def correlateToString(rexCall: RexCall, sqlFunction: TableSqlFunction): String = { - val udtfName = sqlFunction.getName - val operands = rexCall.getOperands.asScala.map(_.toString).mkString(",") + private[flink] def correlateToString( + inputType: RelDataType, + rexCall: RexCall, + sqlFunction: TableSqlFunction, + expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = { + val inFields = inputType.getFieldNames.asScala.toList + val udtfName = sqlFunction.toString + val operands = rexCall.getOperands.asScala.map(expression(_, inFields, None)).mkString(", ") s"table($udtfName($operands))" } http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala index 731d2e5..5f94562 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala @@ -76,7 +76,7 @@ class DataSetCorrelate( override def toString: String = { val rexCall = scan.getCall.asInstanceOf[RexCall] val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] - correlateToString(rexCall, sqlFunction) + correlateToString(joinRowType, rexCall, sqlFunction, getExpressionString) } override def explainTerms(pw: RelWriter): RelWriter = { @@ -84,7 +84,11 @@ class DataSetCorrelate( val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] super.explainTerms(pw) .item("invocation", scan.getCall) - .item("function", sqlFunction.getTableFunction.getClass.getCanonicalName) + .item("correlate", correlateToString( + inputNode.getRowType, + rexCall, sqlFunction, + getExpressionString)) + .item("select", selectToString(relRowType)) .item("rowType", relRowType) .item("joinType", joinType) .itemIf("condition", condition.orNull, condition.isDefined) @@ -103,8 +107,6 @@ class DataSetCorrelate( val pojoFieldMapping = Some(sqlFunction.getPojoFieldMapping) val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]] - val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] - val flatMap = generateFunction( config, new RowSchema(getInput.getRowType), @@ -131,6 +133,14 @@ class DataSetCorrelate( collector.code, flatMap.returnType) - inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType)) + inputDS + .flatMap(mapFunc) + .name(correlateOpName( + inputNode.getRowType, + rexCall, + sqlFunction, + relRowType, + getExpressionString) + ) } } http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala index 18ab2a3..4c702ee 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala @@ -69,7 +69,7 @@ class DataStreamCorrelate( override def toString: String = { val rexCall = scan.getCall.asInstanceOf[RexCall] val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] - correlateToString(rexCall, sqlFunction) + correlateToString(inputSchema.relDataType, rexCall, sqlFunction, getExpressionString) } override def explainTerms(pw: RelWriter): RelWriter = { @@ -77,7 +77,11 @@ class DataStreamCorrelate( val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] super.explainTerms(pw) .item("invocation", scan.getCall) - .item("function", sqlFunction.getTableFunction.getClass.getCanonicalName) + .item("correlate", correlateToString( + inputSchema.relDataType, + rexCall, sqlFunction, + getExpressionString)) + .item("select", selectToString(schema.relDataType)) .item("rowType", schema.relDataType) .item("joinType", joinType) .itemIf("condition", condition.orNull, condition.isDefined) @@ -130,7 +134,13 @@ class DataStreamCorrelate( .process(processFunc) // preserve input parallelism to ensure that acc and retract messages remain in order .setParallelism(inputParallelism) - .name(correlateOpName(rexCall, sqlFunction, schema.relDataType)) + .name(correlateOpName( + inputSchema.relDataType, + rexCall, + sqlFunction, + schema.relDataType, + getExpressionString) + ) } } http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala index 802fd85..23dfc03 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala @@ -84,6 +84,7 @@ class LogicalUnnestRule( // create table function val explodeTableFunc = UserDefinedFunctionUtils.createTableSqlFunction( "explode", + "explode", ExplodeFunctionUtil.explodeTableFuncFromType(arrayType.typeInfo), FlinkTypeFactory.toTypeInfo(arrayType.getComponentType), cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]) http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java index 14f812a..61f43dc 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java @@ -110,6 +110,11 @@ public class JavaUserDefinedAggFunctions { acc.sum += a.sum; } } + + @Override + public String toString() { + return "myWeightedAvg"; + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala index 59d2a47..486b078 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala @@ -238,7 +238,7 @@ class TableSourceTest extends TableTestBase { Array("name", "id", "amount", "price"), "'amount > 2"), term("select", "price", "id", "amount"), - term("where", s"<(${func.functionIdentifier}(amount), 32)") + term("where", s"<(${Func0.getClass.getSimpleName}(amount), 32)") ) util.verifyTable(result, expected) http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala index a71f11c..6942a4e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala @@ -42,7 +42,8 @@ class CorrelateTest extends TableTestBase { "DataSetCorrelate", batchTableNode(0), term("invocation", "func1($cor0.c)"), - term("function", func1.getClass.getCanonicalName), + term("correlate", s"table(func1($$cor0.c))"), + term("select", "a", "b", "c", "f0"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"), term("joinType", "INNER") @@ -62,7 +63,8 @@ class CorrelateTest extends TableTestBase { "DataSetCorrelate", batchTableNode(0), term("invocation", "func1($cor0.c, '$')"), - term("function", func1.getClass.getCanonicalName), + term("correlate", s"table(func1($$cor0.c, '$$'))"), + term("select", "a", "b", "c", "f0"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"), term("joinType", "INNER") @@ -88,7 +90,8 @@ class CorrelateTest extends TableTestBase { "DataSetCorrelate", batchTableNode(0), term("invocation", "func1($cor0.c)"), - term("function", func1.getClass.getCanonicalName), + term("correlate", s"table(func1($$cor0.c))"), + term("select", "a", "b", "c", "f0"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"), term("joinType", "LEFT") @@ -114,7 +117,8 @@ class CorrelateTest extends TableTestBase { "DataSetCorrelate", batchTableNode(0), term("invocation", "func2($cor0.c)"), - term("function", func2.getClass.getCanonicalName), + term("correlate", s"table(func2($$cor0.c))"), + term("select", "a", "b", "c", "f0", "f1"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " + "VARCHAR(65536) f0, INTEGER f1)"), @@ -141,7 +145,8 @@ class CorrelateTest extends TableTestBase { "DataSetCorrelate", batchTableNode(0), term("invocation", "hierarchy($cor0.c)"), - term("function", function.getClass.getCanonicalName), + term("correlate", s"table(hierarchy($$cor0.c))"), + term("select", "a", "b", "c", "f0", "f1", "f2"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c," + " VARCHAR(65536) f0, BOOLEAN f1, INTEGER f2)"), @@ -168,7 +173,8 @@ class CorrelateTest extends TableTestBase { "DataSetCorrelate", batchTableNode(0), term("invocation", "pojo($cor0.c)"), - term("function", function.getClass.getCanonicalName), + term("correlate", s"table(pojo($$cor0.c))"), + term("select", "a", "b", "c", "age", "name"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c," + " INTEGER age, VARCHAR(65536) name)"), @@ -196,7 +202,8 @@ class CorrelateTest extends TableTestBase { "DataSetCorrelate", batchTableNode(0), term("invocation", "func2($cor0.c)"), - term("function", func2.getClass.getCanonicalName), + term("correlate", s"table(func2($$cor0.c))"), + term("select", "a", "b", "c", "f0", "f1"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " + "VARCHAR(65536) f0, INTEGER f1)"), @@ -224,7 +231,8 @@ class CorrelateTest extends TableTestBase { "DataSetCorrelate", batchTableNode(0), term("invocation", "func1(SUBSTRING($cor0.c, 2))"), - term("function", func1.getClass.getCanonicalName), + term("correlate", s"table(func1(SUBSTRING($$cor0.c, 2)))"), + term("select", "a", "b", "c", "f0"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"), term("joinType", "INNER") @@ -250,7 +258,8 @@ class CorrelateTest extends TableTestBase { "DataSetCorrelate", batchTableNode(0), term("invocation", "func1('hello', 'world', $cor0.c)"), - term("function", func1.getClass.getCanonicalName), + term("correlate", s"table(func1('hello', 'world', $$cor0.c))"), + term("select", "a", "b", "c", "f0"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"), term("joinType", "INNER") @@ -272,7 +281,8 @@ class CorrelateTest extends TableTestBase { "DataSetCorrelate", batchTableNode(0), term("invocation", "func2('hello', 'world', $cor0.c)"), - term("function", func2.getClass.getCanonicalName), + term("correlate", s"table(func2('hello', 'world', $$cor0.c))"), + term("select", "a", "b", "c", "f0"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"), term("joinType", "INNER") http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala index ee05547..ff6dcf1 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala @@ -88,10 +88,10 @@ class CalcTest extends TableTestBase { "DataSetCalc", batchTableNode(0), term("select", - s"${giveMeCaseClass.functionIdentifier}().my AS _c0", - s"${giveMeCaseClass.functionIdentifier}().clazz AS _c1", - s"${giveMeCaseClass.functionIdentifier}().my AS _c2", - s"${giveMeCaseClass.functionIdentifier}().clazz AS _c3" + "giveMeCaseClass$().my AS _c0", + "giveMeCaseClass$().clazz AS _c1", + "giveMeCaseClass$().my AS _c2", + "giveMeCaseClass$().clazz AS _c3" ) ) @@ -171,7 +171,7 @@ class CalcTest extends TableTestBase { val expected = unaryNode( "DataSetCalc", batchTableNode(0), - term("select", s"${MyHashCode.functionIdentifier}(c) AS _c0", "b") + term("select", "MyHashCode$(c) AS _c0", "b") ) util.verifyTable(resultTable, expected) @@ -283,7 +283,7 @@ class CalcTest extends TableTestBase { unaryNode( "DataSetCalc", batchTableNode(0), - term("select", "a", "c", s"${MyHashCode.functionIdentifier}(c) AS k") + term("select", "a", "c", "MyHashCode$(c) AS k") ), term("groupBy", "k"), term("select", "k", "SUM(a) AS TMP_0") http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala index 15f3def..0b48070 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala @@ -21,7 +21,6 @@ package org.apache.flink.table.api.batch.table import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ import org.apache.flink.table.utils.TableTestUtil._ -import org.apache.flink.table.runtime.utils._ import org.apache.flink.table.utils.{TableFunc1, TableTestBase} import org.junit.Test @@ -41,7 +40,8 @@ class CorrelateTest extends TableTestBase { "DataSetCorrelate", batchTableNode(0), term("invocation", s"${function.functionIdentifier}($$2)"), - term("function", function), + term("correlate", s"table(${function.getClass.getSimpleName}(c))"), + term("select", "a", "b", "c", "s"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"), term("joinType", "INNER") @@ -61,7 +61,8 @@ class CorrelateTest extends TableTestBase { "DataSetCorrelate", batchTableNode(0), term("invocation", s"${function.functionIdentifier}($$2, '$$')"), - term("function", function), + term("correlate", s"table(${function.getClass.getSimpleName}(c, '$$'))"), + term("select", "a", "b", "c", "s"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"), term("joinType", "INNER") @@ -86,7 +87,8 @@ class CorrelateTest extends TableTestBase { "DataSetCorrelate", batchTableNode(0), term("invocation", s"${function.functionIdentifier}($$2)"), - term("function", function), + term("correlate", s"table(${function.getClass.getSimpleName}(c))"), + term("select", "a", "b", "c", "s"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"), term("joinType", "LEFT") http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala index e441203..6a2f1a7 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala @@ -71,7 +71,7 @@ class GroupWindowTest extends TableTestBase { batchTableNode(0), term("groupBy", "string"), term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)), - term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0") + term("select", "string", "myWeightedAvg(long, int) AS TMP_0") ) util.verifyTable(windowedTable, expected) @@ -212,7 +212,7 @@ class GroupWindowTest extends TableTestBase { term("groupBy", "string"), term("window", SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)), - term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0") + term("select", "string", "myWeightedAvg(long, int) AS TMP_0") ) util.verifyTable(windowedTable, expected) @@ -310,7 +310,7 @@ class GroupWindowTest extends TableTestBase { batchTableNode(0), term("groupBy", "string"), term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)), - term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0") + term("select", "string", "myWeightedAvg(long, int) AS TMP_0") ) util.verifyTable(windowedTable, expected) http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala index 955ed4b..ec61816 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala @@ -42,7 +42,8 @@ class CorrelateTest extends TableTestBase { "DataStreamCorrelate", streamTableNode(0), term("invocation", "func1($cor0.c)"), - term("function", func1.getClass.getCanonicalName), + term("correlate", s"table(func1($$cor0.c))"), + term("select", "a", "b", "c", "f0"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"), term("joinType", "INNER") @@ -62,7 +63,8 @@ class CorrelateTest extends TableTestBase { "DataStreamCorrelate", streamTableNode(0), term("invocation", "func1($cor0.c, '$')"), - term("function", func1.getClass.getCanonicalName), + term("correlate", s"table(func1($$cor0.c, '$$'))"), + term("select", "a", "b", "c", "f0"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"), term("joinType", "INNER") @@ -88,7 +90,8 @@ class CorrelateTest extends TableTestBase { "DataStreamCorrelate", streamTableNode(0), term("invocation", "func1($cor0.c)"), - term("function", func1.getClass.getCanonicalName), + term("correlate", s"table(func1($$cor0.c))"), + term("select", "a", "b", "c", "f0"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"), term("joinType", "LEFT") @@ -114,7 +117,8 @@ class CorrelateTest extends TableTestBase { "DataStreamCorrelate", streamTableNode(0), term("invocation", "func2($cor0.c)"), - term("function", func2.getClass.getCanonicalName), + term("correlate", s"table(func2($$cor0.c))"), + term("select", "a", "b", "c", "f0", "f1"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " + "VARCHAR(65536) f0, INTEGER f1)"), @@ -141,7 +145,8 @@ class CorrelateTest extends TableTestBase { "DataStreamCorrelate", streamTableNode(0), term("invocation", "hierarchy($cor0.c)"), - term("function", function.getClass.getCanonicalName), + term("correlate", s"table(hierarchy($$cor0.c))"), + term("select", "a", "b", "c", "f0", "f1", "f2"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c," + " VARCHAR(65536) f0, BOOLEAN f1, INTEGER f2)"), @@ -168,7 +173,8 @@ class CorrelateTest extends TableTestBase { "DataStreamCorrelate", streamTableNode(0), term("invocation", "pojo($cor0.c)"), - term("function", function.getClass.getCanonicalName), + term("correlate", s"table(pojo($$cor0.c))"), + term("select", "a", "b", "c", "age", "name"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c," + " INTEGER age, VARCHAR(65536) name)"), @@ -196,7 +202,8 @@ class CorrelateTest extends TableTestBase { "DataStreamCorrelate", streamTableNode(0), term("invocation", "func2($cor0.c)"), - term("function", func2.getClass.getCanonicalName), + term("correlate", s"table(func2($$cor0.c))"), + term("select", "a", "b", "c", "f0", "f1"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " + "VARCHAR(65536) f0, INTEGER f1)"), @@ -224,7 +231,8 @@ class CorrelateTest extends TableTestBase { "DataStreamCorrelate", streamTableNode(0), term("invocation", "func1(SUBSTRING($cor0.c, 2))"), - term("function", func1.getClass.getCanonicalName), + term("correlate", s"table(func1(SUBSTRING($$cor0.c, 2)))"), + term("select", "a", "b", "c", "f0"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"), term("joinType", "INNER") @@ -250,7 +258,8 @@ class CorrelateTest extends TableTestBase { "DataStreamCorrelate", streamTableNode(0), term("invocation", "func1('hello', 'world', $cor0.c)"), - term("function", func1.getClass.getCanonicalName), + term("correlate", s"table(func1('hello', 'world', $$cor0.c))"), + term("select", "a", "b", "c", "f0"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"), term("joinType", "INNER") @@ -272,7 +281,8 @@ class CorrelateTest extends TableTestBase { "DataStreamCorrelate", streamTableNode(0), term("invocation", "func2('hello', 'world', $cor0.c)"), - term("function", func2.getClass.getCanonicalName), + term("correlate", s"table(func2('hello', 'world', $$cor0.c))"), + term("select", "a", "b", "c", "f0"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"), term("joinType", "INNER") http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala index f15dea9..9d9d1db 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala @@ -19,8 +19,8 @@ package org.apache.flink.table.api.stream.table import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ +import org.apache.flink.table.expressions.utils.Func13 import org.apache.flink.table.utils.TableTestUtil._ -import org.apache.flink.table.runtime.utils._ import org.apache.flink.table.utils._ import org.junit.Test @@ -40,7 +40,8 @@ class CorrelateTest extends TableTestBase { "DataStreamCorrelate", streamTableNode(0), term("invocation", s"${function.functionIdentifier}($$2)"), - term("function", function), + term("correlate", s"table(${function.getClass.getSimpleName}(c))"), + term("select", "a", "b", "c", "s"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"), term("joinType", "INNER") @@ -60,7 +61,8 @@ class CorrelateTest extends TableTestBase { "DataStreamCorrelate", streamTableNode(0), term("invocation", s"${function.functionIdentifier}($$2, '$$')"), - term("function", function), + term("correlate", s"table(${function.getClass.getSimpleName}(c, '$$'))"), + term("select", "a", "b", "c", "s"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"), term("joinType", "INNER") @@ -85,7 +87,8 @@ class CorrelateTest extends TableTestBase { "DataStreamCorrelate", streamTableNode(0), term("invocation", s"${function.functionIdentifier}($$2)"), - term("function", function), + term("correlate", s"table(${function.getClass.getSimpleName}(c))"), + term("select", "a", "b", "c", "s"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"), term("joinType", "LEFT") @@ -101,16 +104,19 @@ class CorrelateTest extends TableTestBase { val util = streamTestUtil() val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val function = util.addFunction("func2", new TableFunc2) + val scalarFunc = new Func13("pre") - val result = table.join(function('c) as ('name, 'len)).select('c, 'name, 'len) + val result = table.join(function(scalarFunc('c)) as ('name, 'len)).select('c, 'name, 'len) val expected = unaryNode( "DataStreamCalc", unaryNode( "DataStreamCorrelate", streamTableNode(0), - term("invocation", s"${function.functionIdentifier}($$2)"), - term("function", function), + term("invocation", + s"${function.functionIdentifier}(${scalarFunc.functionIdentifier}($$2))"), + term("correlate", s"table(${function.getClass.getSimpleName}(Func13(c)))"), + term("select", "a", "b", "c", "name", "len"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " + "VARCHAR(65536) name, INTEGER len)"), @@ -134,7 +140,8 @@ class CorrelateTest extends TableTestBase { "DataStreamCorrelate", streamTableNode(0), term("invocation", s"${function.functionIdentifier}($$2)"), - term("function", function), + term("correlate", "table(HierarchyTableFunction(c))"), + term("select", "a", "b", "c", "name", "adult", "len"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c," + " VARCHAR(65536) name, BOOLEAN adult, INTEGER len)"), @@ -156,7 +163,8 @@ class CorrelateTest extends TableTestBase { "DataStreamCorrelate", streamTableNode(0), term("invocation", s"${function.functionIdentifier}($$2)"), - term("function", function), + term("correlate", s"table(${function.getClass.getSimpleName}(c))"), + term("select", "a", "b", "c", "age", "name"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " + "INTEGER age, VARCHAR(65536) name)"), @@ -183,7 +191,8 @@ class CorrelateTest extends TableTestBase { "DataStreamCorrelate", streamTableNode(0), term("invocation", s"${function.functionIdentifier}($$2)"), - term("function", function), + term("correlate", s"table(${function.getClass.getSimpleName}(c))"), + term("select", "a", "b", "c", "name", "len"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " + "VARCHAR(65536) name, INTEGER len)"), @@ -208,7 +217,9 @@ class CorrelateTest extends TableTestBase { "DataStreamCorrelate", streamTableNode(0), term("invocation", s"${function.functionIdentifier}(SUBSTRING($$2, 2, CHAR_LENGTH($$2)))"), - term("function", function), + term("correlate", + s"table(${function.getClass.getSimpleName}(SUBSTRING(c, 2, CHAR_LENGTH(c))))"), + term("select", "a", "b", "c", "s"), term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"), term("joinType", "INNER") http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala index 599c76b..260726b 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala @@ -181,7 +181,7 @@ class GroupWindowTest extends TableTestBase { WindowReference("w"), 'rowtime, 5.milli)), - term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0") + term("select", "string", "myWeightedAvg(long, int) AS TMP_0") ) util.verifyTable(windowedTable, expected) @@ -319,7 +319,7 @@ class GroupWindowTest extends TableTestBase { streamTableNode(0), term("groupBy", "string"), term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)), - term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0") + term("select", "string", "myWeightedAvg(long, int) AS TMP_0") ) util.verifyTable(windowedTable, expected) @@ -363,7 +363,7 @@ class GroupWindowTest extends TableTestBase { streamTableNode(0), term("groupBy", "string"), term("window", SessionGroupWindow(WindowReference("w"), 'rowtime, 7.milli)), - term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0") + term("select", "string", "myWeightedAvg(long, int) AS TMP_0") ) util.verifyTable(windowedTable, expected) http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala index 8b563a3..55e3ecb 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala @@ -23,7 +23,6 @@ import org.apache.flink.table.api.scala._ import org.apache.flink.table.expressions.utils.Func1 import org.apache.flink.table.api.Table import org.apache.flink.table.utils.TableTestUtil._ -import org.apache.flink.table.utils.StreamTableTestUtil import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} import org.junit.Test @@ -65,7 +64,7 @@ class OverWindowTest extends TableTestBase { "WeightedAvgWithRetract(c, a) AS w0$o2") ), term("select", - s"${plusOne.functionIdentifier}(w0$$o0) AS d", + s"Func1$$(w0$$o0) AS d", "EXP(CAST(w0$o1)) AS _c1", "+(w0$o2, 1) AS _c2", "||('AVG:', CAST(w0$o2)) AS _c3", http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala index b4ad9ca..ce4de14 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala @@ -491,7 +491,7 @@ class ExpressionReductionRulesTest extends TableTestBase { "DataStreamCalc", streamTableNode(0), term("select", "a", "b", "c"), - term("where", s"IS NULL(${NonDeterministicNullFunc.functionIdentifier}())") + term("where", s"IS NULL(NonDeterministicNullFunc$$())") ) util.verifyTable(result, expected) http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala index cfff326..1714ec8 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala @@ -160,7 +160,8 @@ class TimeIndicatorConversionTest extends TableTestBase { streamTableNode(0), term("invocation", s"${func.functionIdentifier}(CAST($$0):TIMESTAMP(3) NOT NULL, PROCTIME($$3), '')"), - term("function", func), + term("correlate", s"table(TableFunc(CAST(rowtime), PROCTIME(proctime), ''))"), + term("select", "rowtime", "long", "int", "proctime", "s"), term("rowType", "RecordType(TIME ATTRIBUTE(ROWTIME) rowtime, BIGINT long, INTEGER int, " + "TIME ATTRIBUTE(PROCTIME) proctime, VARCHAR(65536) s)"), term("joinType", "INNER") http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala index c62349c..480d817 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala @@ -284,7 +284,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { val func1 = new Func13("Sunny") val func2 = new Func13("kevin2") - val result = t.select(func0('c), func1('c),func2('c)) + val result = t.select(func0('c), func1('c), func2('c)) result.addSink(new StreamITCase.StringSink[Row]) env.execute()
