This is an automated email from the ASF dual-hosted git repository.
gustavodemorais pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a46a571a701 [FLINK-39897][table] Fix SQL serialization when using limit without order
a46a571a701 is described below
commit a46a571a701b12f733b0e30645b98e8211d88d0b
Author: Mika Naylor <[email protected]>
AuthorDate: Thu Jun 11 11:00:39 2026 +0100
[FLINK-39897][table] Fix SQL serialization when using limit without order
This closes #28367.
---
.../flink/table/operations/SortQueryOperation.java | 25 +++++++++++-----------
.../table/api/QueryOperationSqlSemanticTest.java | 1 +
.../api/QueryOperationSqlSerializationTest.java | 1 +
.../table/api/QueryOperationTestPrograms.java | 20 +++++++++++++++++
4 files changed, 34 insertions(+), 13 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SortQueryOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SortQueryOperation.java
index 717ab4f59c8..1e4b96f3d73 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SortQueryOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SortQueryOperation.java
@@ -92,23 +92,15 @@ public class SortQueryOperation implements QueryOperation {
final StringBuilder s =
new StringBuilder(
String.format(
- "SELECT %s FROM (%s\n) %s ORDER BY %s",
+ "SELECT %s FROM (%s\n) %s",
OperationUtils.formatSelectColumns(
getResolvedSchema(), INPUT_ALIAS),
OperationUtils.indent(child.asSerializableString(sqlFactory)),
- INPUT_ALIAS,
- order.stream()
- .map(
- expr ->
- OperationExpressionsUtils
- .scopeReferencesWithAlias(
- INPUT_ALIAS, expr))
- .map(
- resolvedExpression ->
- resolvedExpression.asSerializableString(
- sqlFactory))
- .collect(Collectors.joining(", "))));
+ INPUT_ALIAS));
+ if (!order.isEmpty()) {
+ s.append(" ORDER BY ").append(serializeOrder(sqlFactory));
+ }
if (offset >= 0) {
s.append(" OFFSET ");
s.append(offset);
@@ -122,6 +114,13 @@ public class SortQueryOperation implements QueryOperation {
return s.toString();
}
+ private String serializeOrder(SqlFactory sqlFactory) {
+ return order.stream()
+ .map(expr -> OperationExpressionsUtils.scopeReferencesWithAlias(INPUT_ALIAS, expr))
+ .map(expr -> expr.asSerializableString(sqlFactory))
+ .collect(Collectors.joining(", "));
+ }
+
@Override
public List<QueryOperation> getChildren() {
return Collections.singletonList(child);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSemanticTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSemanticTest.java
index 8483fdff25f..354ae8cd08f 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSemanticTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSemanticTest.java
@@ -44,6 +44,7 @@ public class QueryOperationSqlSemanticTest extends SemanticTestBase {
QueryOperationTestPrograms.DISTINCT_QUERY_OPERATION,
QueryOperationTestPrograms.JOIN_QUERY_OPERATION,
QueryOperationTestPrograms.ORDER_BY_QUERY_OPERATION,
+ QueryOperationTestPrograms.LIMIT_QUERY_OPERATION,
QueryOperationTestPrograms.WINDOW_AGGREGATE_QUERY_OPERATION,
QueryOperationTestPrograms.UNION_ALL_QUERY_OPERATION,
QueryOperationTestPrograms.LATERAL_JOIN_QUERY_OPERATION,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
index d7a512327d6..e9525eae79e 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
@@ -59,6 +59,7 @@ public class QueryOperationSqlSerializationTest implements TableTestProgramRunne
QueryOperationTestPrograms.DISTINCT_QUERY_OPERATION,
QueryOperationTestPrograms.JOIN_QUERY_OPERATION,
QueryOperationTestPrograms.ORDER_BY_QUERY_OPERATION,
+ QueryOperationTestPrograms.LIMIT_QUERY_OPERATION,
QueryOperationTestPrograms.WINDOW_AGGREGATE_QUERY_OPERATION,
QueryOperationTestPrograms.UNION_ALL_QUERY_OPERATION,
QueryOperationTestPrograms.LATERAL_JOIN_QUERY_OPERATION,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java
index 1c59e7d1400..03b8a73ed98 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java
@@ -428,6 +428,26 @@ public class QueryOperationTestPrograms {
+ " OFFSET 1 ROWS FETCH NEXT 2 ROWS ONLY")
.build();
+ static final TableTestProgram LIMIT_QUERY_OPERATION =
+ TableTestProgram.of("limit-query-operation", "verifies sql serialization")
+ .setupTableSource(
+ SourceTestStep.newBuilder("s")
+ .addSchema("a bigint", "b string")
+ .producedValues(Row.of(1L, "a"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("a bigint", "b string")
+ .consumedValues(Row.of(1L, "a"))
+ .build())
+ .runTableApi(t -> t.from("s").limit(1), "sink")
+ .runSql(
+ "SELECT `$$T_SORT`.`a`, `$$T_SORT`.`b` FROM (\n"
+ + " SELECT `$$T_SOURCE`.`a`, `$$T_SOURCE`.`b` FROM `default_catalog`"
+ + ".`default_database`.`s` $$T_SOURCE\n"
+ + ") $$T_SORT OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY")
+ .build();
+
static final TableTestProgram SQL_QUERY_OPERATION =
TableTestProgram.of("sql-query-operation", "verifies sql serialization")
.setupTableSource(