This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new cc348d48096 [FLINK-39801][table] Skip serializing empty
partition/order keys for PTF table args
cc348d48096 is described below
commit cc348d480963ef619b3f55832e8ed2f50944ca88
Author: Ramin Gharib <[email protected]>
AuthorDate: Mon Jun 1 13:24:36 2026 +0200
[FLINK-39801][table] Skip serializing empty partition/order keys for PTF
table args
---
.../nodes/exec/serde/RexNodeJsonDeserializer.java | 56 ++++++++++++++--------
.../nodes/exec/serde/RexNodeJsonSerializer.java | 25 ++++++----
.../plan/from-changelog-retract-restore.json | 3 --
.../plan/process-late-events-restore.json | 2 -
.../plan/process-map-state-restore.json | 1 -
.../plan/process-multi-input-restore.json | 2 -
.../plan/process-multi-state-restore.json | 1 -
.../process-partitioned-named-timers-restore.json | 1 -
.../plan/process-row-semantic-table-restore.json | 2 -
.../process-updating-output-upsert-restore.json | 1 -
.../plan/to-changelog-retract-restore.json | 3 --
11 files changed, 51 insertions(+), 46 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
index 0c6f3ace423..2c063aade64 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
@@ -69,6 +69,7 @@ import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.function.Function;
import static com.google.common.collect.ImmutableRangeSet.Builder;
import static com.google.common.collect.ImmutableRangeSet.builder;
@@ -329,30 +330,43 @@ final class RexNodeJsonDeserializer extends
StdDeserializer<RexNode> {
final int inputIndex =
jsonNode.required(FIELD_NAME_INPUT_INDEX).intValue();
- final JsonNode partitionKeysNode =
jsonNode.required(FIELD_NAME_PARTITION_KEYS);
- final int[] partitionKeys = new int[partitionKeysNode.size()];
- for (int i = 0; i < partitionKeysNode.size(); ++i) {
- partitionKeys[i] = partitionKeysNode.get(i).asInt();
- }
+ final int[] partitionKeys = deserializeIntArray(jsonNode,
FIELD_NAME_PARTITION_KEYS);
+ final int[] orderKeys = deserializeIntArray(jsonNode,
FIELD_NAME_ORDER_KEYS);
+ final SortOrder[] sortOrders =
+ deserializeArray(
+ jsonNode,
+ FIELD_NAME_ORDER_DIRECTIONS,
+ node -> SortOrder.valueOf(node.asText()),
+ new SortOrder[0]);
- final JsonNode orderKeysNode =
jsonNode.required(FIELD_NAME_ORDER_KEYS);
- final int[] orderKeys = new int[orderKeysNode.size()];
- for (int i = 0; i < orderKeysNode.size(); ++i) {
- orderKeys[i] = orderKeysNode.get(i).asInt();
- }
+ return new RexTableArgCall(callType, inputIndex, partitionKeys,
orderKeys, sortOrders);
+ }
- final JsonNode orderDirectionsNode =
jsonNode.get(FIELD_NAME_ORDER_DIRECTIONS);
- final SortOrder[] order;
- if (orderDirectionsNode != null && !orderDirectionsNode.isEmpty()) {
- order = new SortOrder[orderDirectionsNode.size()];
- for (int i = 0; i < orderDirectionsNode.size(); ++i) {
- order[i] =
SortOrder.valueOf(orderDirectionsNode.get(i).asText());
- }
- } else {
- order = new SortOrder[0];
- }
+ private static int[] deserializeIntArray(JsonNode jsonNode, String
fieldName) {
+ return deserializeListOrEmpty(jsonNode, fieldName,
JsonNode::asInt).stream()
+ .mapToInt(Integer::intValue)
+ .toArray();
+ }
+
+ private static <T> T[] deserializeArray(
+ JsonNode jsonNode,
+ String fieldName,
+ Function<JsonNode, T> elementDeserializer,
+ T[] emptyArray) {
+ return deserializeListOrEmpty(jsonNode, fieldName,
elementDeserializer).toArray(emptyArray);
+ }
- return new RexTableArgCall(callType, inputIndex, partitionKeys,
orderKeys, order);
+ private static <T> List<T> deserializeListOrEmpty(
+ JsonNode jsonNode, String fieldName, Function<JsonNode, T>
elementDeserializer) {
+ final JsonNode arrayNode = jsonNode.get(fieldName);
+ if (arrayNode == null || arrayNode.isEmpty()) {
+ return List.of();
+ }
+ final List<T> result = new ArrayList<>(arrayNode.size());
+ for (final JsonNode element : arrayNode) {
+ result.add(elementDeserializer.apply(element));
+ }
+ return result;
}
private static RexNode deserializeCall(JsonNode jsonNode, SerdeContext
serdeContext)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
index a342dc66913..6495778609c 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
@@ -340,16 +340,23 @@ final class RexNodeJsonSerializer extends
StdSerializer<RexNode> {
gen.writeStartObject();
gen.writeStringField(FIELD_NAME_KIND, KIND_TABLE_ARG_CALL);
gen.writeNumberField(FIELD_NAME_INPUT_INDEX,
tableArgCall.getInputIndex());
- gen.writeFieldName(FIELD_NAME_PARTITION_KEYS);
- gen.writeArray(tableArgCall.getPartitionKeys(), 0,
tableArgCall.getPartitionKeys().length);
- gen.writeFieldName(FIELD_NAME_ORDER_KEYS);
- gen.writeArray(tableArgCall.getOrderKeys(), 0,
tableArgCall.getOrderKeys().length);
- gen.writeFieldName(FIELD_NAME_ORDER_DIRECTIONS);
- gen.writeStartArray();
- for (SortOrder order : tableArgCall.getSortOrder()) {
- gen.writeString(order.name());
+ if (tableArgCall.getPartitionKeys().length > 0) {
+ gen.writeFieldName(FIELD_NAME_PARTITION_KEYS);
+ gen.writeArray(
+ tableArgCall.getPartitionKeys(), 0,
tableArgCall.getPartitionKeys().length);
+ }
+ if (tableArgCall.getOrderKeys().length > 0) {
+ gen.writeFieldName(FIELD_NAME_ORDER_KEYS);
+ gen.writeArray(tableArgCall.getOrderKeys(), 0,
tableArgCall.getOrderKeys().length);
+ if (tableArgCall.getSortOrder().length > 0) {
+ gen.writeFieldName(FIELD_NAME_ORDER_DIRECTIONS);
+ gen.writeStartArray();
+ for (SortOrder order : tableArgCall.getSortOrder()) {
+ gen.writeString(order.name());
+ }
+ gen.writeEndArray();
+ }
}
- gen.writeEndArray();
serializerProvider.defaultSerializeField(FIELD_NAME_TYPE,
tableArgCall.getType(), gen);
gen.writeEndObject();
}
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-restore/plan/from-changelog-retract-restore.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-restore/plan/from-changelog-retract-restore.json
index b795986b6d6..c1004042c1b 100644
---
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-restore/plan/from-changelog-retract-restore.json
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-restore/plan/from-changelog-retract-restore.json
@@ -43,9 +43,6 @@
"operands" : [ {
"kind" : "TABLE_ARG_CALL",
"inputIndex" : 0,
- "partitionKeys" : [ ],
- "orderKeys" : [ ],
- "orderDirections" : [ ],
"type" : "ROW<`id` INT, `op` VARCHAR(2147483647), `name`
VARCHAR(2147483647)> NOT NULL"
}, {
"kind" : "CALL",
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/plan/process-late-events-restore.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/plan/process-late-events-restore.json
index 8d22a24a156..4b029426df9 100644
---
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/plan/process-late-events-restore.json
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/plan/process-late-events-restore.json
@@ -180,8 +180,6 @@
"kind" : "TABLE_ARG_CALL",
"inputIndex" : 0,
"partitionKeys" : [ 0 ],
- "orderKeys" : [ ],
- "orderDirections" : [ ],
"type" : {
"type" : "ROW",
"nullable" : false,
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-map-state-restore/plan/process-map-state-restore.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-map-state-restore/plan/process-map-state-restore.json
index a799c8a0179..64b93fb20b5 100644
---
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-map-state-restore/plan/process-map-state-restore.json
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-map-state-restore/plan/process-map-state-restore.json
@@ -54,7 +54,6 @@
"kind" : "TABLE_ARG_CALL",
"inputIndex" : 0,
"partitionKeys" : [ 0 ],
- "orderKeys" : [ ],
"type" : "ROW<`name` VARCHAR(2147483647), `score` INT> NOT NULL"
}, {
"kind" : "CALL",
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-multi-input-restore/plan/process-multi-input-restore.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-multi-input-restore/plan/process-multi-input-restore.json
index c5eb1b79fb2..6a7c8eae544 100644
---
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-multi-input-restore/plan/process-multi-input-restore.json
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-multi-input-restore/plan/process-multi-input-restore.json
@@ -94,13 +94,11 @@
"kind" : "TABLE_ARG_CALL",
"inputIndex" : 0,
"partitionKeys" : [ 0 ],
- "orderKeys" : [ ],
"type" : "ROW<`name` VARCHAR(2147483647), `score` INT> NOT NULL"
}, {
"kind" : "TABLE_ARG_CALL",
"inputIndex" : 1,
"partitionKeys" : [ 0 ],
- "orderKeys" : [ ],
"type" : "ROW<`name` VARCHAR(2147483647), `city` VARCHAR(2147483647)>
NOT NULL"
}, {
"kind" : "CALL",
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-multi-state-restore/plan/process-multi-state-restore.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-multi-state-restore/plan/process-multi-state-restore.json
index d8482f85423..147160be7fc 100644
---
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-multi-state-restore/plan/process-multi-state-restore.json
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-multi-state-restore/plan/process-multi-state-restore.json
@@ -54,7 +54,6 @@
"kind" : "TABLE_ARG_CALL",
"inputIndex" : 0,
"partitionKeys" : [ 0 ],
- "orderKeys" : [ ],
"type" : "ROW<`name` VARCHAR(2147483647), `score` INT> NOT NULL"
}, {
"kind" : "CALL",
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-partitioned-named-timers-restore/plan/process-partitioned-named-timers-restore.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-partitioned-named-timers-restore/plan/process-partitioned-named-timers-restore.json
index 3d2db4fe4ee..ae8b2d16604 100644
---
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-partitioned-named-timers-restore/plan/process-partitioned-named-timers-restore.json
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-partitioned-named-timers-restore/plan/process-partitioned-named-timers-restore.json
@@ -159,7 +159,6 @@
"kind" : "TABLE_ARG_CALL",
"inputIndex" : 0,
"partitionKeys" : [ 0 ],
- "orderKeys" : [ ],
"type" : {
"type" : "ROW",
"nullable" : false,
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-row-semantic-table-restore/plan/process-row-semantic-table-restore.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-row-semantic-table-restore/plan/process-row-semantic-table-restore.json
index 781b49d4eff..757655dc968 100644
---
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-row-semantic-table-restore/plan/process-row-semantic-table-restore.json
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-row-semantic-table-restore/plan/process-row-semantic-table-restore.json
@@ -40,8 +40,6 @@
"operands" : [ {
"kind" : "TABLE_ARG_CALL",
"inputIndex" : 0,
- "partitionKeys" : [ ],
- "orderKeys" : [ ],
"type" : "ROW<`name` VARCHAR(2147483647), `score` INT> NOT NULL"
}, {
"kind" : "LITERAL",
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-updating-output-upsert-restore/plan/process-updating-output-upsert-restore.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-updating-output-upsert-restore/plan/process-updating-output-upsert-restore.json
index f0fd3007f04..b2743e6c069 100644
---
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-updating-output-upsert-restore/plan/process-updating-output-upsert-restore.json
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-updating-output-upsert-restore/plan/process-updating-output-upsert-restore.json
@@ -59,7 +59,6 @@
"kind" : "TABLE_ARG_CALL",
"inputIndex" : 0,
"partitionKeys" : [ 0 ],
- "orderKeys" : [ ],
"type" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `EXPR$1` BIGINT>
NOT NULL"
}, {
"kind" : "CALL",
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json
index 324327cad4b..f64d0eb37b3 100644
---
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json
@@ -45,9 +45,6 @@
"operands" : [ {
"kind" : "TABLE_ARG_CALL",
"inputIndex" : 0,
- "partitionKeys" : [ ],
- "orderKeys" : [ ],
- "orderDirections" : [ ],
"type" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT> NOT
NULL"
}, {
"kind" : "CALL",