This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 77215aaf6ca [FLINK-35021] AggregateQueryOperations produces wrong 
asSerializableString representation (#24624)
77215aaf6ca is described below

commit 77215aaf6ca7ccbff7bd3752e59068ac9956d549
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Apr 5 14:17:52 2024 +0200

    [FLINK-35021] AggregateQueryOperations produces wrong asSerializableString 
representation (#24624)
---
 .../table/operations/AggregateQueryOperation.java  | 17 +++++++++++---
 .../table/api/QueryOperationSqlExecutionTest.java  |  1 +
 .../api/QueryOperationSqlSerializationTest.java    |  3 ++-
 .../table/api/QueryOperationTestPrograms.java      | 27 ++++++++++++++++++++++
 4 files changed, 44 insertions(+), 4 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/AggregateQueryOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/AggregateQueryOperation.java
index 22baeffef6d..c1e6d3a5479 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/AggregateQueryOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/AggregateQueryOperation.java
@@ -76,15 +76,26 @@ public class AggregateQueryOperation implements 
QueryOperation {
 
     @Override
     public String asSerializableString() {
+        final String groupingExprs = getGroupingExprs();
         return String.format(
                 "SELECT %s FROM (%s\n)\nGROUP BY %s",
                 Stream.concat(groupingExpressions.stream(), 
aggregateExpressions.stream())
                         .map(ResolvedExpression::asSerializableString)
                         .collect(Collectors.joining(", ")),
                 OperationUtils.indent(child.asSerializableString()),
-                groupingExpressions.stream()
-                        .map(ResolvedExpression::asSerializableString)
-                        .collect(Collectors.joining(", ")));
+                groupingExprs);
+    }
+
+    private String getGroupingExprs() {
+        if (groupingExpressions.isEmpty()) {
+            return "1";
+        } else {
+            final String groupingExprs =
+                    groupingExpressions.stream()
+                            .map(ResolvedExpression::asSerializableString)
+                            .collect(Collectors.joining(", "));
+            return groupingExprs;
+        }
     }
 
     @Override
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java
index f686ba1f283..300120eb644 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java
@@ -60,6 +60,7 @@ public class QueryOperationSqlExecutionTest implements 
TableTestProgramRunner {
                 QueryOperationTestPrograms.VALUES_QUERY_OPERATION,
                 QueryOperationTestPrograms.FILTER_QUERY_OPERATION,
                 QueryOperationTestPrograms.AGGREGATE_QUERY_OPERATION,
+                
QueryOperationTestPrograms.AGGREGATE_NO_GROUP_BY_QUERY_OPERATION,
                 QueryOperationTestPrograms.DISTINCT_QUERY_OPERATION,
                 QueryOperationTestPrograms.JOIN_QUERY_OPERATION,
                 QueryOperationTestPrograms.ORDER_BY_QUERY_OPERATION,
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
index 1c9251608a3..6acaf2c7a1d 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
@@ -53,6 +53,7 @@ public class QueryOperationSqlSerializationTest implements 
TableTestProgramRunne
                 QueryOperationTestPrograms.VALUES_QUERY_OPERATION,
                 QueryOperationTestPrograms.FILTER_QUERY_OPERATION,
                 QueryOperationTestPrograms.AGGREGATE_QUERY_OPERATION,
+                
QueryOperationTestPrograms.AGGREGATE_NO_GROUP_BY_QUERY_OPERATION,
                 QueryOperationTestPrograms.DISTINCT_QUERY_OPERATION,
                 QueryOperationTestPrograms.JOIN_QUERY_OPERATION,
                 QueryOperationTestPrograms.ORDER_BY_QUERY_OPERATION,
@@ -114,7 +115,7 @@ public class QueryOperationSqlSerializationTest implements 
TableTestProgramRunne
                 (StreamGraph)
                         env.generatePipelineFromQueryOperation(queryOperation, 
transformations);
 
-        assertThat(sqlStep.sql).isEqualTo(streamGraph.getJobName());
+        assertThat(streamGraph.getJobName()).isEqualTo(sqlStep.sql);
     }
 
     private static TableEnvironment setupEnv(TableTestProgram program) {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java
index a77b4f481ff..b4523da6980 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java
@@ -147,6 +147,33 @@ public class QueryOperationTestPrograms {
                                     + ")")
                     .build();
 
+    static final TableTestProgram AGGREGATE_NO_GROUP_BY_QUERY_OPERATION =
+            TableTestProgram.of(
+                            "aggregate-query-no-group-by-operation", "verifies 
sql serialization")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("s")
+                                    .addSchema("a bigint", "b string")
+                                    .producedValues(
+                                            Row.of(10L, "apple"),
+                                            Row.of(20L, "apple"),
+                                            Row.of(5L, "pear"),
+                                            Row.of(15L, "pear"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("b bigint")
+                                    .consumedValues(Row.of(50L))
+                                    .build())
+                    .runTableApi(t -> t.from("s").select($("a").sum()), "sink")
+                    .runSql(
+                            "SELECT `EXPR$0` FROM (\n"
+                                    + "    SELECT (SUM(`a`)) AS `EXPR$0` FROM 
(\n"
+                                    + "        SELECT `a`, `b` FROM 
`default_catalog`.`default_database`.`s`\n"
+                                    + "    )\n"
+                                    + "    GROUP BY 1\n"
+                                    + ")")
+                    .build();
+
     static final TableTestProgram WINDOW_AGGREGATE_QUERY_OPERATION =
             TableTestProgram.of("window-aggregate-query-operation", "verifies 
sql serialization")
                     .setupTableSource(

Reply via email to