This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 556b2e95db5 [FLINK-27006][table] Support SingleValueAggFunction for CHAR data type
556b2e95db5 is described below
commit 556b2e95db5824906bc969f7b02c0fb79d41d5ee
Author: yuxia Luo <[email protected]>
AuthorDate: Fri Jun 17 14:20:38 2022 +0800
[FLINK-27006][table] Support SingleValueAggFunction for CHAR data type
This closes #19338
---
.../functions/aggfunctions/SingleValueAggFunction.java | 18 ++++++++++++++++++
.../table/planner/plan/utils/AggFunctionFactory.scala | 3 +++
.../planner/runtime/stream/sql/AggregateITCase.scala | 9 +++++++++
3 files changed, 30 insertions(+)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
index 5a3040fdf6b..4463250d366 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.TimeType;
@@ -226,6 +227,23 @@ public abstract class SingleValueAggFunction extends DeclarativeAggregateFunctio
}
}
+ /** Built-in char single value aggregate function. */
+ public static final class CharSingleValueAggFunction extends
SingleValueAggFunction {
+
+ private static final long serialVersionUID = 320495723666949978L;
+
+ private final CharType type;
+
+ public CharSingleValueAggFunction(CharType type) {
+ this.type = type;
+ }
+
+ @Override
+ public DataType getResultType() {
+ return DataTypes.CHAR(type.getLength());
+ }
+ }
+
/** Built-in string single value aggregate function. */
public static final class StringSingleValueAggFunction extends
SingleValueAggFunction {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
index 809d3e89336..608bed8fe6e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
@@ -429,6 +429,9 @@ class AggFunctionFactory(
new DoubleSingleValueAggFunction
case BOOLEAN =>
new BooleanSingleValueAggFunction
+ case CHAR =>
+ val d = argTypes(0).asInstanceOf[CharType]
+ new CharSingleValueAggFunction(d)
case VARCHAR =>
new StringSingleValueAggFunction
case DATE =>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 3f1c8b6bd61..f1035bad04e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -898,6 +898,15 @@ class AggregateITCase(aggMode: AggMode, miniBatch: MiniBatchMode, backend: State
val expected = List("1,1,A", "2,2,B", "3,2,B", "4,3,C", "5,3,C")
assertEquals(expected.sorted, sink.getRetractResults.sorted)
+
+ // test single value for char type
+ val tc = tEnv.fromValues(DataTypes.ROW(DataTypes.FIELD("a",
DataTypes.CHAR(3))), Row.of("AA"))
+ tEnv.registerTable("tc", tc)
+ val tr = tEnv.sqlQuery("SELECT * FROM tc WHERE tc.a = (SELECT a FROM tc)")
+ val sink1 = new TestingRetractSink
+ tr.toRetractStream[Row].addSink(sink1).setParallelism(1)
+ env.execute()
+ assertEquals(List("AA "), sink1.getRetractResults.sorted)
}
@Test