This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ca414aeb4e7495bbfa2751d241aafc85bc3fa9b6 Author: beyond1920 <[email protected]> AuthorDate: Thu Aug 1 16:55:23 2019 +0800 [FLINK-13529][table-planner-blink] Rename CONCAT_AGG to LISTAGG and fix the behavior according to the ANSI-SQL According to the ANSI-SQL, the LISTAGG function is used to transform values from a group of rows into a list of values that are delimited by a configurable separator. This closes #9316 --- ...ConcatAggFunction.java => ListAggFunction.java} | 14 +++--- ...ion.java => ListAggWithRetractAggFunction.java} | 30 ++++++------ ...n.java => ListAggWsWithRetractAggFunction.java} | 30 ++++++------ .../functions/sql/FlinkSqlOperatorTable.java | 4 +- ...catAggFunction.java => SqlListAggFunction.java} | 27 +++++++---- .../plan/rules/logical/SplitAggregateRule.scala | 4 +- .../planner/plan/utils/AggFunctionFactory.scala | 22 ++++----- .../table/planner/plan/utils/AggregateUtil.scala | 4 +- ...java => ListAggWithRetractAggFunctionTest.java} | 16 +++---- ...va => ListAggWsWithRetractAggFunctionTest.java} | 36 +++++++------- .../apache/flink/table/api/stream/ExplainTest.xml | 48 +++++++++---------- .../planner/plan/batch/sql/RemoveCollationTest.xml | 16 +++---- .../plan/rules/logical/SplitAggregateRuleTest.xml | 8 ++-- .../plan/stream/sql/MiniBatchIntervalInferTest.xml | 40 ++++++++-------- .../plan/stream/sql/agg/DistinctAggregateTest.xml | 34 ++++++------- .../stream/sql/agg/IncrementalAggregateTest.xml | 10 ++-- .../flink/table/api/stream/ExplainTest.scala | 4 +- .../plan/batch/sql/RemoveCollationTest.scala | 4 +- .../rules/logical/SplitAggregateRuleTest.scala | 2 +- .../stream/sql/MiniBatchIntervalInferTest.scala | 6 +-- .../stream/sql/agg/DistinctAggregateTest.scala | 2 +- .../table/validation/AggregateValidationTest.scala | 55 ++++++++++++++++++---- .../runtime/batch/sql/agg/SortAggITCase.scala | 11 ++--- .../runtime/stream/sql/AggregateITCase.scala | 20 ++++---- .../runtime/stream/table/AggregateITCase.scala | 2 +- 25 files changed, 245 insertions(+), 204 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggFunction.java similarity index 91% rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatAggFunction.java rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggFunction.java index 2e8f5fb..377b251 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatAggFunction.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggFunction.java @@ -32,23 +32,23 @@ import static org.apache.flink.table.planner.expressions.ExpressionBuilder.liter import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf; /** - * built-in concat aggregate function. + * built-in listagg aggregate function. */ -public class ConcatAggFunction extends DeclarativeAggregateFunction { +public class ListAggFunction extends DeclarativeAggregateFunction { private int operandCount; private UnresolvedReferenceExpression acc = unresolvedRef("concatAcc"); private UnresolvedReferenceExpression accDelimiter = unresolvedRef("accDelimiter"); private Expression delimiter; private Expression operand; - public ConcatAggFunction(int operandCount) { + public ListAggFunction(int operandCount) { this.operandCount = operandCount; if (operandCount == 1) { - delimiter = literal("\n", DataTypes.STRING()); + delimiter = literal(",", DataTypes.STRING()); operand = operand(0); } else { - delimiter = operand(0); - operand = operand(1); + delimiter = operand(1); + operand = operand(0); } } @@ -75,7 +75,7 @@ public class ConcatAggFunction extends DeclarativeAggregateFunction { @Override public Expression[] initialValuesExpressions() { return new Expression[] { - /* delimiter */ literal("\n", DataTypes.STRING()), + /* delimiter */ literal(",", DataTypes.STRING()), /* acc */ nullOf(DataTypes.STRING()) }; } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunction.java similarity index 77% rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunction.java rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunction.java index ea5857a..8840844 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunction.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunction.java @@ -31,18 +31,18 @@ import java.util.List; import java.util.Objects; /** - * built-in concat with retraction aggregate function. + * built-in listagg with retraction aggregate function. */ -public final class ConcatWithRetractAggFunction - extends AggregateFunction<BinaryString, ConcatWithRetractAggFunction.ConcatWithRetractAccumulator> { +public final class ListAggWithRetractAggFunction + extends AggregateFunction<BinaryString, ListAggWithRetractAggFunction.ListAggWithRetractAccumulator> { private static final long serialVersionUID = -2836795091288790955L; - private static final BinaryString lineDelimiter = BinaryString.fromString("\n"); + private static final BinaryString lineDelimiter = BinaryString.fromString(","); /** - * The initial accumulator for concat with retraction aggregate function. + * The initial accumulator for listagg with retraction aggregate function. */ - public static class ConcatWithRetractAccumulator { + public static class ListAggWithRetractAccumulator { public ListView<BinaryString> list = new ListView<>(BinaryStringTypeInfo.INSTANCE); public ListView<BinaryString> retractList = new ListView<>(BinaryStringTypeInfo.INSTANCE); @@ -55,25 +55,25 @@ public final class ConcatWithRetractAggFunction if (o == null || getClass() != o.getClass()) { return false; } - ConcatWithRetractAccumulator that = (ConcatWithRetractAccumulator) o; + ListAggWithRetractAccumulator that = (ListAggWithRetractAccumulator) o; return Objects.equals(list, that.list) && Objects.equals(retractList, that.retractList); } } @Override - public ConcatWithRetractAccumulator createAccumulator() { - return new ConcatWithRetractAccumulator(); + public ListAggWithRetractAccumulator createAccumulator() { + return new ListAggWithRetractAccumulator(); } - public void accumulate(ConcatWithRetractAccumulator acc, BinaryString value) throws Exception { + public void accumulate(ListAggWithRetractAccumulator acc, BinaryString value) throws Exception { // ignore null value if (value != null) { acc.list.add(value); } } - public void retract(ConcatWithRetractAccumulator acc, BinaryString value) throws Exception { + public void retract(ListAggWithRetractAccumulator acc, BinaryString value) throws Exception { if (value != null) { if (!acc.list.remove(value)) { acc.retractList.add(value); @@ -81,8 +81,8 @@ public final class ConcatWithRetractAggFunction } } - public void merge(ConcatWithRetractAccumulator acc, Iterable<ConcatWithRetractAccumulator> its) throws Exception { - for (ConcatWithRetractAccumulator otherAcc : its) { + public void merge(ListAggWithRetractAccumulator acc, Iterable<ListAggWithRetractAccumulator> its) throws Exception { + for (ListAggWithRetractAccumulator otherAcc : its) { // merge list of acc and other List<BinaryString> buffer = new ArrayList<>(); for (BinaryString binaryString : acc.list.get()) { @@ -117,7 +117,7 @@ public final class ConcatWithRetractAggFunction } @Override - public BinaryString getValue(ConcatWithRetractAccumulator acc) { + public BinaryString getValue(ListAggWithRetractAccumulator acc) { try { Iterable<BinaryString> accList = acc.list.get(); if (accList == null || !accList.iterator().hasNext()) { @@ -131,7 +131,7 @@ public final class ConcatWithRetractAggFunction } } - public void resetAccumulator(ConcatWithRetractAccumulator acc) { + public void resetAccumulator(ListAggWithRetractAccumulator acc) { acc.list.clear(); acc.retractList.clear(); } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunction.java similarity index 77% rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunction.java rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunction.java index a968cb4..317f97e 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunction.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunction.java @@ -31,20 +31,20 @@ import java.util.List; import java.util.Objects; /** - * built-in concatWs with retraction aggregate function. + * built-in listAggWs with retraction aggregate function. */ -public final class ConcatWsWithRetractAggFunction - extends AggregateFunction<BinaryString, ConcatWsWithRetractAggFunction.ConcatWsWithRetractAccumulator> { +public final class ListAggWsWithRetractAggFunction + extends AggregateFunction<BinaryString, ListAggWsWithRetractAggFunction.ListAggWsWithRetractAccumulator> { private static final long serialVersionUID = -8627988150350160473L; /** * The initial accumulator for concat with retraction aggregate function. */ - public static class ConcatWsWithRetractAccumulator { + public static class ListAggWsWithRetractAccumulator { public ListView<BinaryString> list = new ListView<>(BinaryStringTypeInfo.INSTANCE); public ListView<BinaryString> retractList = new ListView<>(BinaryStringTypeInfo.INSTANCE); - public BinaryString delimiter = BinaryString.fromString("\n"); + public BinaryString delimiter = BinaryString.fromString(","); @VisibleForTesting @Override @@ -55,7 +55,7 @@ public final class ConcatWsWithRetractAggFunction if (o == null || getClass() != o.getClass()) { return false; } - ConcatWsWithRetractAccumulator that = (ConcatWsWithRetractAccumulator) o; + ListAggWsWithRetractAccumulator that = (ListAggWsWithRetractAccumulator) o; return Objects.equals(list, that.list) && Objects.equals(retractList, that.retractList) && Objects.equals(delimiter, that.delimiter); @@ -63,11 +63,11 @@ public final class ConcatWsWithRetractAggFunction } @Override - public ConcatWsWithRetractAccumulator createAccumulator() { - return new ConcatWsWithRetractAccumulator(); + public ListAggWsWithRetractAccumulator createAccumulator() { + return new ListAggWsWithRetractAccumulator(); } - public void accumulate(ConcatWsWithRetractAccumulator acc, BinaryString lineDelimiter, BinaryString value) throws Exception { + public void accumulate(ListAggWsWithRetractAccumulator acc, BinaryString value, BinaryString lineDelimiter) throws Exception { // ignore null value if (value != null) { acc.delimiter = lineDelimiter; @@ -75,7 +75,7 @@ public final class ConcatWsWithRetractAggFunction } } - public void retract(ConcatWsWithRetractAccumulator acc, BinaryString lineDelimiter, BinaryString value) throws Exception { + public void retract(ListAggWsWithRetractAccumulator acc, BinaryString value, BinaryString lineDelimiter) throws Exception { if (value != null) { acc.delimiter = lineDelimiter; if (!acc.list.remove(value)) { @@ -84,8 +84,8 @@ public final class ConcatWsWithRetractAggFunction } } - public void merge(ConcatWsWithRetractAccumulator acc, Iterable<ConcatWsWithRetractAccumulator> its) throws Exception { - for (ConcatWsWithRetractAccumulator otherAcc : its) { + public void merge(ListAggWsWithRetractAccumulator acc, Iterable<ListAggWsWithRetractAccumulator> its) throws Exception { + for (ListAggWsWithRetractAccumulator otherAcc : its) { if (!otherAcc.list.get().iterator().hasNext() && !otherAcc.retractList.get().iterator().hasNext()) { // otherAcc is empty, skip it @@ -127,7 +127,7 @@ public final class ConcatWsWithRetractAggFunction } @Override - public BinaryString getValue(ConcatWsWithRetractAccumulator acc) { + public BinaryString getValue(ListAggWsWithRetractAccumulator acc) { try { Iterable<BinaryString> accList = acc.list.get(); if (accList == null || !accList.iterator().hasNext()) { @@ -141,8 +141,8 @@ public final class ConcatWsWithRetractAggFunction } } - public void resetAccumulator(ConcatWsWithRetractAccumulator acc) { - acc.delimiter = BinaryString.fromString("\n"); + public void resetAccumulator(ListAggWsWithRetractAccumulator acc) { + acc.delimiter = BinaryString.fromString(","); acc.list.clear(); acc.retractList.clear(); } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java index 50ff193..dbabb69 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java @@ -946,9 +946,9 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable { public static final SqlFirstLastValueAggFunction LAST_VALUE = new SqlFirstLastValueAggFunction(SqlKind.LAST_VALUE); /** - * <code>CONCAT_AGG</code> aggregate function. + * <code>LISTAGG</code> aggregate function. */ - public static final SqlConcatAggFunction CONCAT_AGG = new SqlConcatAggFunction(); + public static final SqlListAggFunction LISTAGG = new SqlListAggFunction(); /** * <code>INCR_SUM</code> aggregate function. diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlConcatAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlListAggFunction.java similarity index 71% rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlConcatAggFunction.java rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlListAggFunction.java index 7e215e0..9557f0c 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlConcatAggFunction.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlListAggFunction.java @@ -25,30 +25,37 @@ import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.sql.SqlAggFunction; import org.apache.calcite.sql.SqlFunctionCategory; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.type.OperandTypes; import org.apache.calcite.sql.type.ReturnTypes; -import org.apache.calcite.sql.type.SqlTypeFamily; import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.sql.type.SqlTypeTransforms; import java.util.List; /** - * <code>CONCAT_AGG</code> aggregate function returns the concatenation of + * <code>LISTAGG</code> aggregate function returns the concatenation of * a list of values that are input to the function. + * + * <p>NOTE: The difference between this and {@link SqlStdOperatorTable#LISTAGG} is that: + * (1). constraint the second parameter must to be a character literal. + * (2). not require over clause to use this aggregate function. */ -public class SqlConcatAggFunction extends SqlAggFunction { +public class SqlListAggFunction extends SqlAggFunction { - public SqlConcatAggFunction() { - super("CONCAT_AGG", + public SqlListAggFunction() { + super("LISTAGG", null, - SqlKind.OTHER_FUNCTION, - ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE), + SqlKind.LISTAGG, + ReturnTypes.ARG0_NULLABLE, null, OperandTypes.or( OperandTypes.CHARACTER, - OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)), - SqlFunctionCategory.STRING, + OperandTypes.sequence( + "'LISTAGG(<CHARACTER>, <CHARACTER_LITERAL>)'", + OperandTypes.CHARACTER, + OperandTypes.and(OperandTypes.CHARACTER, OperandTypes.LITERAL) + )), + SqlFunctionCategory.SYSTEM, false, false); } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala index b2bd3a5..524d5a3 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala @@ -334,8 +334,8 @@ object SplitAggregateRule { (Seq(FlinkSqlOperatorTable.FIRST_VALUE), Seq(FlinkSqlOperatorTable.FIRST_VALUE)), FlinkSqlOperatorTable.LAST_VALUE -> (Seq(FlinkSqlOperatorTable.LAST_VALUE), Seq(FlinkSqlOperatorTable.LAST_VALUE)), - FlinkSqlOperatorTable.CONCAT_AGG -> - (Seq(FlinkSqlOperatorTable.CONCAT_AGG), Seq(FlinkSqlOperatorTable.CONCAT_AGG)), + FlinkSqlOperatorTable.LISTAGG -> + (Seq(FlinkSqlOperatorTable.LISTAGG), Seq(FlinkSqlOperatorTable.LISTAGG)), SINGLE_VALUE -> (Seq(SINGLE_VALUE), Seq(SINGLE_VALUE)) ) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala index 4dd3439..a8e00d3 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala @@ -31,7 +31,7 @@ import org.apache.flink.table.planner.functions.aggfunctions.MinWithRetractAggFu import org.apache.flink.table.planner.functions.aggfunctions.SingleValueAggFunction._ import org.apache.flink.table.planner.functions.aggfunctions.SumWithRetractAggFunction._ import org.apache.flink.table.planner.functions.aggfunctions._ -import org.apache.flink.table.planner.functions.sql.{SqlConcatAggFunction, SqlFirstLastValueAggFunction, SqlIncrSumAggFunction} +import org.apache.flink.table.planner.functions.sql.{SqlListAggFunction, SqlFirstLastValueAggFunction, SqlIncrSumAggFunction} import org.apache.flink.table.planner.functions.utils.AggSqlFunction import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter import org.apache.flink.table.runtime.typeutils.DecimalTypeInfo @@ -113,11 +113,11 @@ class AggFunctionFactory( case a: SqlFirstLastValueAggFunction if a.getKind == SqlKind.LAST_VALUE => createLastValueAggFunction(argTypes, index) - case _: SqlConcatAggFunction if call.getArgList.size() == 1 => - createConcatAggFunction(argTypes, index) + case _: SqlListAggFunction if call.getArgList.size() == 1 => + createListAggFunction(argTypes, index) - case _: SqlConcatAggFunction if call.getArgList.size() == 2 => - createConcatWsAggFunction(argTypes, index) + case _: SqlListAggFunction if call.getArgList.size() == 2 => + createListAggWsFunction(argTypes, index) // TODO supports SqlCardinalityCountAggFunction @@ -606,23 +606,23 @@ class AggFunctionFactory( } } - private def createConcatAggFunction( + private def createListAggFunction( argTypes: Array[LogicalType], index: Int): UserDefinedFunction = { if (needRetraction(index)) { - new ConcatWithRetractAggFunction + new ListAggWithRetractAggFunction } else { - new ConcatAggFunction(1) + new ListAggFunction(1) } } - private def createConcatWsAggFunction( + private def createListAggWsFunction( argTypes: Array[LogicalType], index: Int): UserDefinedFunction = { if (needRetraction(index)) { - new ConcatWsWithRetractAggFunction + new ListAggWsWithRetractAggFunction } else { - new ConcatAggFunction(2) + new ListAggFunction(2) } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala index 544555a..8877be5 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala @@ -32,7 +32,7 @@ import org.apache.flink.table.planner.dataview.DataViewUtils.useNullSerializerFo import org.apache.flink.table.planner.dataview.{DataViewSpec, MapViewSpec} import org.apache.flink.table.planner.expressions.{PlannerProctimeAttribute, PlannerRowtimeAttribute, PlannerWindowEnd, PlannerWindowStart, RexNodeConverter} import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction -import org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable, SqlConcatAggFunction, SqlFirstLastValueAggFunction} +import org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable, SqlListAggFunction, SqlFirstLastValueAggFunction} import org.apache.flink.table.planner.functions.utils.AggSqlFunction import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._ import org.apache.flink.table.planner.plan.`trait`.RelModifiedMonotonicity @@ -541,7 +541,7 @@ object AggregateUtil extends Enumeration { _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction | _: SqlSingleValueAggFunction | - _: SqlConcatAggFunction => true + _: SqlListAggFunction => true case _: SqlFirstLastValueAggFunction => aggCall.getArgList.size() == 1 case _ => false } diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunctionTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunctionTest.java similarity index 79% rename from flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunctionTest.java rename to flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunctionTest.java index 725d802..5c5566e 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunctionTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunctionTest.java @@ -20,17 +20,17 @@ package org.apache.flink.table.planner.functions.aggfunctions; import org.apache.flink.table.dataformat.BinaryString; import org.apache.flink.table.functions.AggregateFunction; -import org.apache.flink.table.planner.functions.aggfunctions.ConcatWithRetractAggFunction.ConcatWithRetractAccumulator; +import org.apache.flink.table.planner.functions.aggfunctions.ListAggWithRetractAggFunction.ListAggWithRetractAccumulator; import java.lang.reflect.Method; import java.util.Arrays; import java.util.List; /** - * Test case for built-in concat with retraction aggregate function. + * Test case for built-in LISTAGG with retraction aggregate function. */ -public class ConcatWithRetractAggFunctionTest - extends AggFunctionTestBase<BinaryString, ConcatWithRetractAccumulator> { +public class ListAggWithRetractAggFunctionTest + extends AggFunctionTestBase<BinaryString, ListAggWithRetractAccumulator> { @Override protected List<List<BinaryString>> getInputValueSets() { @@ -53,14 +53,14 @@ public class ConcatWithRetractAggFunctionTest @Override protected List<BinaryString> getExpectedResults() { return Arrays.asList( - BinaryString.fromString("a\nb\nc\nd\ne\nf"), + BinaryString.fromString("a,b,c,d,e,f"), null, BinaryString.fromString("a")); } @Override - protected AggregateFunction<BinaryString, ConcatWithRetractAccumulator> getAggregator() { - return new ConcatWithRetractAggFunction(); + protected AggregateFunction<BinaryString, ListAggWithRetractAccumulator> getAggregator() { + return new ListAggWithRetractAggFunction(); } @Override @@ -75,6 +75,6 @@ public class ConcatWithRetractAggFunctionTest @Override protected Class<?> getAccClass() { - return ConcatWithRetractAccumulator.class; + return ListAggWithRetractAccumulator.class; } } diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunctionTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunctionTest.java similarity index 78% rename from flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunctionTest.java rename to flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunctionTest.java index d9e06d0..67ee8ff 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunctionTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunctionTest.java @@ -21,7 +21,7 @@ package org.apache.flink.table.planner.functions.aggfunctions; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.dataformat.BinaryString; import org.apache.flink.table.functions.AggregateFunction; -import org.apache.flink.table.planner.functions.aggfunctions.ConcatWsWithRetractAggFunction.ConcatWsWithRetractAccumulator; +import org.apache.flink.table.planner.functions.aggfunctions.ListAggWsWithRetractAggFunction.ListAggWsWithRetractAccumulator; import org.apache.flink.util.Preconditions; import java.lang.reflect.InvocationTargetException; @@ -34,8 +34,8 @@ import static org.junit.Assert.assertEquals; /** * Test case for built-in concatWs with retraction aggregate function. */ -public class ConcatWsWithRetractAggFunctionTest - extends AggFunctionTestBase<BinaryString, ConcatWsWithRetractAccumulator> { +public class ListAggWsWithRetractAggFunctionTest + extends AggFunctionTestBase<BinaryString, ListAggWsWithRetractAccumulator> { @Override protected List<List<BinaryString>> getInputValueSets() { @@ -84,8 +84,8 @@ public class ConcatWsWithRetractAggFunctionTest } @Override - protected AggregateFunction<BinaryString, ConcatWsWithRetractAccumulator> getAggregator() { - return new ConcatWsWithRetractAggFunction(); + protected AggregateFunction<BinaryString, ListAggWsWithRetractAccumulator> getAggregator() { + return new ListAggWsWithRetractAggFunction(); } @Override @@ -101,14 +101,14 @@ public class ConcatWsWithRetractAggFunctionTest @Override protected Class<?> getAccClass() { - return ConcatWsWithRetractAccumulator.class; + return ListAggWsWithRetractAccumulator.class; } @Override protected <E> void validateResult(E expected, E result) { - if (expected instanceof ConcatWsWithRetractAccumulator && result instanceof ConcatWsWithRetractAccumulator) { - ConcatWsWithRetractAccumulator e = (ConcatWsWithRetractAccumulator) expected; - ConcatWsWithRetractAccumulator r = (ConcatWsWithRetractAccumulator) result; + if (expected instanceof ListAggWsWithRetractAccumulator && result instanceof ListAggWsWithRetractAccumulator) { + ListAggWsWithRetractAccumulator e = (ListAggWsWithRetractAccumulator) expected; + ListAggWsWithRetractAccumulator r = (ListAggWsWithRetractAccumulator) result; assertEquals(e.list, r.list); assertEquals(e.list, r.list); } else { @@ -117,31 +117,31 @@ public class ConcatWsWithRetractAggFunctionTest } @Override - protected ConcatWsWithRetractAccumulator accumulateValues(List<BinaryString> values) + protected ListAggWsWithRetractAccumulator accumulateValues(List<BinaryString> values) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { - AggregateFunction<BinaryString, ConcatWsWithRetractAccumulator> aggregator = getAggregator(); - ConcatWsWithRetractAccumulator accumulator = getAggregator().createAccumulator(); + AggregateFunction<BinaryString, ListAggWsWithRetractAccumulator> aggregator = getAggregator(); + ListAggWsWithRetractAccumulator accumulator = getAggregator().createAccumulator(); Method accumulateFunc = getAccumulateFunc(); Preconditions.checkArgument(values.size() % 2 == 0, "number of values must be an integer multiple of 2."); for (int i = 0; i < values.size(); i += 2) { - BinaryString value = values.get(i); - BinaryString delimiter = values.get(i + 1); + BinaryString value = values.get(i + 1); + BinaryString delimiter = values.get(i); accumulateFunc.invoke(aggregator, accumulator, delimiter, value); } return accumulator; } @Override - protected void retractValues(ConcatWsWithRetractAccumulator accumulator, List<BinaryString> values) + protected void retractValues(ListAggWsWithRetractAccumulator accumulator, List<BinaryString> values) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { - AggregateFunction<BinaryString, ConcatWsWithRetractAccumulator> aggregator = getAggregator(); + AggregateFunction<BinaryString, ListAggWsWithRetractAccumulator> aggregator = getAggregator(); Method retractFunc = getRetractFunc(); Preconditions.checkArgument(values.size() % 2 == 0, "number of values must be an integer multiple of 2."); for (int i = 0; i < values.size(); i += 2) { - BinaryString value = values.get(i); - BinaryString delimiter = values.get(i + 1); + BinaryString value = values.get(i + 1); + BinaryString delimiter = values.get(i); retractFunc.invoke(aggregator, accumulator, delimiter, value); } } diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml index fd84d11..16d3278 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml @@ -637,8 +637,8 @@ SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], updateAsRetraction=[false], ac <![CDATA[== Abstract Syntax Tree == LogicalSink(name=[appendSink1], fields=[a, b]) +- LogicalProject(id1=[$0], EXPR$1=[$2]) - +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)]) - +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], $f2=[_UTF-16LE'#'], text=[$2]) + +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)]) + +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], text=[$2], $f3=[_UTF-16LE'#']) +- LogicalProject(id1=[$0], ts=[$2], text=[$1]) +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))]) +- LogicalJoin(condition=[true], joinType=[inner]) @@ -649,8 +649,8 @@ LogicalSink(name=[appendSink1], fields=[a, b]) LogicalSink(name=[appendSink2], fields=[a, b]) +- LogicalProject(id1=[$0], EXPR$1=[$2]) - +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)]) - +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 6000:INTERVAL SECOND)], $f2=[_UTF-16LE'*'], text=[$2]) + +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)]) + +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 6000:INTERVAL SECOND)], text=[$2], $f3=[_UTF-16LE'*']) +- LogicalProject(id1=[$0], ts=[$2], text=[$1]) +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))]) +- LogicalJoin(condition=[true], joinType=[inner]) @@ -670,15 +670,15 @@ Calc(select=[id1, rowtime AS ts, text], updateAsRetraction=[true], accMode=[Acc] +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, cnt, name, goods, rowtime], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory} Sink(name=[appendSink1], fields=[a, b], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory} -+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory} ++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory} +- Exchange(distribution=[hash[id1]], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory} - +- Calc(select=[id1, ts, _UTF-16LE'#' AS $f2, text], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory} + +- Calc(select=[id1, ts, text, _UTF-16LE'#' AS $f3], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory} +- Reused(reference_id=[1]) Sink(name=[appendSink2], fields=[a, b], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory} -+- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory} ++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory} +- Exchange(distribution=[hash[id1]], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory} - +- Calc(select=[id1, ts, _UTF-16LE'*' AS $f2, text], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory} + +- Calc(select=[id1, ts, text, _UTF-16LE'*' AS $f3], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory} +- Reused(reference_id=[1]) == Physical Execution Plan == @@ -713,11 +713,11 @@ Sink(name=[appendSink2], fields=[a, b], updateAsRetraction=[false], accMode=[Acc ship_strategy : FORWARD : Operator - content : Calc(select: (id1, ts, _UTF-16LE'#' AS $f2, text)) + content : Calc(select: (id1, ts, text, _UTF-16LE'#' AS $f3)) ship_strategy : FORWARD : Operator - content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1) + content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1) ship_strategy : HASH : Operator @@ -725,11 +725,11 @@ Sink(name=[appendSink2], fields=[a, b], updateAsRetraction=[false], accMode=[Acc ship_strategy : FORWARD : Operator - content : Calc(select: (id1, ts, _UTF-16LE'*' AS $f2, text)) + content : Calc(select: (id1, ts, text, _UTF-16LE'*' AS $f3)) ship_strategy : FORWARD : Operator - content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1) + content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1) ship_strategy : HASH : Operator @@ -784,8 +784,8 @@ Union(all=[true], union=[a, b, c]) <![CDATA[== Abstract Syntax Tree == LogicalSink(name=[appendSink1], fields=[a, b]) +- LogicalProject(id1=[$0], EXPR$1=[$2]) - +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)]) - +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], $f2=[_UTF-16LE'#'], text=[$2]) + +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)]) + +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], text=[$2], $f3=[_UTF-16LE'#']) +- LogicalProject(id1=[$0], ts=[$2], text=[$1]) +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))]) +- LogicalJoin(condition=[true], joinType=[inner]) @@ -796,8 +796,8 @@ LogicalSink(name=[appendSink1], fields=[a, b]) LogicalSink(name=[appendSink2], fields=[a, b]) +- LogicalProject(id1=[$0], EXPR$1=[$2]) - +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)]) - +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 6000:INTERVAL SECOND)], $f2=[_UTF-16LE'*'], text=[$2]) + +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)]) + +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 6000:INTERVAL SECOND)], text=[$2], $f3=[_UTF-16LE'*']) +- LogicalProject(id1=[$0], ts=[$2], text=[$1]) +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))]) +- LogicalJoin(condition=[true], joinType=[inner]) @@ -817,15 +817,15 @@ Calc(select=[id1, rowtime AS ts, text], reuse_id=[1]) +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, cnt, name, goods, rowtime]) Sink(name=[appendSink1], fields=[a, b]) -+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1]) ++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1]) +- Exchange(distribution=[hash[id1]]) - +- Calc(select=[id1, ts, _UTF-16LE'#' AS $f2, text]) + +- Calc(select=[id1, ts, text, _UTF-16LE'#' AS $f3]) +- Reused(reference_id=[1]) Sink(name=[appendSink2], fields=[a, b]) -+- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1]) ++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1]) +- Exchange(distribution=[hash[id1]]) - +- Calc(select=[id1, ts, _UTF-16LE'*' AS $f2, text]) + +- Calc(select=[id1, ts, text, _UTF-16LE'*' AS $f3]) +- Reused(reference_id=[1]) == Physical Execution Plan == @@ -860,11 +860,11 @@ Sink(name=[appendSink2], fields=[a, b]) ship_strategy : FORWARD : Operator - content : Calc(select: (id1, ts, _UTF-16LE'#' AS $f2, text)) + content : Calc(select: (id1, ts, text, _UTF-16LE'#' AS $f3)) ship_strategy : FORWARD : Operator - content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1) + content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1) ship_strategy : HASH : Operator @@ -872,11 +872,11 @@ Sink(name=[appendSink2], fields=[a, b]) ship_strategy : FORWARD : Operator - content : Calc(select: (id1, ts, _UTF-16LE'*' AS $f2, text)) + content : Calc(select: (id1, ts, text, _UTF-16LE'*' AS $f3)) ship_strategy : FORWARD : Operator - content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1) + content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1) ship_strategy : HASH : Operator diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml index fd8003b..76b6419 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml @@ -310,7 +310,7 @@ v2 as ( ), join_tb2 as ( - select tb1_id, concat_agg(tb2_name, ',') as tb2_names + select tb1_id, LISTAGG(tb2_name, ',') as tb2_names from ( select v1.id as tb1_id, tb2.name as tb2_name from v1 left outer join tb2 on tb2_id = tb2.id @@ -318,7 +318,7 @@ join_tb2 as ( ), join_tb3 as ( - select tb1_id, concat_agg(tb3_name, ',') as tb3_names + select tb1_id, LISTAGG(tb3_name, ',') as tb3_names from ( select v2.id as tb1_id, tb3.name as tb3_name from v2 left outer join tb3 on tb3_id = tb3.id @@ -349,7 +349,7 @@ LogicalProject(id=[$0], tb2_ids=[$2], tb3_ids=[$3], name=[$4], tb2_names=[$6], t : :- LogicalJoin(condition=[=($0, $7)], joinType=[left]) : : :- LogicalJoin(condition=[=($0, $5)], joinType=[left]) : : : :- LogicalTableScan(table=[[default_catalog, default_database, tb1, source: [TestTableSource(id, key, tb2_ids, tb3_ids, name)]]]) - : : : +- LogicalAggregate(group=[{0}], tb2_names=[CONCAT_AGG($1, $2)]) + : : : +- LogicalAggregate(group=[{0}], tb2_names=[LISTAGG($1, $2)]) : : : +- LogicalProject(tb1_id=[$0], tb2_name=[$3], $f2=[_UTF-16LE',']) : : : +- LogicalJoin(condition=[=($1, $2)], joinType=[left]) : : : :- LogicalProject(id=[$0], tb2_id=[$5]) @@ -357,7 +357,7 @@ LogicalProject(id=[$0], tb2_ids=[$2], tb3_ids=[$3], name=[$4], tb2_names=[$6], t : : : : :- LogicalTableScan(table=[[default_catalog, default_database, tb1, source: [TestTableSource(id, key, tb2_ids, tb3_ids, name)]]]) : : : : +- LogicalTableFunctionScan(invocation=[split($cor0.tb2_ids)], rowType=[RecordType(VARCHAR(2147483647) f0)], elementType=[class [Ljava.lang.Object;]) : : : +- LogicalTableScan(table=[[default_catalog, default_database, tb2, source: [TestTableSource(id, name)]]]) - : : +- LogicalAggregate(group=[{0}], tb3_names=[CONCAT_AGG($1, $2)]) + : : +- LogicalAggregate(group=[{0}], tb3_names=[LISTAGG($1, $2)]) : : +- LogicalProject(tb1_id=[$0], tb3_name=[$3], $f2=[_UTF-16LE',']) : : +- LogicalJoin(condition=[=($1, $2)], joinType=[left]) : : :- LogicalProject(id=[$0], tb3_id=[$5]) @@ -382,10 +382,10 @@ Calc(select=[id, tb2_ids, tb3_ids, name, tb2_names, tb3_names, name0, name1]) : : : +- SortMergeJoin(joinType=[LeftOuterJoin], where=[=(id, tb1_id)], select=[id, key, tb2_ids, tb3_ids, name, tb1_id, tb2_names], rightSorted=[true]) : : : :- Exchange(distribution=[hash[id]]) : : : : +- TableSourceScan(table=[[default_catalog, default_database, tb1, source: [TestTableSource(id, key, tb2_ids, tb3_ids, name)]]], fields=[id, key, tb2_ids, tb3_ids, name], reuse_id=[1]) - : : : +- SortAggregate(isMerge=[true], groupBy=[tb1_id], select=[tb1_id, Final_CONCAT_AGG(accDelimiter$0, concatAcc$1) AS tb2_names]) + : : : +- SortAggregate(isMerge=[true], groupBy=[tb1_id], select=[tb1_id, Final_LISTAGG(accDelimiter$0, concatAcc$1) AS tb2_names]) : : : +- Sort(orderBy=[tb1_id ASC]) : : : +- Exchange(distribution=[hash[tb1_id]]) - : : : +- LocalSortAggregate(groupBy=[tb1_id], select=[tb1_id, Partial_CONCAT_AGG(tb2_name, $f2) AS (accDelimiter$0, concatAcc$1)]) + : : : +- LocalSortAggregate(groupBy=[tb1_id], select=[tb1_id, Partial_LISTAGG(tb2_name, $f2) AS (accDelimiter$0, concatAcc$1)]) : : : +- Sort(orderBy=[tb1_id ASC]) : : : +- Calc(select=[id AS tb1_id, name AS tb2_name, _UTF-16LE',' AS $f2]) : : : +- SortMergeJoin(joinType=[LeftOuterJoin], where=[=(tb2_id, id0)], select=[id, tb2_id, id0, name]) @@ -395,10 +395,10 @@ Calc(select=[id, tb2_ids, tb3_ids, name, tb2_names, tb3_names, name0, name1]) : : : : +- Reused(reference_id=[1]) : : : +- Exchange(distribution=[hash[id]]) : : : +- TableSourceScan(table=[[default_catalog, default_database, tb2, source: [TestTableSource(id, name)]]], fields=[id, name]) - : : +- SortAggregate(isMerge=[true], groupBy=[tb1_id], select=[tb1_id, Final_CONCAT_AGG(accDelimiter$0, concatAcc$1) AS tb3_names]) + : : +- SortAggregate(isMerge=[true], groupBy=[tb1_id], select=[tb1_id, Final_LISTAGG(accDelimiter$0, concatAcc$1) AS tb3_names]) : : +- Sort(orderBy=[tb1_id ASC]) : : +- Exchange(distribution=[hash[tb1_id]]) - : : +- LocalSortAggregate(groupBy=[tb1_id], select=[tb1_id, Partial_CONCAT_AGG(tb3_name, $f2) AS (accDelimiter$0, concatAcc$1)]) + : : +- LocalSortAggregate(groupBy=[tb1_id], select=[tb1_id, Partial_LISTAGG(tb3_name, $f2) AS (accDelimiter$0, concatAcc$1)]) : : +- Sort(orderBy=[tb1_id ASC]) : : +- Calc(select=[id AS tb1_id, name AS tb3_name, _UTF-16LE',' AS $f2]) : : +- SortMergeJoin(joinType=[LeftOuterJoin], where=[=(tb3_id, id0)], select=[id, tb3_id, id0, name]) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml index a57c307..b59a9c8 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml @@ -171,19 +171,19 @@ FlinkLogicalAggregate(group=[{0}], agg#0=[MIN($3)], agg#1=[MAX($4)], agg#2=[SUM( </TestCase> <TestCase name="testSingleConcatAggWithDistinctAgg"> <Resource name="sql"> - <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]> + <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)]) +LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)]) +- LogicalProject(a=[$0], c=[$2], b=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -FlinkLogicalAggregate(group=[{0}], agg#0=[CONCAT_AGG($2)], agg#1=[$SUM0($3)]) -+- FlinkLogicalAggregate(group=[{0, 3}], agg#0=[CONCAT_AGG($2) FILTER $4], agg#1=[COUNT(DISTINCT $1) FILTER $5]) +FlinkLogicalAggregate(group=[{0}], agg#0=[LISTAGG($2)], agg#1=[$SUM0($3)]) ++- FlinkLogicalAggregate(group=[{0, 3}], agg#0=[LISTAGG($2) FILTER $4], agg#1=[COUNT(DISTINCT $1) FILTER $5]) +- FlinkLogicalCalc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0]) +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}]) +- FlinkLogicalCalc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3]) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml index 19d3208..9a1d03b 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml @@ -220,11 +220,11 @@ Calc(select=[a, b]) <![CDATA[== Abstract Syntax Tree == LogicalSink(name=[appendSink1], fields=[a, b]) +- LogicalProject(id1=[$1], EXPR$1=[$2]) - +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)]) - +- LogicalProject($f0=[HOP($2, 12000:INTERVAL SECOND, 4000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'*'], text=[$1]) + +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)]) + +- LogicalProject($f0=[HOP($2, 12000:INTERVAL SECOND, 4000:INTERVAL SECOND)], id1=[$0], text=[$1], $f3=[_UTF-16LE'*']) +- LogicalProject(id1=[$1], text=[$2], ts=[TUMBLE_ROWTIME($0)]) - +- LogicalAggregate(group=[{0, 1}], text=[CONCAT_AGG($2, $3)]) - +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'#'], text=[$2]) + +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)]) + +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#']) +- LogicalProject(id1=[$0], ts=[$1], text=[$2]) +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))]) +- LogicalJoin(condition=[true], joinType=[inner]) @@ -235,8 +235,8 @@ LogicalSink(name=[appendSink1], fields=[a, b]) LogicalSink(name=[appendSink2], fields=[a, b]) +- LogicalProject(id1=[$1], EXPR$1=[$2]) - +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)]) - +- LogicalProject($f0=[TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'-'], text=[$2]) + +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)]) + +- LogicalProject($f0=[TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'-']) +- LogicalProject(id1=[$0], ts=[$1], text=[$2]) +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))]) +- LogicalJoin(condition=[true], joinType=[inner]) @@ -249,8 +249,8 @@ LogicalSink(name=[appendSink3], fields=[a, b]) +- LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)]) +- LogicalProject(id1=[$0], text=[$1]) +- LogicalProject(id1=[$1], text=[$2], ts=[TUMBLE_ROWTIME($0)]) - +- LogicalAggregate(group=[{0, 1}], text=[CONCAT_AGG($2, $3)]) - +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'#'], text=[$2]) + +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)]) + +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#']) +- LogicalProject(id1=[$0], ts=[$1], text=[$2]) +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))]) +- LogicalJoin(condition=[true], joinType=[inner]) @@ -269,21 +269,21 @@ Calc(select=[id1, rowtime AS ts, text], reuse_id=[1]) +- WatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None]) +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, rowtime, cnt, name, goods]) -GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, CONCAT_AGG($f2, text) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], reuse_id=[2]) +GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], reuse_id=[2]) +- Exchange(distribution=[hash[id1]]) - +- Calc(select=[ts, id1, _UTF-16LE'#' AS $f2, text]) + +- Calc(select=[ts, id1, text, _UTF-16LE'#' AS $f3]) +- Reused(reference_id=[1]) Sink(name=[appendSink1], fields=[a, b]) -+- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 4000, 12000)], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1]) ++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 4000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1]) +- Exchange(distribution=[hash[id1]]) - +- Calc(select=[w$rowtime AS ts, id1, _UTF-16LE'*' AS $f2, text]) + +- Calc(select=[w$rowtime AS ts, id1, text, _UTF-16LE'*' AS $f3]) +- Reused(reference_id=[2]) Sink(name=[appendSink2], fields=[a, b]) -+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1]) ++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1]) +- Exchange(distribution=[hash[id1]]) - +- Calc(select=[ts, id1, _UTF-16LE'-' AS $f2, text]) + +- Calc(select=[ts, id1, text, _UTF-16LE'-' AS $f3]) +- Reused(reference_id=[1]) Sink(name=[appendSink3], fields=[a, b]) @@ -325,19 +325,19 @@ Sink(name=[appendSink3], fields=[a, b]) ship_strategy : FORWARD : Operator - content : Calc(select: (ts, id1, _UTF-16LE'#' AS $f2, text)) + content : Calc(select: (ts, id1, text, _UTF-16LE'#' AS $f3)) ship_strategy : FORWARD : Operator - content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime) + content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime) ship_strategy : HASH : Operator - content : Calc(select: (w$rowtime AS ts, id1, _UTF-16LE'*' AS $f2, text)) + content : Calc(select: (w$rowtime AS ts, id1, text, _UTF-16LE'*' AS $f3)) ship_strategy : FORWARD : Operator - content : window: (SlidingGroupWindow('w$, ts, 4000, 12000)), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1) + content : window: (SlidingGroupWindow('w$, ts, 4000, 12000)), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1) ship_strategy : HASH : Operator @@ -345,11 +345,11 @@ Sink(name=[appendSink3], fields=[a, b]) ship_strategy : FORWARD : Operator - content : Calc(select: (ts, id1, _UTF-16LE'-' AS $f2, text)) + content : Calc(select: (ts, id1, text, _UTF-16LE'-' AS $f3)) ship_strategy : FORWARD : Operator - content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1) + content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1) ship_strategy : HASH : Operator diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml index e9ec761..dc74eb4 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml @@ -677,18 +677,18 @@ GlobalGroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, MIN_RETRA </TestCase> <TestCase name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=false, aggPhaseEnforcer=ONE_PHASE]"> <Resource name="sql"> - <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]> + <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)]) +LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)]) +- LogicalProject(a=[$0], c=[$2], b=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -GroupAggregate(groupBy=[a], select=[a, CONCAT_AGG(c) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2]) +GroupAggregate(groupBy=[a], select=[a, LISTAGG(c) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2]) +- Exchange(distribution=[hash[a]]) +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) @@ -697,20 +697,20 @@ GroupAggregate(groupBy=[a], select=[a, CONCAT_AGG(c) AS EXPR$1, COUNT(DISTINCT b </TestCase> <TestCase name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=false, aggPhaseEnforcer=TWO_PHASE]"> <Resource name="sql"> - <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]> + <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)]) +LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)]) +- LogicalProject(a=[$0], c=[$2], b=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -GlobalGroupAggregate(groupBy=[a], select=[a, CONCAT_AGG((accDelimiter$0, concatAcc$1)) AS EXPR$1, COUNT(distinct$0 count$2) AS EXPR$2]) +GlobalGroupAggregate(groupBy=[a], select=[a, LISTAGG((accDelimiter$0, concatAcc$1)) AS EXPR$1, COUNT(distinct$0 count$2) AS EXPR$2]) +- Exchange(distribution=[hash[a]]) - +- LocalGroupAggregate(groupBy=[a], select=[a, CONCAT_AGG(c) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) AS count$2, DISTINCT(b) AS distinct$0]) + +- LocalGroupAggregate(groupBy=[a], select=[a, LISTAGG(c) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) AS count$2, DISTINCT(b) AS distinct$0]) +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> @@ -718,20 +718,20 @@ GlobalGroupAggregate(groupBy=[a], select=[a, CONCAT_AGG((accDelimiter$0, concatA </TestCase> <TestCase name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=true, aggPhaseEnforcer=ONE_PHASE]"> <Resource name="sql"> - <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]> + <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)]) +LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)]) +- LogicalProject(a=[$0], c=[$2], b=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AGG_RETRACT($f2) AS $f1, $SUM0_RETRACT($f3_0) AS $f2]) +GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG_RETRACT($f2) AS $f1, $SUM0_RETRACT($f3_0) AS $f2]) +- Exchange(distribution=[hash[a]]) - +- GroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, CONCAT_AGG(c) FILTER $g_1 AS $f2, COUNT(DISTINCT b) FILTER $g_0 AS $f3_0]) + +- GroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) FILTER $g_1 AS $f2, COUNT(DISTINCT b) FILTER $g_0 AS $f3_0]) +- Exchange(distribution=[hash[a, $f3]]) +- Calc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0]) +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}]) @@ -743,23 +743,23 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AGG_RETR </TestCase> <TestCase name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=true, aggPhaseEnforcer=TWO_PHASE]"> <Resource name="sql"> - <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]> + <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)]) +LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)]) +- LogicalProject(a=[$0], c=[$2], b=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AGG_RETRACT(concat_agg$0) AS $f1, $SUM0_RETRACT(sum$1) AS $f2]) +GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG_RETRACT(listagg$0) AS $f1, $SUM0_RETRACT(sum$1) AS $f2]) +- Exchange(distribution=[hash[a]]) - +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AGG_RETRACT($f2) AS concat_agg$0, $SUM0_RETRACT($f3_0) AS sum$1, COUNT_RETRACT(*) AS count1$2]) - +- GlobalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, CONCAT_AGG((accDelimiter$0, concatAcc$1)) AS $f2, COUNT(distinct$0 count$2) AS $f3_0]) + +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG_RETRACT($f2) AS listagg$0, $SUM0_RETRACT($f3_0) AS sum$1, COUNT_RETRACT(*) AS count1$2]) + +- GlobalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG((accDelimiter$0, concatAcc$1)) AS $f2, COUNT(distinct$0 count$2) AS $f3_0]) +- Exchange(distribution=[hash[a, $f3]]) - +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, CONCAT_AGG(c) FILTER $g_1 AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) FILTER $g_0 AS count$2, DISTINCT(b) AS distinct$0]) + +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) FILTER $g_1 AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) FILTER $g_0 AS count$2, DISTINCT(b) AS distinct$0]) +- Calc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0]) +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}]) +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3]) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml index a415121..ccef832 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml @@ -196,22 +196,22 @@ GlobalGroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, MIN_RETRA </TestCase> <TestCase name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=true, aggPhaseEnforcer=TWO_PHASE]"> <Resource name="sql"> - <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]> + <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)]) +LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)]) +- LogicalProject(a=[$0], c=[$2], b=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AGG((accDelimiter$0, concatAcc$1)) AS $f1, $SUM0(count$2) AS $f2]) +GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG((accDelimiter$0, concatAcc$1)) AS $f1, $SUM0(count$2) AS $f2]) +- Exchange(distribution=[hash[a]]) - +- IncrementalGroupAggregate(partialAggGrouping=[a, $f3], finalAggGrouping=[a], select=[a, CONCAT_AGG((accDelimiter$0, concatAcc$1)) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 count$2) AS count$2]) + +- IncrementalGroupAggregate(partialAggGrouping=[a, $f3], finalAggGrouping=[a], select=[a, LISTAGG((accDelimiter$0, concatAcc$1)) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 count$2) AS count$2]) +- Exchange(distribution=[hash[a, $f3]]) - +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, CONCAT_AGG(c) FILTER $g_1 AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) FILTER $g_0 AS count$2, DISTINCT(b) AS distinct$0]) + +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) FILTER $g_1 AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) FILTER $g_0 AS count$2, DISTINCT(b) AS distinct$0]) +- Calc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0]) +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}]) +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3]) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala index 9952e34..198fb0f 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala @@ -131,7 +131,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase { val table1 = util.tableEnv.sqlQuery( """ - |SELECT id1, CONCAT_AGG('#', text) + |SELECT id1, LISTAGG(text, '#') |FROM TempTable |GROUP BY id1, TUMBLE(ts, INTERVAL '8' SECOND) """.stripMargin) @@ -140,7 +140,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase { val table2 = util.tableEnv.sqlQuery( """ - |SELECT id1, CONCAT_AGG('*', text) + |SELECT id1, LISTAGG(text, '*') |FROM TempTable |GROUP BY id1, HOP(ts, INTERVAL '12' SECOND, INTERVAL '6' SECOND) """.stripMargin) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala index 73c202d..5a5bdf5 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala @@ -307,7 +307,7 @@ class RemoveCollationTest extends TableTestBase { |), | |join_tb2 as ( - | select tb1_id, concat_agg(tb2_name, ',') as tb2_names + | select tb1_id, LISTAGG(tb2_name, ',') as tb2_names | from ( | select v1.id as tb1_id, tb2.name as tb2_name | from v1 left outer join tb2 on tb2_id = tb2.id @@ -315,7 +315,7 @@ class RemoveCollationTest extends TableTestBase { |), | |join_tb3 as ( - | select tb1_id, concat_agg(tb3_name, ',') as tb3_names + | select tb1_id, LISTAGG(tb3_name, ',') as tb3_names | from ( | select v2.id as tb1_id, tb3.name as tb3_name | from v2 left outer join tb3 on tb3_id = tb3.id diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala index b91649b..5d58dfb 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala @@ -95,7 +95,7 @@ class SplitAggregateRuleTest extends TableTestBase { @Test def testSingleConcatAggWithDistinctAgg(): Unit = { - util.verifyPlan("SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a") + util.verifyPlan("SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a") } @Test diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala index ce9e042..9de3886 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala @@ -297,7 +297,7 @@ class MiniBatchIntervalInferTest extends TableTestBase { val table2 = util.tableEnv.sqlQuery( """ |SELECT id1, - | CONCAT_AGG('#', text) as text, + | LISTAGG(text, '#') as text, | TUMBLE_ROWTIME(ts, INTERVAL '6' SECOND) as ts |FROM TempTable1 |GROUP BY TUMBLE(ts, INTERVAL '6' SECOND), id1 @@ -307,7 +307,7 @@ class MiniBatchIntervalInferTest extends TableTestBase { val table3 = util.tableEnv.sqlQuery( """ |SELECT id1, - | CONCAT_AGG('*', text) + | LISTAGG(text, '*') |FROM TempTable2 |GROUP BY HOP(ts, INTERVAL '12' SECOND, INTERVAL '4' SECOND), id1 """.stripMargin) @@ -317,7 +317,7 @@ class MiniBatchIntervalInferTest extends TableTestBase { val table4 = util.tableEnv.sqlQuery( """ |SELECT id1, - | CONCAT_AGG('-', text) + | LISTAGG(text, '-') |FROM TempTable1 |GROUP BY TUMBLE(ts, INTERVAL '9' SECOND), id1 """.stripMargin) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala index 7a30103..8e36305 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala @@ -103,7 +103,7 @@ class DistinctAggregateTest( @Test def testSingleConcatAggWithDistinctAgg(): Unit = { - util.verifyPlan("SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a") + util.verifyPlan("SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a") } @Test diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.scala index 5ce79fc..99ed303 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.scala @@ -22,14 +22,16 @@ import org.apache.flink.api.scala._ import org.apache.flink.table.api.ValidationException import org.apache.flink.table.api.scala._ import org.apache.flink.table.planner.utils.{TableFunc0, TableTestBase} +import org.apache.flink.types.Row +import org.junit.Assert.{assertTrue, fail} import org.junit.Test class AggregateValidationTest extends TableTestBase { + private val util = scalaStreamTestUtil() @Test(expected = classOf[ValidationException]) def testGroupingOnNonExistentField(): Unit = { - val util = streamTestUtil() val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c) val ds = table @@ -40,7 +42,6 @@ class AggregateValidationTest extends TableTestBase { @Test(expected = classOf[ValidationException]) def testGroupingInvalidSelection(): Unit = { - val util = streamTestUtil() val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c) table @@ -51,7 +52,6 @@ class AggregateValidationTest extends TableTestBase { @Test(expected = classOf[ValidationException]) def testInvalidAggregationInSelection(): Unit = { - val util = streamTestUtil() val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c) table @@ -63,7 +63,6 @@ class AggregateValidationTest extends TableTestBase { @Test(expected = classOf[ValidationException]) def testInvalidWindowPropertiesInSelection(): Unit = { - val util = streamTestUtil() val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c) table @@ -75,7 +74,6 @@ class AggregateValidationTest extends TableTestBase { @Test(expected = classOf[RuntimeException]) def testTableFunctionInSelection(): Unit = { - val util = streamTestUtil() val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c) util.addFunction("func", new TableFunc0) @@ -90,7 +88,6 @@ class AggregateValidationTest extends TableTestBase { @Test(expected = classOf[ValidationException]) def testInvalidScalarFunctionInAggregate(): Unit = { - val util = streamTestUtil() val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c) table @@ -102,7 +99,6 @@ class AggregateValidationTest extends TableTestBase { @Test(expected = classOf[ValidationException]) def testInvalidTableFunctionInAggregate(): Unit = { - val util = streamTestUtil() val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c) util.addFunction("func", new TableFunc0) @@ -115,13 +111,52 @@ class AggregateValidationTest extends TableTestBase { @Test(expected = classOf[RuntimeException]) def testMultipleAggregateExpressionInAggregate(): Unit = { - val util = streamTestUtil() - val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c) - util.addFunction("func", new TableFunc0) + val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c) table .groupBy('a) // must fail. Only one AggregateFunction can be used in aggregate .aggregate("sum(c), count(b)") } + + @Test + def testIllegalArgumentForListAgg(): Unit = { + util.addTableSource[(Long, Int, String, String)]("T", 'a, 'b, 'c, 'd) + // If there are two parameters, second one must be character literal. + expectExceptionThrown( + "SELECT listagg(c, d) FROM T GROUP BY a", + "Supported form(s): 'LISTAGG(<CHARACTER>)'\n'LISTAGG(<CHARACTER>, <CHARACTER_LITERAL>)", + classOf[ValidationException]) + } + + @Test + def testIllegalArgumentForListAgg1(): Unit = { + util.addTableSource[(Long, Int, String, String)]("T", 'a, 'b, 'c, 'd) + // If there are two parameters, second one must be character literal. + expectExceptionThrown( + "SELECT LISTAGG(c, 1) FROM T GROUP BY a", + "Supported form(s): 'LISTAGG(<CHARACTER>)'\n'LISTAGG(<CHARACTER>, <CHARACTER_LITERAL>)", + classOf[ValidationException]) + } + + // ---------------------------------------------------------------------------------------------- + + private def expectExceptionThrown( + sql: String, + keywords: String, + clazz: Class[_ <: Throwable] = classOf[ValidationException]) + : Unit = { + try { + util.tableEnv.toAppendStream[Row](util.tableEnv.sqlQuery(sql)) + fail(s"Expected a $clazz, but no exception is thrown.") + } catch { + case e if e.getClass == clazz => + if (keywords != null) { + assertTrue( + s"The exception message '${e.getMessage}' doesn't contain keyword '$keywords'", + e.getMessage.contains(keywords)) + } + case e: Throwable => fail(s"Expected throw ${clazz.getSimpleName}, but is $e.") + } + } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala index f64b0b6..eed0d3e 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala @@ -138,27 +138,26 @@ class SortAggITCase } // NOTE: Spark has agg functions collect_list(), collect_set(). - // instead, we'll test concat_agg() here - @Ignore + // instead, we'll test LISTAGG() here @Test def testConcatAgg(): Unit = { checkResult( - "SELECT concat_agg('-', c), concat_agg(c) FROM SmallTable3", + "SELECT LISTAGG(c, '-'), LISTAGG(c) FROM SmallTable3", Seq( - row("Hi-Hello-Hello world", "Hi\nHello\nHello world") + row("Hi-Hello-Hello world", "Hi,Hello,Hello world") ) ) // EmptyTable5 checkResult( - "SELECT concat_agg('-', g), concat_agg(g) FROM EmptyTable5", + "SELECT LISTAGG(g, '-'), LISTAGG(g) FROM EmptyTable5", Seq( row(null, null) ) ) checkResult( - "SELECT concat_agg('-', c), concat_agg(c) FROM AllNullTable3", + "SELECT LISTAGG(c, '-'), LISTAGG(c) FROM AllNullTable3", Seq( row(null, null) ) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala index 318487fe..41561a3 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala @@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.DataStream import org.apache.flink.table.api.Types import org.apache.flink.table.api.scala._ -import org.apache.flink.table.planner.functions.aggfunctions.{ConcatWithRetractAggFunction, ConcatWsWithRetractAggFunction} +import org.apache.flink.table.planner.functions.aggfunctions.{ListAggWithRetractAggFunction, ListAggWsWithRetractAggFunction} import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.VarSumAggFunction import org.apache.flink.table.planner.runtime.batch.sql.agg.{MyPojoAggFunction, VarArgsAggFunction} import org.apache.flink.table.planner.runtime.utils.StreamingWithAggTestBase.AggMode @@ -605,7 +605,7 @@ class AggregateITCase( val sqlQuery = s""" - |SELECT len, concat_agg('#', content) FROM T GROUP BY len + |SELECT len, listagg(content, '#') FROM T GROUP BY len """.stripMargin val sink = new TestingRetractSink @@ -629,7 +629,7 @@ class AggregateITCase( val sqlQuery = s""" - |SELECT len, concat_agg(content) FROM T GROUP BY len + |SELECT len, listagg(content) FROM T GROUP BY len """.stripMargin val sink = new TestingRetractSink @@ -906,7 +906,7 @@ class AggregateITCase( """ |SELECT b, min(c), max(c) |FROM ( - | SELECT a, b, concat_agg(c) as c + | SELECT a, b, listagg(c) as c | FROM T | GROUP BY a, b) |GROUP BY b @@ -1061,15 +1061,15 @@ class AggregateITCase( assertEquals(expected.sorted, sink.getRetractResults.sorted) } - /** Test CONCAT_AGG **/ + /** Test LISTAGG **/ @Test def testConcatAgg(): Unit = { - tEnv.registerFunction("concat_agg_retract", new ConcatWithRetractAggFunction) - tEnv.registerFunction("concat_agg_ws_retract", new ConcatWsWithRetractAggFunction) + tEnv.registerFunction("listagg_retract", new ListAggWithRetractAggFunction) + tEnv.registerFunction("listagg_ws_retract", new ListAggWsWithRetractAggFunction) val sqlQuery = s""" |SELECT - | concat_agg(c), concat_agg('-', c), concat_agg_retract(c), concat_agg_ws_retract('+', c) + | listagg(c), listagg(c, '-'), listagg_retract(c), listagg_ws_retract(c, '+') |FROM MyTable |GROUP BY c |""".stripMargin @@ -1085,8 +1085,8 @@ class AggregateITCase( val sink = new TestingRetractSink tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink) env.execute() - val expected = List("Hi\nHi\nHi\nHi\nHi\nHi\nHi\nHi\nHi\nHi,Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi," + - "Hi\nHi\nHi\nHi\nHi\nHi\nHi\nHi\nHi\nHi,Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi") + val expected = List("Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi," + + "Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi") assertEquals(expected.sorted, sink.getRetractResults.sorted) } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala index 659f747..f2c29c6 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala @@ -125,7 +125,7 @@ class AggregateITCase(mode: StateBackendMode) extends StreamingWithStateTestBase // @Test // def testSimpleLogical(): Unit = { // val t = failingDataSource(smallTupleData3).toTable(tEnv, 'a, 'b, 'c) -// .select('c.firstValue, 'c.lastValue, 'c.concat_agg("#")) +// .select('c.firstValue, 'c.lastValue, 'c.LISTAGG("#")) // // val sink = new TestingRetractSink() // t.toRetractStream[Row].addSink(sink)
