This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 77215aaf6ca [FLINK-35021] AggregateQueryOperations produces wrong
asSerializableString representation (#24624)
77215aaf6ca is described below
commit 77215aaf6ca7ccbff7bd3752e59068ac9956d549
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Apr 5 14:17:52 2024 +0200
[FLINK-35021] AggregateQueryOperations produces wrong asSerializableString
representation (#24624)
---
.../table/operations/AggregateQueryOperation.java | 17 +++++++++++---
.../table/api/QueryOperationSqlExecutionTest.java | 1 +
.../api/QueryOperationSqlSerializationTest.java | 3 ++-
.../table/api/QueryOperationTestPrograms.java | 27 ++++++++++++++++++++++
4 files changed, 44 insertions(+), 4 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/AggregateQueryOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/AggregateQueryOperation.java
index 22baeffef6d..c1e6d3a5479 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/AggregateQueryOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/AggregateQueryOperation.java
@@ -76,15 +76,26 @@ public class AggregateQueryOperation implements
QueryOperation {
@Override
public String asSerializableString() {
+ final String groupingExprs = getGroupingExprs();
return String.format(
"SELECT %s FROM (%s\n)\nGROUP BY %s",
Stream.concat(groupingExpressions.stream(),
aggregateExpressions.stream())
.map(ResolvedExpression::asSerializableString)
.collect(Collectors.joining(", ")),
OperationUtils.indent(child.asSerializableString()),
- groupingExpressions.stream()
- .map(ResolvedExpression::asSerializableString)
- .collect(Collectors.joining(", ")));
+ groupingExprs);
+ }
+
+ private String getGroupingExprs() {
+ if (groupingExpressions.isEmpty()) {
+ return "1";
+ } else {
+ final String groupingExprs =
+ groupingExpressions.stream()
+ .map(ResolvedExpression::asSerializableString)
+ .collect(Collectors.joining(", "));
+ return groupingExprs;
+ }
}
@Override
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java
index f686ba1f283..300120eb644 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java
@@ -60,6 +60,7 @@ public class QueryOperationSqlExecutionTest implements
TableTestProgramRunner {
QueryOperationTestPrograms.VALUES_QUERY_OPERATION,
QueryOperationTestPrograms.FILTER_QUERY_OPERATION,
QueryOperationTestPrograms.AGGREGATE_QUERY_OPERATION,
+
QueryOperationTestPrograms.AGGREGATE_NO_GROUP_BY_QUERY_OPERATION,
QueryOperationTestPrograms.DISTINCT_QUERY_OPERATION,
QueryOperationTestPrograms.JOIN_QUERY_OPERATION,
QueryOperationTestPrograms.ORDER_BY_QUERY_OPERATION,
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
index 1c9251608a3..6acaf2c7a1d 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
@@ -53,6 +53,7 @@ public class QueryOperationSqlSerializationTest implements
TableTestProgramRunne
QueryOperationTestPrograms.VALUES_QUERY_OPERATION,
QueryOperationTestPrograms.FILTER_QUERY_OPERATION,
QueryOperationTestPrograms.AGGREGATE_QUERY_OPERATION,
+
QueryOperationTestPrograms.AGGREGATE_NO_GROUP_BY_QUERY_OPERATION,
QueryOperationTestPrograms.DISTINCT_QUERY_OPERATION,
QueryOperationTestPrograms.JOIN_QUERY_OPERATION,
QueryOperationTestPrograms.ORDER_BY_QUERY_OPERATION,
@@ -114,7 +115,7 @@ public class QueryOperationSqlSerializationTest implements
TableTestProgramRunne
(StreamGraph)
env.generatePipelineFromQueryOperation(queryOperation,
transformations);
- assertThat(sqlStep.sql).isEqualTo(streamGraph.getJobName());
+ assertThat(streamGraph.getJobName()).isEqualTo(sqlStep.sql);
}
private static TableEnvironment setupEnv(TableTestProgram program) {
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java
index a77b4f481ff..b4523da6980 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java
@@ -147,6 +147,33 @@ public class QueryOperationTestPrograms {
+ ")")
.build();
+ static final TableTestProgram AGGREGATE_NO_GROUP_BY_QUERY_OPERATION =
+ TableTestProgram.of(
+ "aggregate-query-no-group-by-operation", "verifies
sql serialization")
+ .setupTableSource(
+ SourceTestStep.newBuilder("s")
+ .addSchema("a bigint", "b string")
+ .producedValues(
+ Row.of(10L, "apple"),
+ Row.of(20L, "apple"),
+ Row.of(5L, "pear"),
+ Row.of(15L, "pear"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("b bigint")
+ .consumedValues(Row.of(50L))
+ .build())
+ .runTableApi(t -> t.from("s").select($("a").sum()), "sink")
+ .runSql(
+ "SELECT `EXPR$0` FROM (\n"
+ + " SELECT (SUM(`a`)) AS `EXPR$0` FROM
(\n"
+ + " SELECT `a`, `b` FROM
`default_catalog`.`default_database`.`s`\n"
+ + " )\n"
+ + " GROUP BY 1\n"
+ + ")")
+ .build();
+
static final TableTestProgram WINDOW_AGGREGATE_QUERY_OPERATION =
TableTestProgram.of("window-aggregate-query-operation", "verifies
sql serialization")
.setupTableSource(