This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ca414aeb4e7495bbfa2751d241aafc85bc3fa9b6
Author: beyond1920 <[email protected]>
AuthorDate: Thu Aug 1 16:55:23 2019 +0800

    [FLINK-13529][table-planner-blink] Rename CONCAT_AGG to LISTAGG and fix the 
behavior according to the ANSI-SQL
    
    According to ANSI SQL, the LISTAGG function transforms values 
from a group of rows into a list of values delimited by a configurable 
separator.
    
    This closes #9316
---
 ...ConcatAggFunction.java => ListAggFunction.java} | 14 +++---
 ...ion.java => ListAggWithRetractAggFunction.java} | 30 ++++++------
 ...n.java => ListAggWsWithRetractAggFunction.java} | 30 ++++++------
 .../functions/sql/FlinkSqlOperatorTable.java       |  4 +-
 ...catAggFunction.java => SqlListAggFunction.java} | 27 +++++++----
 .../plan/rules/logical/SplitAggregateRule.scala    |  4 +-
 .../planner/plan/utils/AggFunctionFactory.scala    | 22 ++++-----
 .../table/planner/plan/utils/AggregateUtil.scala   |  4 +-
 ...java => ListAggWithRetractAggFunctionTest.java} | 16 +++----
 ...va => ListAggWsWithRetractAggFunctionTest.java} | 36 +++++++-------
 .../apache/flink/table/api/stream/ExplainTest.xml  | 48 +++++++++----------
 .../planner/plan/batch/sql/RemoveCollationTest.xml | 16 +++----
 .../plan/rules/logical/SplitAggregateRuleTest.xml  |  8 ++--
 .../plan/stream/sql/MiniBatchIntervalInferTest.xml | 40 ++++++++--------
 .../plan/stream/sql/agg/DistinctAggregateTest.xml  | 34 ++++++-------
 .../stream/sql/agg/IncrementalAggregateTest.xml    | 10 ++--
 .../flink/table/api/stream/ExplainTest.scala       |  4 +-
 .../plan/batch/sql/RemoveCollationTest.scala       |  4 +-
 .../rules/logical/SplitAggregateRuleTest.scala     |  2 +-
 .../stream/sql/MiniBatchIntervalInferTest.scala    |  6 +--
 .../stream/sql/agg/DistinctAggregateTest.scala     |  2 +-
 .../table/validation/AggregateValidationTest.scala | 55 ++++++++++++++++++----
 .../runtime/batch/sql/agg/SortAggITCase.scala      | 11 ++---
 .../runtime/stream/sql/AggregateITCase.scala       | 20 ++++----
 .../runtime/stream/table/AggregateITCase.scala     |  2 +-
 25 files changed, 245 insertions(+), 204 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatAggFunction.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggFunction.java
similarity index 91%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatAggFunction.java
rename to 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggFunction.java
index 2e8f5fb..377b251 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatAggFunction.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggFunction.java
@@ -32,23 +32,23 @@ import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.liter
 import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf;
 
 /**
- * built-in concat aggregate function.
+ * built-in listagg aggregate function.
  */
-public class ConcatAggFunction extends DeclarativeAggregateFunction {
+public class ListAggFunction extends DeclarativeAggregateFunction {
        private int operandCount;
        private UnresolvedReferenceExpression acc = unresolvedRef("concatAcc");
        private UnresolvedReferenceExpression accDelimiter = 
unresolvedRef("accDelimiter");
        private Expression delimiter;
        private Expression operand;
 
-       public ConcatAggFunction(int operandCount) {
+       public ListAggFunction(int operandCount) {
                this.operandCount = operandCount;
                if (operandCount == 1) {
-                       delimiter = literal("\n", DataTypes.STRING());
+                       delimiter = literal(",", DataTypes.STRING());
                        operand = operand(0);
                } else {
-                       delimiter = operand(0);
-                       operand = operand(1);
+                       delimiter = operand(1);
+                       operand = operand(0);
                }
        }
 
@@ -75,7 +75,7 @@ public class ConcatAggFunction extends 
DeclarativeAggregateFunction {
        @Override
        public Expression[] initialValuesExpressions() {
                return new Expression[] {
-                               /* delimiter */ literal("\n", 
DataTypes.STRING()),
+                               /* delimiter */ literal(",", 
DataTypes.STRING()),
                                /* acc */ nullOf(DataTypes.STRING())
                };
        }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunction.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunction.java
similarity index 77%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunction.java
rename to 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunction.java
index ea5857a..8840844 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunction.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunction.java
@@ -31,18 +31,18 @@ import java.util.List;
 import java.util.Objects;
 
 /**
- * built-in concat with retraction aggregate function.
+ * built-in listagg with retraction aggregate function.
  */
-public final class ConcatWithRetractAggFunction
-       extends AggregateFunction<BinaryString, 
ConcatWithRetractAggFunction.ConcatWithRetractAccumulator> {
+public final class ListAggWithRetractAggFunction
+       extends AggregateFunction<BinaryString, 
ListAggWithRetractAggFunction.ListAggWithRetractAccumulator> {
 
        private static final long serialVersionUID = -2836795091288790955L;
-       private static final BinaryString lineDelimiter = 
BinaryString.fromString("\n");
+       private static final BinaryString lineDelimiter = 
BinaryString.fromString(",");
 
        /**
-        * The initial accumulator for concat with retraction aggregate 
function.
+        * The initial accumulator for listagg with retraction aggregate 
function.
         */
-       public static class ConcatWithRetractAccumulator {
+       public static class ListAggWithRetractAccumulator {
                public ListView<BinaryString> list = new 
ListView<>(BinaryStringTypeInfo.INSTANCE);
                public ListView<BinaryString> retractList = new 
ListView<>(BinaryStringTypeInfo.INSTANCE);
 
@@ -55,25 +55,25 @@ public final class ConcatWithRetractAggFunction
                        if (o == null || getClass() != o.getClass()) {
                                return false;
                        }
-                       ConcatWithRetractAccumulator that = 
(ConcatWithRetractAccumulator) o;
+                       ListAggWithRetractAccumulator that = 
(ListAggWithRetractAccumulator) o;
                        return Objects.equals(list, that.list) &&
                                Objects.equals(retractList, that.retractList);
                }
        }
 
        @Override
-       public ConcatWithRetractAccumulator createAccumulator() {
-               return new ConcatWithRetractAccumulator();
+       public ListAggWithRetractAccumulator createAccumulator() {
+               return new ListAggWithRetractAccumulator();
        }
 
-       public void accumulate(ConcatWithRetractAccumulator acc, BinaryString 
value) throws Exception {
+       public void accumulate(ListAggWithRetractAccumulator acc, BinaryString 
value) throws Exception {
                // ignore null value
                if (value != null) {
                        acc.list.add(value);
                }
        }
 
-       public void retract(ConcatWithRetractAccumulator acc, BinaryString 
value) throws Exception {
+       public void retract(ListAggWithRetractAccumulator acc, BinaryString 
value) throws Exception {
                if (value != null) {
                        if (!acc.list.remove(value)) {
                                acc.retractList.add(value);
@@ -81,8 +81,8 @@ public final class ConcatWithRetractAggFunction
                }
        }
 
-       public void merge(ConcatWithRetractAccumulator acc, 
Iterable<ConcatWithRetractAccumulator> its) throws Exception {
-               for (ConcatWithRetractAccumulator otherAcc : its) {
+       public void merge(ListAggWithRetractAccumulator acc, 
Iterable<ListAggWithRetractAccumulator> its) throws Exception {
+               for (ListAggWithRetractAccumulator otherAcc : its) {
                        // merge list of acc and other
                        List<BinaryString> buffer = new ArrayList<>();
                        for (BinaryString binaryString : acc.list.get()) {
@@ -117,7 +117,7 @@ public final class ConcatWithRetractAggFunction
        }
 
        @Override
-       public BinaryString getValue(ConcatWithRetractAccumulator acc) {
+       public BinaryString getValue(ListAggWithRetractAccumulator acc) {
                try {
                        Iterable<BinaryString> accList = acc.list.get();
                        if (accList == null || !accList.iterator().hasNext()) {
@@ -131,7 +131,7 @@ public final class ConcatWithRetractAggFunction
                }
        }
 
-       public void resetAccumulator(ConcatWithRetractAccumulator acc) {
+       public void resetAccumulator(ListAggWithRetractAccumulator acc) {
                acc.list.clear();
                acc.retractList.clear();
        }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunction.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunction.java
similarity index 77%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunction.java
rename to 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunction.java
index a968cb4..317f97e 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunction.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunction.java
@@ -31,20 +31,20 @@ import java.util.List;
 import java.util.Objects;
 
 /**
- * built-in concatWs with retraction aggregate function.
+ * built-in listAggWs with retraction aggregate function.
  */
-public final class ConcatWsWithRetractAggFunction
-       extends AggregateFunction<BinaryString, 
ConcatWsWithRetractAggFunction.ConcatWsWithRetractAccumulator> {
+public final class ListAggWsWithRetractAggFunction
+       extends AggregateFunction<BinaryString, 
ListAggWsWithRetractAggFunction.ListAggWsWithRetractAccumulator> {
 
        private static final long serialVersionUID = -8627988150350160473L;
 
        /**
         * The initial accumulator for concat with retraction aggregate 
function.
         */
-       public static class ConcatWsWithRetractAccumulator {
+       public static class ListAggWsWithRetractAccumulator {
                public ListView<BinaryString> list = new 
ListView<>(BinaryStringTypeInfo.INSTANCE);
                public ListView<BinaryString> retractList = new 
ListView<>(BinaryStringTypeInfo.INSTANCE);
-               public BinaryString delimiter = BinaryString.fromString("\n");
+               public BinaryString delimiter = BinaryString.fromString(",");
 
                @VisibleForTesting
                @Override
@@ -55,7 +55,7 @@ public final class ConcatWsWithRetractAggFunction
                        if (o == null || getClass() != o.getClass()) {
                                return false;
                        }
-                       ConcatWsWithRetractAccumulator that = 
(ConcatWsWithRetractAccumulator) o;
+                       ListAggWsWithRetractAccumulator that = 
(ListAggWsWithRetractAccumulator) o;
                        return Objects.equals(list, that.list) &&
                                Objects.equals(retractList, that.retractList) &&
                                Objects.equals(delimiter, that.delimiter);
@@ -63,11 +63,11 @@ public final class ConcatWsWithRetractAggFunction
        }
 
        @Override
-       public ConcatWsWithRetractAccumulator createAccumulator() {
-               return new ConcatWsWithRetractAccumulator();
+       public ListAggWsWithRetractAccumulator createAccumulator() {
+               return new ListAggWsWithRetractAccumulator();
        }
 
-       public void accumulate(ConcatWsWithRetractAccumulator acc, BinaryString 
lineDelimiter, BinaryString value) throws Exception {
+       public void accumulate(ListAggWsWithRetractAccumulator acc, 
BinaryString value, BinaryString lineDelimiter) throws Exception {
                // ignore null value
                if (value != null) {
                        acc.delimiter = lineDelimiter;
@@ -75,7 +75,7 @@ public final class ConcatWsWithRetractAggFunction
                }
        }
 
-       public void retract(ConcatWsWithRetractAccumulator acc, BinaryString 
lineDelimiter, BinaryString value) throws Exception {
+       public void retract(ListAggWsWithRetractAccumulator acc, BinaryString 
value, BinaryString lineDelimiter) throws Exception {
                if (value != null) {
                        acc.delimiter = lineDelimiter;
                        if (!acc.list.remove(value)) {
@@ -84,8 +84,8 @@ public final class ConcatWsWithRetractAggFunction
                }
        }
 
-       public void merge(ConcatWsWithRetractAccumulator acc, 
Iterable<ConcatWsWithRetractAccumulator> its) throws Exception {
-               for (ConcatWsWithRetractAccumulator otherAcc : its) {
+       public void merge(ListAggWsWithRetractAccumulator acc, 
Iterable<ListAggWsWithRetractAccumulator> its) throws Exception {
+               for (ListAggWsWithRetractAccumulator otherAcc : its) {
                        if (!otherAcc.list.get().iterator().hasNext()
                                && 
!otherAcc.retractList.get().iterator().hasNext()) {
                                // otherAcc is empty, skip it
@@ -127,7 +127,7 @@ public final class ConcatWsWithRetractAggFunction
        }
 
        @Override
-       public BinaryString getValue(ConcatWsWithRetractAccumulator acc) {
+       public BinaryString getValue(ListAggWsWithRetractAccumulator acc) {
                try {
                        Iterable<BinaryString> accList = acc.list.get();
                        if (accList == null || !accList.iterator().hasNext()) {
@@ -141,8 +141,8 @@ public final class ConcatWsWithRetractAggFunction
                }
        }
 
-       public void resetAccumulator(ConcatWsWithRetractAccumulator acc) {
-               acc.delimiter = BinaryString.fromString("\n");
+       public void resetAccumulator(ListAggWsWithRetractAccumulator acc) {
+               acc.delimiter = BinaryString.fromString(",");
                acc.list.clear();
                acc.retractList.clear();
        }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 50ff193..dbabb69 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -946,9 +946,9 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
        public static final SqlFirstLastValueAggFunction LAST_VALUE = new 
SqlFirstLastValueAggFunction(SqlKind.LAST_VALUE);
 
        /**
-        * <code>CONCAT_AGG</code> aggregate function.
+        * <code>LISTAGG</code> aggregate function.
         */
-       public static final SqlConcatAggFunction CONCAT_AGG = new 
SqlConcatAggFunction();
+       public static final SqlListAggFunction LISTAGG = new 
SqlListAggFunction();
 
        /**
         * <code>INCR_SUM</code> aggregate function.
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlConcatAggFunction.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlListAggFunction.java
similarity index 71%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlConcatAggFunction.java
rename to 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlListAggFunction.java
index 7e215e0..9557f0c 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlConcatAggFunction.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlListAggFunction.java
@@ -25,30 +25,37 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.sql.SqlAggFunction;
 import org.apache.calcite.sql.SqlFunctionCategory;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.OperandTypes;
 import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.type.SqlTypeTransforms;
 
 import java.util.List;
 
 /**
- * <code>CONCAT_AGG</code> aggregate function returns the concatenation of
+ * <code>LISTAGG</code> aggregate function returns the concatenation of
  * a list of values that are input to the function.
+ *
+ * <p>NOTE: This function differs from {@link 
SqlStdOperatorTable#LISTAGG} in two ways:
+ * (1) it constrains the second parameter to be a character literal;
+ * (2) it does not require an OVER clause to use this aggregate function.
  */
-public class SqlConcatAggFunction extends SqlAggFunction {
+public class SqlListAggFunction extends SqlAggFunction {
 
-       public SqlConcatAggFunction() {
-               super("CONCAT_AGG",
+       public SqlListAggFunction() {
+               super("LISTAGG",
                                null,
-                               SqlKind.OTHER_FUNCTION,
-                               
ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), 
SqlTypeTransforms.TO_NULLABLE),
+                               SqlKind.LISTAGG,
+                               ReturnTypes.ARG0_NULLABLE,
                                null,
                                OperandTypes.or(
                                                OperandTypes.CHARACTER,
-                                               
OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)),
-                               SqlFunctionCategory.STRING,
+                                               OperandTypes.sequence(
+                                                               
"'LISTAGG(<CHARACTER>, <CHARACTER_LITERAL>)'",
+                                                               
OperandTypes.CHARACTER,
+                                                               
OperandTypes.and(OperandTypes.CHARACTER, OperandTypes.LITERAL)
+                                               )),
+                               SqlFunctionCategory.SYSTEM,
                                false,
                                false);
        }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
index b2bd3a5..524d5a3 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
@@ -334,8 +334,8 @@ object SplitAggregateRule {
       (Seq(FlinkSqlOperatorTable.FIRST_VALUE), 
Seq(FlinkSqlOperatorTable.FIRST_VALUE)),
     FlinkSqlOperatorTable.LAST_VALUE ->
       (Seq(FlinkSqlOperatorTable.LAST_VALUE), 
Seq(FlinkSqlOperatorTable.LAST_VALUE)),
-    FlinkSqlOperatorTable.CONCAT_AGG ->
-      (Seq(FlinkSqlOperatorTable.CONCAT_AGG), 
Seq(FlinkSqlOperatorTable.CONCAT_AGG)),
+    FlinkSqlOperatorTable.LISTAGG ->
+      (Seq(FlinkSqlOperatorTable.LISTAGG), Seq(FlinkSqlOperatorTable.LISTAGG)),
     SINGLE_VALUE -> (Seq(SINGLE_VALUE), Seq(SINGLE_VALUE))
   )
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
index 4dd3439..a8e00d3 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
@@ -31,7 +31,7 @@ import 
org.apache.flink.table.planner.functions.aggfunctions.MinWithRetractAggFu
 import 
org.apache.flink.table.planner.functions.aggfunctions.SingleValueAggFunction._
 import 
org.apache.flink.table.planner.functions.aggfunctions.SumWithRetractAggFunction._
 import org.apache.flink.table.planner.functions.aggfunctions._
-import org.apache.flink.table.planner.functions.sql.{SqlConcatAggFunction, 
SqlFirstLastValueAggFunction, SqlIncrSumAggFunction}
+import org.apache.flink.table.planner.functions.sql.{SqlListAggFunction, 
SqlFirstLastValueAggFunction, SqlIncrSumAggFunction}
 import org.apache.flink.table.planner.functions.utils.AggSqlFunction
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter
 import org.apache.flink.table.runtime.typeutils.DecimalTypeInfo
@@ -113,11 +113,11 @@ class AggFunctionFactory(
       case a: SqlFirstLastValueAggFunction if a.getKind == SqlKind.LAST_VALUE 
=>
         createLastValueAggFunction(argTypes, index)
 
-      case _: SqlConcatAggFunction if call.getArgList.size() == 1 =>
-        createConcatAggFunction(argTypes, index)
+      case _: SqlListAggFunction if call.getArgList.size() == 1 =>
+        createListAggFunction(argTypes, index)
 
-      case _: SqlConcatAggFunction if call.getArgList.size() == 2 =>
-        createConcatWsAggFunction(argTypes, index)
+      case _: SqlListAggFunction if call.getArgList.size() == 2 =>
+        createListAggWsFunction(argTypes, index)
 
       // TODO supports SqlCardinalityCountAggFunction
 
@@ -606,23 +606,23 @@ class AggFunctionFactory(
     }
   }
 
-  private def createConcatAggFunction(
+  private def createListAggFunction(
       argTypes: Array[LogicalType],
       index: Int): UserDefinedFunction = {
     if (needRetraction(index)) {
-      new ConcatWithRetractAggFunction
+      new ListAggWithRetractAggFunction
     } else {
-      new ConcatAggFunction(1)
+      new ListAggFunction(1)
     }
   }
 
-  private def createConcatWsAggFunction(
+  private def createListAggWsFunction(
       argTypes: Array[LogicalType],
       index: Int): UserDefinedFunction = {
     if (needRetraction(index)) {
-      new ConcatWsWithRetractAggFunction
+      new ListAggWsWithRetractAggFunction
     } else {
-      new ConcatAggFunction(2)
+      new ListAggFunction(2)
     }
   }
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
index 544555a..8877be5 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
@@ -32,7 +32,7 @@ import 
org.apache.flink.table.planner.dataview.DataViewUtils.useNullSerializerFo
 import org.apache.flink.table.planner.dataview.{DataViewSpec, MapViewSpec}
 import org.apache.flink.table.planner.expressions.{PlannerProctimeAttribute, 
PlannerRowtimeAttribute, PlannerWindowEnd, PlannerWindowStart, RexNodeConverter}
 import 
org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
-import org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable, 
SqlConcatAggFunction, SqlFirstLastValueAggFunction}
+import org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable, 
SqlListAggFunction, SqlFirstLastValueAggFunction}
 import org.apache.flink.table.planner.functions.utils.AggSqlFunction
 import 
org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.planner.plan.`trait`.RelModifiedMonotonicity
@@ -541,7 +541,7 @@ object AggregateUtil extends Enumeration {
              _: SqlSumAggFunction |
              _: SqlSumEmptyIsZeroAggFunction |
              _: SqlSingleValueAggFunction |
-             _: SqlConcatAggFunction => true
+             _: SqlListAggFunction => true
         case _: SqlFirstLastValueAggFunction => aggCall.getArgList.size() == 1
         case _ => false
       }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunctionTest.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunctionTest.java
similarity index 79%
rename from 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunctionTest.java
rename to 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunctionTest.java
index 725d802..5c5566e 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunctionTest.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunctionTest.java
@@ -20,17 +20,17 @@ package 
org.apache.flink.table.planner.functions.aggfunctions;
 
 import org.apache.flink.table.dataformat.BinaryString;
 import org.apache.flink.table.functions.AggregateFunction;
-import 
org.apache.flink.table.planner.functions.aggfunctions.ConcatWithRetractAggFunction.ConcatWithRetractAccumulator;
+import 
org.apache.flink.table.planner.functions.aggfunctions.ListAggWithRetractAggFunction.ListAggWithRetractAccumulator;
 
 import java.lang.reflect.Method;
 import java.util.Arrays;
 import java.util.List;
 
 /**
- * Test case for built-in concat with retraction aggregate function.
+ * Test case for built-in LISTAGG with retraction aggregate function.
  */
-public class ConcatWithRetractAggFunctionTest
-       extends AggFunctionTestBase<BinaryString, ConcatWithRetractAccumulator> 
{
+public class ListAggWithRetractAggFunctionTest
+       extends AggFunctionTestBase<BinaryString, 
ListAggWithRetractAccumulator> {
 
        @Override
        protected List<List<BinaryString>> getInputValueSets() {
@@ -53,14 +53,14 @@ public class ConcatWithRetractAggFunctionTest
        @Override
        protected List<BinaryString> getExpectedResults() {
                return Arrays.asList(
-                               BinaryString.fromString("a\nb\nc\nd\ne\nf"),
+                               BinaryString.fromString("a,b,c,d,e,f"),
                                null,
                                BinaryString.fromString("a"));
        }
 
        @Override
-       protected AggregateFunction<BinaryString, ConcatWithRetractAccumulator> 
getAggregator() {
-               return new ConcatWithRetractAggFunction();
+       protected AggregateFunction<BinaryString, 
ListAggWithRetractAccumulator> getAggregator() {
+               return new ListAggWithRetractAggFunction();
        }
 
        @Override
@@ -75,6 +75,6 @@ public class ConcatWithRetractAggFunctionTest
 
        @Override
        protected Class<?> getAccClass() {
-               return ConcatWithRetractAccumulator.class;
+               return ListAggWithRetractAccumulator.class;
        }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunctionTest.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunctionTest.java
similarity index 78%
rename from 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunctionTest.java
rename to 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunctionTest.java
index d9e06d0..67ee8ff 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunctionTest.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunctionTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.functions.aggfunctions;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.dataformat.BinaryString;
 import org.apache.flink.table.functions.AggregateFunction;
-import 
org.apache.flink.table.planner.functions.aggfunctions.ConcatWsWithRetractAggFunction.ConcatWsWithRetractAccumulator;
+import 
org.apache.flink.table.planner.functions.aggfunctions.ListAggWsWithRetractAggFunction.ListAggWsWithRetractAccumulator;
 import org.apache.flink.util.Preconditions;
 
 import java.lang.reflect.InvocationTargetException;
@@ -34,8 +34,8 @@ import static org.junit.Assert.assertEquals;
 /**
  * Test case for built-in concatWs with retraction aggregate function.
  */
-public class ConcatWsWithRetractAggFunctionTest
-       extends AggFunctionTestBase<BinaryString, 
ConcatWsWithRetractAccumulator> {
+public class ListAggWsWithRetractAggFunctionTest
+       extends AggFunctionTestBase<BinaryString, 
ListAggWsWithRetractAccumulator> {
 
        @Override
        protected List<List<BinaryString>> getInputValueSets() {
@@ -84,8 +84,8 @@ public class ConcatWsWithRetractAggFunctionTest
        }
 
        @Override
-       protected AggregateFunction<BinaryString, 
ConcatWsWithRetractAccumulator> getAggregator() {
-               return new ConcatWsWithRetractAggFunction();
+       protected AggregateFunction<BinaryString, 
ListAggWsWithRetractAccumulator> getAggregator() {
+               return new ListAggWsWithRetractAggFunction();
        }
 
        @Override
@@ -101,14 +101,14 @@ public class ConcatWsWithRetractAggFunctionTest
 
        @Override
        protected Class<?> getAccClass() {
-               return ConcatWsWithRetractAccumulator.class;
+               return ListAggWsWithRetractAccumulator.class;
        }
 
        @Override
        protected <E> void validateResult(E expected, E result) {
-               if (expected instanceof ConcatWsWithRetractAccumulator && result instanceof ConcatWsWithRetractAccumulator) {
-                       ConcatWsWithRetractAccumulator e = (ConcatWsWithRetractAccumulator) expected;
-                       ConcatWsWithRetractAccumulator r = (ConcatWsWithRetractAccumulator) result;
+               if (expected instanceof ListAggWsWithRetractAccumulator && result instanceof ListAggWsWithRetractAccumulator) {
+                       ListAggWsWithRetractAccumulator e = (ListAggWsWithRetractAccumulator) expected;
+                       ListAggWsWithRetractAccumulator r = (ListAggWsWithRetractAccumulator) result;
                        assertEquals(e.list, r.list);
                        assertEquals(e.list, r.list);
                } else {
@@ -117,31 +117,31 @@ public class ConcatWsWithRetractAggFunctionTest
        }
 
        @Override
-       protected ConcatWsWithRetractAccumulator accumulateValues(List<BinaryString> values)
+       protected ListAggWsWithRetractAccumulator accumulateValues(List<BinaryString> values)
                        throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
-               AggregateFunction<BinaryString, ConcatWsWithRetractAccumulator> aggregator = getAggregator();
-               ConcatWsWithRetractAccumulator accumulator = getAggregator().createAccumulator();
+               AggregateFunction<BinaryString, ListAggWsWithRetractAccumulator> aggregator = getAggregator();
+               ListAggWsWithRetractAccumulator accumulator = getAggregator().createAccumulator();
                Method accumulateFunc = getAccumulateFunc();
                Preconditions.checkArgument(values.size() % 2 == 0,
                                "number of values must be an integer multiple of 2.");
                for (int i = 0; i < values.size(); i += 2) {
-                       BinaryString value = values.get(i);
-                       BinaryString delimiter = values.get(i + 1);
+                       BinaryString value = values.get(i + 1);
+                       BinaryString delimiter = values.get(i);
                        accumulateFunc.invoke(aggregator, accumulator, delimiter, value);
                }
                return accumulator;
        }
 
        @Override
-       protected void retractValues(ConcatWsWithRetractAccumulator accumulator, List<BinaryString> values)
+       protected void retractValues(ListAggWsWithRetractAccumulator accumulator, List<BinaryString> values)
                        throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
-               AggregateFunction<BinaryString, ConcatWsWithRetractAccumulator> aggregator = getAggregator();
+               AggregateFunction<BinaryString, ListAggWsWithRetractAccumulator> aggregator = getAggregator();
                Method retractFunc = getRetractFunc();
                Preconditions.checkArgument(values.size() % 2 == 0,
                                "number of values must be an integer multiple of 2.");
                for (int i = 0; i < values.size(); i += 2) {
-                       BinaryString value = values.get(i);
-                       BinaryString delimiter = values.get(i + 1);
+                       BinaryString value = values.get(i + 1);
+                       BinaryString delimiter = values.get(i);
                        retractFunc.invoke(aggregator, accumulator, delimiter, value);
                }
        }
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
index fd84d11..16d3278 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
@@ -637,8 +637,8 @@ SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], 
updateAsRetraction=[false], ac
       <![CDATA[== Abstract Syntax Tree ==
 LogicalSink(name=[appendSink1], fields=[a, b])
 +- LogicalProject(id1=[$0], EXPR$1=[$2])
-   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
-      +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], 
$f2=[_UTF-16LE'#'], text=[$2])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+      +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], 
text=[$2], $f3=[_UTF-16LE'#'])
          +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
             +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 
300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
                +- LogicalJoin(condition=[true], joinType=[inner])
@@ -649,8 +649,8 @@ LogicalSink(name=[appendSink1], fields=[a, b])
 
 LogicalSink(name=[appendSink2], fields=[a, b])
 +- LogicalProject(id1=[$0], EXPR$1=[$2])
-   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
-      +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 
6000:INTERVAL SECOND)], $f2=[_UTF-16LE'*'], text=[$2])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+      +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 
6000:INTERVAL SECOND)], text=[$2], $f3=[_UTF-16LE'*'])
          +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
             +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 
300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
                +- LogicalJoin(condition=[true], joinType=[inner])
@@ -670,15 +670,15 @@ Calc(select=[id1, rowtime AS ts, text], 
updateAsRetraction=[true], accMode=[Acc]
          +- DataStreamScan(table=[[default_catalog, default_database, T2]], 
fields=[id2, cnt, name, goods, rowtime], updateAsRetraction=[true], 
accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
 
 Sink(name=[appendSink1], fields=[a, b], updateAsRetraction=[false], 
accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
-+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], 
select=[id1, CONCAT_AGG($f2, text) AS EXPR$1], updateAsRetraction=[false], 
accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], 
select=[id1, LISTAGG(text, $f3) AS EXPR$1], updateAsRetraction=[false], 
accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
    +- Exchange(distribution=[hash[id1]], updateAsRetraction=[true], 
accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
-      +- Calc(select=[id1, ts, _UTF-16LE'#' AS $f2, text], 
updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = 
{rows, cpu, io, network, memory}
+      +- Calc(select=[id1, ts, text, _UTF-16LE'#' AS $f3], 
updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = 
{rows, cpu, io, network, memory}
          +- Reused(reference_id=[1])
 
 Sink(name=[appendSink2], fields=[a, b], updateAsRetraction=[false], 
accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
-+- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 
6000, 12000)], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1], 
updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = 
{rows, cpu, io, network, memory}
++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 
6000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1], 
updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = 
{rows, cpu, io, network, memory}
    +- Exchange(distribution=[hash[id1]], updateAsRetraction=[true], 
accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
-      +- Calc(select=[id1, ts, _UTF-16LE'*' AS $f2, text], 
updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = 
{rows, cpu, io, network, memory}
+      +- Calc(select=[id1, ts, text, _UTF-16LE'*' AS $f3], 
updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = 
{rows, cpu, io, network, memory}
          +- Reused(reference_id=[1])
 
 == Physical Execution Plan ==
@@ -713,11 +713,11 @@ Sink(name=[appendSink2], fields=[a, b], 
updateAsRetraction=[false], accMode=[Acc
                                                        ship_strategy : FORWARD
 
                                                         : Operator
-                                                               content : 
Calc(select: (id1, ts, _UTF-16LE'#' AS $f2, text))
+                                                               content : 
Calc(select: (id1, ts, text, _UTF-16LE'#' AS $f3))
                                                                ship_strategy : 
FORWARD
 
                                                                 : Operator
-                                                                       content 
: window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, 
text) AS EXPR$1)
+                                                                       content 
: window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, 
$f3) AS EXPR$1)
                                                                        
ship_strategy : HASH
 
                                                                         : 
Operator
@@ -725,11 +725,11 @@ Sink(name=[appendSink2], fields=[a, b], 
updateAsRetraction=[false], accMode=[Acc
                                                                                
ship_strategy : FORWARD
 
                                                                                
 : Operator
-                                                                               
        content : Calc(select: (id1, ts, _UTF-16LE'*' AS $f2, text))
+                                                                               
        content : Calc(select: (id1, ts, text, _UTF-16LE'*' AS $f3))
                                                                                
        ship_strategy : FORWARD
 
                                                                                
         : Operator
-                                                                               
                content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), 
groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+                                                                               
                content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), 
groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
                                                                                
                ship_strategy : HASH
 
                                                                                
                 : Operator
@@ -784,8 +784,8 @@ Union(all=[true], union=[a, b, c])
       <![CDATA[== Abstract Syntax Tree ==
 LogicalSink(name=[appendSink1], fields=[a, b])
 +- LogicalProject(id1=[$0], EXPR$1=[$2])
-   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
-      +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], 
$f2=[_UTF-16LE'#'], text=[$2])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+      +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], 
text=[$2], $f3=[_UTF-16LE'#'])
          +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
             +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 
300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
                +- LogicalJoin(condition=[true], joinType=[inner])
@@ -796,8 +796,8 @@ LogicalSink(name=[appendSink1], fields=[a, b])
 
 LogicalSink(name=[appendSink2], fields=[a, b])
 +- LogicalProject(id1=[$0], EXPR$1=[$2])
-   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
-      +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 
6000:INTERVAL SECOND)], $f2=[_UTF-16LE'*'], text=[$2])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+      +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 
6000:INTERVAL SECOND)], text=[$2], $f3=[_UTF-16LE'*'])
          +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
             +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 
300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
                +- LogicalJoin(condition=[true], joinType=[inner])
@@ -817,15 +817,15 @@ Calc(select=[id1, rowtime AS ts, text], reuse_id=[1])
          +- DataStreamScan(table=[[default_catalog, default_database, T2]], 
fields=[id2, cnt, name, goods, rowtime])
 
 Sink(name=[appendSink1], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], 
select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], 
select=[id1, LISTAGG(text, $f3) AS EXPR$1])
    +- Exchange(distribution=[hash[id1]])
-      +- Calc(select=[id1, ts, _UTF-16LE'#' AS $f2, text])
+      +- Calc(select=[id1, ts, text, _UTF-16LE'#' AS $f3])
          +- Reused(reference_id=[1])
 
 Sink(name=[appendSink2], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 
6000, 12000)], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 
6000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
    +- Exchange(distribution=[hash[id1]])
-      +- Calc(select=[id1, ts, _UTF-16LE'*' AS $f2, text])
+      +- Calc(select=[id1, ts, text, _UTF-16LE'*' AS $f3])
          +- Reused(reference_id=[1])
 
 == Physical Execution Plan ==
@@ -860,11 +860,11 @@ Sink(name=[appendSink2], fields=[a, b])
                                                        ship_strategy : FORWARD
 
                                                         : Operator
-                                                               content : 
Calc(select: (id1, ts, _UTF-16LE'#' AS $f2, text))
+                                                               content : 
Calc(select: (id1, ts, text, _UTF-16LE'#' AS $f3))
                                                                ship_strategy : 
FORWARD
 
                                                                 : Operator
-                                                                       content 
: window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, 
text) AS EXPR$1)
+                                                                       content 
: window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, 
$f3) AS EXPR$1)
                                                                        
ship_strategy : HASH
 
                                                                         : 
Operator
@@ -872,11 +872,11 @@ Sink(name=[appendSink2], fields=[a, b])
                                                                                
ship_strategy : FORWARD
 
                                                                                
 : Operator
-                                                                               
        content : Calc(select: (id1, ts, _UTF-16LE'*' AS $f2, text))
+                                                                               
        content : Calc(select: (id1, ts, text, _UTF-16LE'*' AS $f3))
                                                                                
        ship_strategy : FORWARD
 
                                                                                
         : Operator
-                                                                               
                content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), 
groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+                                                                               
                content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), 
groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
                                                                                
                ship_strategy : HASH
 
                                                                                
                 : Operator
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml
index fd8003b..76b6419 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml
@@ -310,7 +310,7 @@ v2 as (
 ),
 
 join_tb2 as (
- select tb1_id, concat_agg(tb2_name, ',') as tb2_names
+ select tb1_id, LISTAGG(tb2_name, ',') as tb2_names
  from (
   select v1.id as tb1_id, tb2.name as tb2_name
    from v1 left outer join tb2 on tb2_id = tb2.id
@@ -318,7 +318,7 @@ join_tb2 as (
 ),
 
 join_tb3 as (
- select tb1_id, concat_agg(tb3_name, ',') as tb3_names
+ select tb1_id, LISTAGG(tb3_name, ',') as tb3_names
  from (
   select v2.id as tb1_id, tb3.name as tb3_name
    from v2 left outer join tb3 on tb3_id = tb3.id
@@ -349,7 +349,7 @@ LogicalProject(id=[$0], tb2_ids=[$2], tb3_ids=[$3], 
name=[$4], tb2_names=[$6], t
    :  :- LogicalJoin(condition=[=($0, $7)], joinType=[left])
    :  :  :- LogicalJoin(condition=[=($0, $5)], joinType=[left])
    :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, 
tb1, source: [TestTableSource(id, key, tb2_ids, tb3_ids, name)]]])
-   :  :  :  +- LogicalAggregate(group=[{0}], tb2_names=[CONCAT_AGG($1, $2)])
+   :  :  :  +- LogicalAggregate(group=[{0}], tb2_names=[LISTAGG($1, $2)])
    :  :  :     +- LogicalProject(tb1_id=[$0], tb2_name=[$3], 
$f2=[_UTF-16LE','])
    :  :  :        +- LogicalJoin(condition=[=($1, $2)], joinType=[left])
    :  :  :           :- LogicalProject(id=[$0], tb2_id=[$5])
@@ -357,7 +357,7 @@ LogicalProject(id=[$0], tb2_ids=[$2], tb3_ids=[$3], 
name=[$4], tb2_names=[$6], t
    :  :  :           :     :- LogicalTableScan(table=[[default_catalog, 
default_database, tb1, source: [TestTableSource(id, key, tb2_ids, tb3_ids, 
name)]]])
    :  :  :           :     +- 
LogicalTableFunctionScan(invocation=[split($cor0.tb2_ids)], 
rowType=[RecordType(VARCHAR(2147483647) f0)], elementType=[class 
[Ljava.lang.Object;])
    :  :  :           +- LogicalTableScan(table=[[default_catalog, 
default_database, tb2, source: [TestTableSource(id, name)]]])
-   :  :  +- LogicalAggregate(group=[{0}], tb3_names=[CONCAT_AGG($1, $2)])
+   :  :  +- LogicalAggregate(group=[{0}], tb3_names=[LISTAGG($1, $2)])
    :  :     +- LogicalProject(tb1_id=[$0], tb3_name=[$3], $f2=[_UTF-16LE','])
    :  :        +- LogicalJoin(condition=[=($1, $2)], joinType=[left])
    :  :           :- LogicalProject(id=[$0], tb3_id=[$5])
@@ -382,10 +382,10 @@ Calc(select=[id, tb2_ids, tb3_ids, name, tb2_names, 
tb3_names, name0, name1])
    :     :        :  +- SortMergeJoin(joinType=[LeftOuterJoin], where=[=(id, 
tb1_id)], select=[id, key, tb2_ids, tb3_ids, name, tb1_id, tb2_names], 
rightSorted=[true])
    :     :        :     :- Exchange(distribution=[hash[id]])
    :     :        :     :  +- TableSourceScan(table=[[default_catalog, 
default_database, tb1, source: [TestTableSource(id, key, tb2_ids, tb3_ids, 
name)]]], fields=[id, key, tb2_ids, tb3_ids, name], reuse_id=[1])
-   :     :        :     +- SortAggregate(isMerge=[true], groupBy=[tb1_id], 
select=[tb1_id, Final_CONCAT_AGG(accDelimiter$0, concatAcc$1) AS tb2_names])
+   :     :        :     +- SortAggregate(isMerge=[true], groupBy=[tb1_id], 
select=[tb1_id, Final_LISTAGG(accDelimiter$0, concatAcc$1) AS tb2_names])
    :     :        :        +- Sort(orderBy=[tb1_id ASC])
    :     :        :           +- Exchange(distribution=[hash[tb1_id]])
-   :     :        :              +- LocalSortAggregate(groupBy=[tb1_id], 
select=[tb1_id, Partial_CONCAT_AGG(tb2_name, $f2) AS (accDelimiter$0, 
concatAcc$1)])
+   :     :        :              +- LocalSortAggregate(groupBy=[tb1_id], 
select=[tb1_id, Partial_LISTAGG(tb2_name, $f2) AS (accDelimiter$0, 
concatAcc$1)])
    :     :        :                 +- Sort(orderBy=[tb1_id ASC])
    :     :        :                    +- Calc(select=[id AS tb1_id, name AS 
tb2_name, _UTF-16LE',' AS $f2])
    :     :        :                       +- 
SortMergeJoin(joinType=[LeftOuterJoin], where=[=(tb2_id, id0)], select=[id, 
tb2_id, id0, name])
@@ -395,10 +395,10 @@ Calc(select=[id, tb2_ids, tb3_ids, name, tb2_names, 
tb3_names, name0, name1])
    :     :        :                          :        +- 
Reused(reference_id=[1])
    :     :        :                          +- 
Exchange(distribution=[hash[id]])
    :     :        :                             +- 
TableSourceScan(table=[[default_catalog, default_database, tb2, source: 
[TestTableSource(id, name)]]], fields=[id, name])
-   :     :        +- SortAggregate(isMerge=[true], groupBy=[tb1_id], 
select=[tb1_id, Final_CONCAT_AGG(accDelimiter$0, concatAcc$1) AS tb3_names])
+   :     :        +- SortAggregate(isMerge=[true], groupBy=[tb1_id], 
select=[tb1_id, Final_LISTAGG(accDelimiter$0, concatAcc$1) AS tb3_names])
    :     :           +- Sort(orderBy=[tb1_id ASC])
    :     :              +- Exchange(distribution=[hash[tb1_id]])
-   :     :                 +- LocalSortAggregate(groupBy=[tb1_id], 
select=[tb1_id, Partial_CONCAT_AGG(tb3_name, $f2) AS (accDelimiter$0, 
concatAcc$1)])
+   :     :                 +- LocalSortAggregate(groupBy=[tb1_id], 
select=[tb1_id, Partial_LISTAGG(tb3_name, $f2) AS (accDelimiter$0, 
concatAcc$1)])
    :     :                    +- Sort(orderBy=[tb1_id ASC])
    :     :                       +- Calc(select=[id AS tb1_id, name AS 
tb3_name, _UTF-16LE',' AS $f2])
    :     :                          +- SortMergeJoin(joinType=[LeftOuterJoin], 
where=[=(tb3_id, id0)], select=[id, tb3_id, id0, name])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml
index a57c307..b59a9c8 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml
@@ -171,19 +171,19 @@ FlinkLogicalAggregate(group=[{0}], agg#0=[MIN($3)], 
agg#1=[MAX($4)], agg#2=[SUM(
   </TestCase>
   <TestCase name="testSingleConcatAggWithDistinctAgg">
     <Resource name="sql">
-      <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
+      <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
 +- LogicalProject(a=[$0], c=[$2], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalAggregate(group=[{0}], agg#0=[CONCAT_AGG($2)], agg#1=[$SUM0($3)])
-+- FlinkLogicalAggregate(group=[{0, 3}], agg#0=[CONCAT_AGG($2) FILTER $4], agg#1=[COUNT(DISTINCT $1) FILTER $5])
+FlinkLogicalAggregate(group=[{0}], agg#0=[LISTAGG($2)], agg#1=[$SUM0($3)])
++- FlinkLogicalAggregate(group=[{0, 3}], agg#0=[LISTAGG($2) FILTER $4], agg#1=[COUNT(DISTINCT $1) FILTER $5])
    +- FlinkLogicalCalc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS 
$g_0])
       +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], 
$e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
          +- FlinkLogicalCalc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
index 19d3208..9a1d03b 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
@@ -220,11 +220,11 @@ Calc(select=[a, b])
       <![CDATA[== Abstract Syntax Tree ==
 LogicalSink(name=[appendSink1], fields=[a, b])
 +- LogicalProject(id1=[$1], EXPR$1=[$2])
-   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
-      +- LogicalProject($f0=[HOP($2, 12000:INTERVAL SECOND, 4000:INTERVAL 
SECOND)], id1=[$0], $f2=[_UTF-16LE'*'], text=[$1])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+      +- LogicalProject($f0=[HOP($2, 12000:INTERVAL SECOND, 4000:INTERVAL 
SECOND)], id1=[$0], text=[$1], $f3=[_UTF-16LE'*'])
          +- LogicalProject(id1=[$1], text=[$2], ts=[TUMBLE_ROWTIME($0)])
-            +- LogicalAggregate(group=[{0, 1}], text=[CONCAT_AGG($2, $3)])
-               +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], 
id1=[$0], $f2=[_UTF-16LE'#'], text=[$2])
+            +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)])
+               +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], 
id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
                   +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
                      +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 
300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
                         +- LogicalJoin(condition=[true], joinType=[inner])
@@ -235,8 +235,8 @@ LogicalSink(name=[appendSink1], fields=[a, b])
 
 LogicalSink(name=[appendSink2], fields=[a, b])
 +- LogicalProject(id1=[$1], EXPR$1=[$2])
-   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
-      +- LogicalProject($f0=[TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0], 
$f2=[_UTF-16LE'-'], text=[$2])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+      +- LogicalProject($f0=[TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0], 
text=[$2], $f3=[_UTF-16LE'-'])
          +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
             +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 
300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
                +- LogicalJoin(condition=[true], joinType=[inner])
@@ -249,8 +249,8 @@ LogicalSink(name=[appendSink3], fields=[a, b])
 +- LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
    +- LogicalProject(id1=[$0], text=[$1])
       +- LogicalProject(id1=[$1], text=[$2], ts=[TUMBLE_ROWTIME($0)])
-         +- LogicalAggregate(group=[{0, 1}], text=[CONCAT_AGG($2, $3)])
-            +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], 
id1=[$0], $f2=[_UTF-16LE'#'], text=[$2])
+         +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)])
+            +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], 
id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
                +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
                   +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 
300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
                      +- LogicalJoin(condition=[true], joinType=[inner])
@@ -269,21 +269,21 @@ Calc(select=[id1, rowtime AS ts, text], reuse_id=[1])
       +- WatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], 
rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
          +- DataStreamScan(table=[[default_catalog, default_database, T2]], 
fields=[id2, rowtime, cnt, name, goods])
 
-GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], 
properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, 
CONCAT_AGG($f2, text) AS text, start('w$) AS w$start, end('w$) AS w$end, 
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], reuse_id=[2])
+GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], 
properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, 
$f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS 
w$rowtime, proctime('w$) AS w$proctime], reuse_id=[2])
 +- Exchange(distribution=[hash[id1]])
-   +- Calc(select=[ts, id1, _UTF-16LE'#' AS $f2, text])
+   +- Calc(select=[ts, id1, text, _UTF-16LE'#' AS $f3])
       +- Reused(reference_id=[1])
 
 Sink(name=[appendSink1], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 
4000, 12000)], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 
4000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
    +- Exchange(distribution=[hash[id1]])
-      +- Calc(select=[w$rowtime AS ts, id1, _UTF-16LE'*' AS $f2, text])
+      +- Calc(select=[w$rowtime AS ts, id1, text, _UTF-16LE'*' AS $f3])
          +- Reused(reference_id=[2])
 
 Sink(name=[appendSink2], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], 
select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], 
select=[id1, LISTAGG(text, $f3) AS EXPR$1])
    +- Exchange(distribution=[hash[id1]])
-      +- Calc(select=[ts, id1, _UTF-16LE'-' AS $f2, text])
+      +- Calc(select=[ts, id1, text, _UTF-16LE'-' AS $f3])
          +- Reused(reference_id=[1])
 
 Sink(name=[appendSink3], fields=[a, b])
@@ -325,19 +325,19 @@ Sink(name=[appendSink3], fields=[a, b])
                                                        ship_strategy : FORWARD
 
                                                         : Operator
-                                                               content : 
Calc(select: (ts, id1, _UTF-16LE'#' AS $f2, text))
+                                                               content : 
Calc(select: (ts, id1, text, _UTF-16LE'#' AS $f3))
                                                                ship_strategy : 
FORWARD
 
                                                                 : Operator
-                                                                       content 
: window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, 
text) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS 
w$rowtime, proctime('w$) AS w$proctime)
+                                                                       content 
: window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, 
$f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS 
w$rowtime, proctime('w$) AS w$proctime)
                                                                        
ship_strategy : HASH
 
                                                                         : 
Operator
-                                                                               
content : Calc(select: (w$rowtime AS ts, id1, _UTF-16LE'*' AS $f2, text))
+                                                                               
content : Calc(select: (w$rowtime AS ts, id1, text, _UTF-16LE'*' AS $f3))
                                                                                
ship_strategy : FORWARD
 
                                                                                
 : Operator
-                                                                               
        content : window: (SlidingGroupWindow('w$, ts, 4000, 12000)), groupBy: 
(id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+                                                                               
        content : window: (SlidingGroupWindow('w$, ts, 4000, 12000)), groupBy: 
(id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
                                                                                
        ship_strategy : HASH
 
                                                                                
         : Operator
@@ -345,11 +345,11 @@ Sink(name=[appendSink3], fields=[a, b])
                                                                                
                ship_strategy : FORWARD
 
                                                                                
                 : Operator
-                                                                               
                        content : Calc(select: (ts, id1, _UTF-16LE'-' AS $f2, 
text))
+                                                                               
                        content : Calc(select: (ts, id1, text, _UTF-16LE'-' AS 
$f3))
                                                                                
                        ship_strategy : FORWARD
 
                                                                                
                         : Operator
-                                                                               
                                content : window: (TumblingGroupWindow), 
groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+                                                                               
                                content : window: (TumblingGroupWindow), 
groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
                                                                                
                                ship_strategy : HASH
 
                                                                                
                                 : Operator
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
index e9ec761..dc74eb4 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
@@ -677,18 +677,18 @@ GlobalGroupAggregate(groupBy=[c], 
partialFinalType=[FINAL], select=[c, MIN_RETRA
   </TestCase>
   <TestCase 
name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=false, 
aggPhaseEnforcer=ONE_PHASE]">
     <Resource name="sql">
-      <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP 
BY a]]>
+      <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY 
a]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT 
$2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT 
$2)])
 +- LogicalProject(a=[$0], c=[$2], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupAggregate(groupBy=[a], select=[a, CONCAT_AGG(c) AS EXPR$1, COUNT(DISTINCT 
b) AS EXPR$2])
+GroupAggregate(groupBy=[a], select=[a, LISTAGG(c) AS EXPR$1, COUNT(DISTINCT b) 
AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
    +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
       +- TableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
@@ -697,20 +697,20 @@ GroupAggregate(groupBy=[a], select=[a, CONCAT_AGG(c) AS 
EXPR$1, COUNT(DISTINCT b
   </TestCase>
   <TestCase 
name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=false, 
aggPhaseEnforcer=TWO_PHASE]">
     <Resource name="sql">
-      <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP 
BY a]]>
+      <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY 
a]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT 
$2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT 
$2)])
 +- LogicalProject(a=[$0], c=[$2], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GlobalGroupAggregate(groupBy=[a], select=[a, CONCAT_AGG((accDelimiter$0, 
concatAcc$1)) AS EXPR$1, COUNT(distinct$0 count$2) AS EXPR$2])
+GlobalGroupAggregate(groupBy=[a], select=[a, LISTAGG((accDelimiter$0, 
concatAcc$1)) AS EXPR$1, COUNT(distinct$0 count$2) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
-   +- LocalGroupAggregate(groupBy=[a], select=[a, CONCAT_AGG(c) AS 
(accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) AS count$2, DISTINCT(b) AS 
distinct$0])
+   +- LocalGroupAggregate(groupBy=[a], select=[a, LISTAGG(c) AS 
(accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) AS count$2, DISTINCT(b) AS 
distinct$0])
       +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 
1000ms])
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
@@ -718,20 +718,20 @@ GlobalGroupAggregate(groupBy=[a], select=[a, 
CONCAT_AGG((accDelimiter$0, concatA
   </TestCase>
   <TestCase 
name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=true, 
aggPhaseEnforcer=ONE_PHASE]">
     <Resource name="sql">
-      <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP 
BY a]]>
+      <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY 
a]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT 
$2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT 
$2)])
 +- LogicalProject(a=[$0], c=[$2], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, 
CONCAT_AGG_RETRACT($f2) AS $f1, $SUM0_RETRACT($f3_0) AS $f2])
+GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, 
LISTAGG_RETRACT($f2) AS $f1, $SUM0_RETRACT($f3_0) AS $f2])
 +- Exchange(distribution=[hash[a]])
-   +- GroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, 
$f3, CONCAT_AGG(c) FILTER $g_1 AS $f2, COUNT(DISTINCT b) FILTER $g_0 AS $f3_0])
+   +- GroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, 
$f3, LISTAGG(c) FILTER $g_1 AS $f2, COUNT(DISTINCT b) FILTER $g_0 AS $f3_0])
       +- Exchange(distribution=[hash[a, $f3]])
          +- Calc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
             +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, 
{a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
@@ -743,23 +743,23 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], 
select=[a, CONCAT_AGG_RETR
   </TestCase>
   <TestCase 
name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=true, 
aggPhaseEnforcer=TWO_PHASE]">
     <Resource name="sql">
-      <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP 
BY a]]>
+      <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY 
a]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT 
$2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT 
$2)])
 +- LogicalProject(a=[$0], c=[$2], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, 
CONCAT_AGG_RETRACT(concat_agg$0) AS $f1, $SUM0_RETRACT(sum$1) AS $f2])
+GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, 
LISTAGG_RETRACT(listagg$0) AS $f1, $SUM0_RETRACT(sum$1) AS $f2])
 +- Exchange(distribution=[hash[a]])
-   +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, 
CONCAT_AGG_RETRACT($f2) AS concat_agg$0, $SUM0_RETRACT($f3_0) AS sum$1, 
COUNT_RETRACT(*) AS count1$2])
-      +- GlobalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], 
select=[a, $f3, CONCAT_AGG((accDelimiter$0, concatAcc$1)) AS $f2, 
COUNT(distinct$0 count$2) AS $f3_0])
+   +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, 
LISTAGG_RETRACT($f2) AS listagg$0, $SUM0_RETRACT($f3_0) AS sum$1, 
COUNT_RETRACT(*) AS count1$2])
+      +- GlobalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], 
select=[a, $f3, LISTAGG((accDelimiter$0, concatAcc$1)) AS $f2, COUNT(distinct$0 
count$2) AS $f3_0])
          +- Exchange(distribution=[hash[a, $f3]])
-            +- LocalGroupAggregate(groupBy=[a, $f3], 
partialFinalType=[PARTIAL], select=[a, $f3, CONCAT_AGG(c) FILTER $g_1 AS 
(accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) FILTER $g_0 AS count$2, 
DISTINCT(b) AS distinct$0])
+            +- LocalGroupAggregate(groupBy=[a, $f3], 
partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) FILTER $g_1 AS 
(accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) FILTER $g_0 AS count$2, 
DISTINCT(b) AS distinct$0])
                +- Calc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS 
$g_0])
                   +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], 
$e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
                      +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
index a415121..ccef832 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
@@ -196,22 +196,22 @@ GlobalGroupAggregate(groupBy=[c], 
partialFinalType=[FINAL], select=[c, MIN_RETRA
   </TestCase>
   <TestCase 
name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=true, 
aggPhaseEnforcer=TWO_PHASE]">
     <Resource name="sql">
-      <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP 
BY a]]>
+      <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY 
a]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT 
$2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT 
$2)])
 +- LogicalProject(a=[$0], c=[$2], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, 
CONCAT_AGG((accDelimiter$0, concatAcc$1)) AS $f1, $SUM0(count$2) AS $f2])
+GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, 
LISTAGG((accDelimiter$0, concatAcc$1)) AS $f1, $SUM0(count$2) AS $f2])
 +- Exchange(distribution=[hash[a]])
-   +- IncrementalGroupAggregate(partialAggGrouping=[a, $f3], 
finalAggGrouping=[a], select=[a, CONCAT_AGG((accDelimiter$0, concatAcc$1)) AS 
(accDelimiter$0, concatAcc$1), COUNT(distinct$0 count$2) AS count$2])
+   +- IncrementalGroupAggregate(partialAggGrouping=[a, $f3], 
finalAggGrouping=[a], select=[a, LISTAGG((accDelimiter$0, concatAcc$1)) AS 
(accDelimiter$0, concatAcc$1), COUNT(distinct$0 count$2) AS count$2])
       +- Exchange(distribution=[hash[a, $f3]])
-         +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], 
select=[a, $f3, CONCAT_AGG(c) FILTER $g_1 AS (accDelimiter$0, concatAcc$1), 
COUNT(distinct$0 b) FILTER $g_0 AS count$2, DISTINCT(b) AS distinct$0])
+         +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], 
select=[a, $f3, LISTAGG(c) FILTER $g_1 AS (accDelimiter$0, concatAcc$1), 
COUNT(distinct$0 b) FILTER $g_0 AS count$2, DISTINCT(b) AS distinct$0])
             +- Calc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
                +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, 
{a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
                   +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
index 9952e34..198fb0f 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
@@ -131,7 +131,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
 
     val table1 = util.tableEnv.sqlQuery(
       """
-        |SELECT id1, CONCAT_AGG('#', text)
+        |SELECT id1, LISTAGG(text, '#')
         |FROM TempTable
         |GROUP BY id1, TUMBLE(ts, INTERVAL '8' SECOND)
       """.stripMargin)
@@ -140,7 +140,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
 
     val table2 = util.tableEnv.sqlQuery(
       """
-        |SELECT id1, CONCAT_AGG('*', text)
+        |SELECT id1, LISTAGG(text, '*')
         |FROM TempTable
         |GROUP BY id1, HOP(ts, INTERVAL '12' SECOND, INTERVAL '6' SECOND)
       """.stripMargin)
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala
index 73c202d..5a5bdf5 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala
@@ -307,7 +307,7 @@ class RemoveCollationTest extends TableTestBase {
         |),
         |
         |join_tb2 as (
-        | select tb1_id, concat_agg(tb2_name, ',') as tb2_names
+        | select tb1_id, LISTAGG(tb2_name, ',') as tb2_names
         | from (
         |  select v1.id as tb1_id, tb2.name as tb2_name
         |   from v1 left outer join tb2 on tb2_id = tb2.id
@@ -315,7 +315,7 @@ class RemoveCollationTest extends TableTestBase {
         |),
         |
         |join_tb3 as (
-        | select tb1_id, concat_agg(tb3_name, ',') as tb3_names
+        | select tb1_id, LISTAGG(tb3_name, ',') as tb3_names
         | from (
         |  select v2.id as tb1_id, tb3.name as tb3_name
         |   from v2 left outer join tb3 on tb3_id = tb3.id
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
index b91649b..5d58dfb 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
@@ -95,7 +95,7 @@ class SplitAggregateRuleTest extends TableTestBase {
 
   @Test
   def testSingleConcatAggWithDistinctAgg(): Unit = {
-    util.verifyPlan("SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable 
GROUP BY a")
+    util.verifyPlan("SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable 
GROUP BY a")
   }
 
   @Test
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
index ce9e042..9de3886 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
@@ -297,7 +297,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
     val table2 = util.tableEnv.sqlQuery(
       """
         |SELECT id1,
-        |    CONCAT_AGG('#', text) as text,
+        |    LISTAGG(text, '#') as text,
         |    TUMBLE_ROWTIME(ts, INTERVAL '6' SECOND) as ts
         |FROM TempTable1
         |GROUP BY TUMBLE(ts, INTERVAL '6' SECOND), id1
@@ -307,7 +307,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
   val table3 = util.tableEnv.sqlQuery(
       """
         |SELECT id1,
-        |    CONCAT_AGG('*', text)
+        |    LISTAGG(text, '*')
         |FROM TempTable2
         |GROUP BY HOP(ts, INTERVAL '12' SECOND, INTERVAL '4' SECOND), id1
       """.stripMargin)
@@ -317,7 +317,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
     val table4 = util.tableEnv.sqlQuery(
       """
         |SELECT id1,
-        |    CONCAT_AGG('-', text)
+        |    LISTAGG(text, '-')
         |FROM TempTable1
         |GROUP BY TUMBLE(ts, INTERVAL '9' SECOND), id1
       """.stripMargin)
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
index 7a30103..8e36305 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
@@ -103,7 +103,7 @@ class DistinctAggregateTest(
 
   @Test
   def testSingleConcatAggWithDistinctAgg(): Unit = {
-    util.verifyPlan("SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable 
GROUP BY a")
+    util.verifyPlan("SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable 
GROUP BY a")
   }
 
   @Test
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.scala
index 5ce79fc..99ed303 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.scala
@@ -22,14 +22,16 @@ import org.apache.flink.api.scala._
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.planner.utils.{TableFunc0, TableTestBase}
+import org.apache.flink.types.Row
 
+import org.junit.Assert.{assertTrue, fail}
 import org.junit.Test
 
 class AggregateValidationTest extends TableTestBase {
+  private val util = scalaStreamTestUtil()
 
   @Test(expected = classOf[ValidationException])
   def testGroupingOnNonExistentField(): Unit = {
-    val util = streamTestUtil()
     val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
 
     val ds = table
@@ -40,7 +42,6 @@ class AggregateValidationTest extends TableTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testGroupingInvalidSelection(): Unit = {
-    val util = streamTestUtil()
     val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
 
     table
@@ -51,7 +52,6 @@ class AggregateValidationTest extends TableTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testInvalidAggregationInSelection(): Unit = {
-    val util = streamTestUtil()
     val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
 
     table
@@ -63,7 +63,6 @@ class AggregateValidationTest extends TableTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testInvalidWindowPropertiesInSelection(): Unit = {
-    val util = streamTestUtil()
     val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
 
     table
@@ -75,7 +74,6 @@ class AggregateValidationTest extends TableTestBase {
 
   @Test(expected = classOf[RuntimeException])
   def testTableFunctionInSelection(): Unit = {
-    val util = streamTestUtil()
     val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
 
     util.addFunction("func", new TableFunc0)
@@ -90,7 +88,6 @@ class AggregateValidationTest extends TableTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testInvalidScalarFunctionInAggregate(): Unit = {
-    val util = streamTestUtil()
     val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
 
     table
@@ -102,7 +99,6 @@ class AggregateValidationTest extends TableTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testInvalidTableFunctionInAggregate(): Unit = {
-    val util = streamTestUtil()
     val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
 
     util.addFunction("func", new TableFunc0)
@@ -115,13 +111,52 @@ class AggregateValidationTest extends TableTestBase {
 
   @Test(expected = classOf[RuntimeException])
   def testMultipleAggregateExpressionInAggregate(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
-
     util.addFunction("func", new TableFunc0)
+    val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
     table
       .groupBy('a)
       // must fail. Only one AggregateFunction can be used in aggregate
       .aggregate("sum(c), count(b)")
   }
+
+  @Test
+  def testIllegalArgumentForListAgg(): Unit = {
+    util.addTableSource[(Long, Int, String, String)]("T", 'a, 'b, 'c, 'd)
+    // If there are two parameters, second one must be character literal.
+    expectExceptionThrown(
+      "SELECT listagg(c, d) FROM T GROUP BY a",
+    "Supported form(s): 'LISTAGG(<CHARACTER>)'\n'LISTAGG(<CHARACTER>, 
<CHARACTER_LITERAL>)",
+      classOf[ValidationException])
+  }
+
+  @Test
+  def testIllegalArgumentForListAgg1(): Unit = {
+    util.addTableSource[(Long, Int, String, String)]("T", 'a, 'b, 'c, 'd)
+    // If there are two parameters, second one must be character literal.
+    expectExceptionThrown(
+      "SELECT LISTAGG(c, 1) FROM T GROUP BY a",
+      "Supported form(s): 'LISTAGG(<CHARACTER>)'\n'LISTAGG(<CHARACTER>, 
<CHARACTER_LITERAL>)",
+      classOf[ValidationException])
+  }
+
+  // 
----------------------------------------------------------------------------------------------
+
+  private def expectExceptionThrown(
+      sql: String,
+      keywords: String,
+      clazz: Class[_ <: Throwable] = classOf[ValidationException])
+  : Unit = {
+    try {
+      util.tableEnv.toAppendStream[Row](util.tableEnv.sqlQuery(sql))
+      fail(s"Expected a $clazz, but no exception is thrown.")
+    } catch {
+      case e if e.getClass == clazz =>
+        if (keywords != null) {
+          assertTrue(
+            s"The exception message '${e.getMessage}' doesn't contain keyword 
'$keywords'",
+            e.getMessage.contains(keywords))
+        }
+      case e: Throwable => fail(s"Expected throw ${clazz.getSimpleName}, but 
is $e.")
+    }
+  }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
index f64b0b6..eed0d3e 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
@@ -138,27 +138,26 @@ class SortAggITCase
   }
 
   // NOTE: Spark has agg functions collect_list(), collect_set().
-  //       instead, we'll test concat_agg() here
-  @Ignore
+  //       instead, we'll test LISTAGG() here
   @Test
   def testConcatAgg(): Unit = {
     checkResult(
-      "SELECT concat_agg('-', c), concat_agg(c) FROM SmallTable3",
+      "SELECT LISTAGG(c, '-'), LISTAGG(c) FROM SmallTable3",
       Seq(
-        row("Hi-Hello-Hello world", "Hi\nHello\nHello world")
+        row("Hi-Hello-Hello world", "Hi,Hello,Hello world")
       )
     )
 
     // EmptyTable5
     checkResult(
-      "SELECT concat_agg('-', g), concat_agg(g) FROM EmptyTable5",
+      "SELECT LISTAGG(g, '-'), LISTAGG(g) FROM EmptyTable5",
       Seq(
         row(null, null)
       )
     )
 
     checkResult(
-      "SELECT concat_agg('-', c), concat_agg(c) FROM AllNullTable3",
+      "SELECT LISTAGG(c, '-'), LISTAGG(c) FROM AllNullTable3",
       Seq(
         row(null, null)
       )
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 318487fe..41561a3 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
-import 
org.apache.flink.table.planner.functions.aggfunctions.{ConcatWithRetractAggFunction,
 ConcatWsWithRetractAggFunction}
+import 
org.apache.flink.table.planner.functions.aggfunctions.{ListAggWithRetractAggFunction,
 ListAggWsWithRetractAggFunction}
 import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.VarSumAggFunction
 import 
org.apache.flink.table.planner.runtime.batch.sql.agg.{MyPojoAggFunction, 
VarArgsAggFunction}
 import 
org.apache.flink.table.planner.runtime.utils.StreamingWithAggTestBase.AggMode
@@ -605,7 +605,7 @@ class AggregateITCase(
 
     val sqlQuery =
       s"""
-         |SELECT len, concat_agg('#', content) FROM T GROUP BY len
+         |SELECT len, listagg(content, '#') FROM T GROUP BY len
        """.stripMargin
 
     val sink = new TestingRetractSink
@@ -629,7 +629,7 @@ class AggregateITCase(
 
     val sqlQuery =
       s"""
-         |SELECT len, concat_agg(content) FROM T GROUP BY len
+         |SELECT len, listagg(content) FROM T GROUP BY len
        """.stripMargin
 
     val sink = new TestingRetractSink
@@ -906,7 +906,7 @@ class AggregateITCase(
       """
         |SELECT b, min(c), max(c)
         |FROM (
-        | SELECT a, b, concat_agg(c) as c
+        | SELECT a, b, listagg(c) as c
         | FROM T
         | GROUP BY a, b)
         |GROUP BY b
@@ -1061,15 +1061,15 @@ class AggregateITCase(
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
-  /** Test CONCAT_AGG **/
+  /** Test LISTAGG **/
   @Test
   def testConcatAgg(): Unit = {
-    tEnv.registerFunction("concat_agg_retract", new 
ConcatWithRetractAggFunction)
-    tEnv.registerFunction("concat_agg_ws_retract", new 
ConcatWsWithRetractAggFunction)
+    tEnv.registerFunction("listagg_retract", new ListAggWithRetractAggFunction)
+    tEnv.registerFunction("listagg_ws_retract", new 
ListAggWsWithRetractAggFunction)
     val sqlQuery =
       s"""
          |SELECT
-         |  concat_agg(c), concat_agg('-', c), concat_agg_retract(c), 
concat_agg_ws_retract('+', c)
+         |  listagg(c), listagg(c, '-'), listagg_retract(c), 
listagg_ws_retract(c, '+')
          |FROM MyTable
          |GROUP BY c
          |""".stripMargin
@@ -1085,8 +1085,8 @@ class AggregateITCase(
     val sink = new TestingRetractSink
     tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink)
     env.execute()
-    val expected = 
List("Hi\nHi\nHi\nHi\nHi\nHi\nHi\nHi\nHi\nHi,Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi," +
-      "Hi\nHi\nHi\nHi\nHi\nHi\nHi\nHi\nHi\nHi,Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi")
+    val expected = 
List("Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi," +
+      "Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi")
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
index 659f747..f2c29c6 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
@@ -125,7 +125,7 @@ class AggregateITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
 //  @Test
 //  def testSimpleLogical(): Unit = {
 //    val t = failingDataSource(smallTupleData3).toTable(tEnv, 'a, 'b, 'c)
-//      .select('c.firstValue, 'c.lastValue, 'c.concat_agg("#"))
+//      .select('c.firstValue, 'c.lastValue, 'c.LISTAGG("#"))
 //
 //    val sink = new TestingRetractSink()
 //    t.toRetractStream[Row].addSink(sink)

Reply via email to