This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 556b2e95db5 [FLINK-27006][table] Support SingleValueAggFunction for CHAR data type
556b2e95db5 is described below

commit 556b2e95db5824906bc969f7b02c0fb79d41d5ee
Author: yuxia Luo <[email protected]>
AuthorDate: Fri Jun 17 14:20:38 2022 +0800

    [FLINK-27006][table] Support SingleValueAggFunction for CHAR data type
    
    This closes #19338
---
 .../functions/aggfunctions/SingleValueAggFunction.java | 18 ++++++++++++++++++
 .../table/planner/plan/utils/AggFunctionFactory.scala  |  3 +++
 .../planner/runtime/stream/sql/AggregateITCase.scala   |  9 +++++++++
 3 files changed, 30 insertions(+)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
index 5a3040fdf6b..4463250d366 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.CharType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.TimeType;
@@ -226,6 +227,23 @@ public abstract class SingleValueAggFunction extends DeclarativeAggregateFunctio
         }
     }
 
+    /** Built-in char single value aggregate function. */
+    public static final class CharSingleValueAggFunction extends SingleValueAggFunction {
+
+        private static final long serialVersionUID = 320495723666949978L;
+
+        private final CharType type;
+
+        public CharSingleValueAggFunction(CharType type) {
+            this.type = type;
+        }
+
+        @Override
+        public DataType getResultType() {
+            return DataTypes.CHAR(type.getLength());
+        }
+    }
+
     /** Built-in string single value aggregate function. */
     public static final class StringSingleValueAggFunction extends SingleValueAggFunction {
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
index 809d3e89336..608bed8fe6e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
@@ -429,6 +429,9 @@ class AggFunctionFactory(
         new DoubleSingleValueAggFunction
       case BOOLEAN =>
         new BooleanSingleValueAggFunction
+      case CHAR =>
+        val d = argTypes(0).asInstanceOf[CharType]
+        new CharSingleValueAggFunction(d)
       case VARCHAR =>
         new StringSingleValueAggFunction
       case DATE =>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 3f1c8b6bd61..f1035bad04e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -898,6 +898,15 @@ class AggregateITCase(aggMode: AggMode, miniBatch: MiniBatchMode, backend: State
 
     val expected = List("1,1,A", "2,2,B", "3,2,B", "4,3,C", "5,3,C")
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
+
+    // test single value for char type
+    val tc = tEnv.fromValues(DataTypes.ROW(DataTypes.FIELD("a", DataTypes.CHAR(3))), Row.of("AA"))
+    tEnv.registerTable("tc", tc)
+    val tr = tEnv.sqlQuery("SELECT * FROM tc WHERE tc.a = (SELECT a FROM tc)")
+    val sink1 = new TestingRetractSink
+    tr.toRetractStream[Row].addSink(sink1).setParallelism(1)
+    env.execute()
+    assertEquals(List("AA "), sink1.getRetractResults.sorted)
   }
 
   @Test

Reply via email to