This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4afd0924582 [FLINK-33158] Cryptic exception when there is a 
StreamExecSort in JsonPlan
4afd0924582 is described below

commit 4afd09245823a1cf2d849dbd84c1b6d5ab58c875
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Tue Sep 26 12:38:07 2023 +0200

    [FLINK-33158] Cryptic exception when there is a StreamExecSort in JsonPlan
---
 .../plan/nodes/exec/stream/StreamExecSort.java     |  31 +++-
 .../planner/plan/utils/ExecNodeMetadataUtil.java   |   2 +-
 .../plan/nodes/exec/stream/SortJsonPlanTest.java   |  66 ++++++++
 .../stream/SortJsonPlanTest_jsonplan/testSort.out  | 172 +++++++++++++++++++++
 4 files changed, 269 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSort.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSort.java
index 81458f0cae4..fb9f38ed967 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSort.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSort.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.TableException;
@@ -28,6 +29,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
@@ -37,7 +39,11 @@ import 
org.apache.flink.table.runtime.operators.sort.StreamSortOperator;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import java.util.Collections;
+import java.util.List;
 
 /**
  * {@link StreamExecNode} for Sort.
@@ -45,10 +51,19 @@ import java.util.Collections;
  * <p><b>NOTES:</b> This class is used for testing with bounded source now. If 
a query is converted
  * to this node in product environment, an exception will be thrown.
  */
+@ExecNodeMetadata(
+        name = "stream-exec-sort",
+        version = 1,
+        producedTransformations = {StreamExecSort.SORT_TRANSFORMATION},
+        minPlanVersion = FlinkVersion.v1_18,
+        minStateVersion = FlinkVersion.v1_18)
 public class StreamExecSort extends ExecNodeBase<RowData> implements 
StreamExecNode<RowData> {
 
-    private static final String SORT_TRANSFORMATION = "sort";
+    public static final String SORT_TRANSFORMATION = "sort";
+
+    public static final String FIELD_NAME_SORT_SPEC = "orderBy";
 
+    @JsonProperty(FIELD_NAME_SORT_SPEC)
     private final SortSpec sortSpec;
 
     public StreamExecSort(
@@ -67,6 +82,20 @@ public class StreamExecSort extends ExecNodeBase<RowData> 
implements StreamExecN
         this.sortSpec = sortSpec;
     }
 
+    @JsonCreator
+    public StreamExecSort(
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig 
persistedConfig,
+            @JsonProperty(FIELD_NAME_SORT_SPEC) SortSpec sortSpec,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> 
inputProperties,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+
+        super(id, context, persistedConfig, inputProperties, outputType, 
description);
+        this.sortSpec = sortSpec;
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     protected Transformation<RowData> translateToPlanInternal(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index 174c169d41a..5c467f29583 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -126,6 +126,7 @@ public final class ExecNodeMetadataUtil {
                     add(StreamExecRank.class);
                     add(StreamExecSink.class);
                     add(StreamExecSortLimit.class);
+                    add(StreamExecSort.class);
                     add(StreamExecTableSourceScan.class);
                     add(StreamExecTemporalJoin.class);
                     add(StreamExecTemporalSort.class);
@@ -163,7 +164,6 @@ public final class ExecNodeMetadataUtil {
                     add(StreamExecLegacySink.class);
                     add(StreamExecGroupTableAggregate.class);
                     add(StreamExecPythonGroupTableAggregate.class);
-                    add(StreamExecSort.class);
                     add(StreamExecMultipleInput.class);
                 }
             };
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortJsonPlanTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortJsonPlanTest.java
new file mode 100644
index 00000000000..a23264cc2b7
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortJsonPlanTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/** Test json serialization for sort limit. */
+public class SortJsonPlanTest extends TableTestBase {
+
+    private StreamTableTestUtil util;
+    private TableEnvironment tEnv;
+
+    @Before
+    public void setup() {
+        util = streamTestUtil(TableConfig.getDefault());
+        tEnv = util.getTableEnv();
+
+        String srcTableDdl =
+                "CREATE TABLE MyTable (\n"
+                        + "  a bigint,\n"
+                        + "  b int not null,\n"
+                        + "  c varchar,\n"
+                        + "  d timestamp(3)\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'bounded' = 'false')";
+        tEnv.executeSql(srcTableDdl);
+    }
+
+    @Test
+    public void testSort() {
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + "  a bigint,\n"
+                        + "  b bigint\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'sink-insert-only' = 'false',\n"
+                        + "  'table-sink-class' = 'DEFAULT')";
+        tEnv.executeSql(sinkTableDdl);
+        String sql = "insert into MySink SELECT a, a from MyTable order by b";
+        util.verifyJsonPlan(sql);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/SortJsonPlanTest_jsonplan/testSort.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/SortJsonPlanTest_jsonplan/testSort.out
new file mode 100644
index 00000000000..a0d7d070ec1
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/SortJsonPlanTest_jsonplan/testSort.out
@@ -0,0 +1,172 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "d",
+              "dataType" : "TIMESTAMP(3)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values",
+            "bounded" : "false"
+          }
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 1 ] ],
+        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+    "description" : "Exchange(distribution=[single])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-sort_1",
+    "orderBy" : {
+      "fields" : [ {
+        "index" : 1,
+        "isAscending" : true,
+        "nullIsLast" : false
+      } ]
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+    "description" : "Sort(orderBy=[b ASC])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `a1` BIGINT>",
+    "description" : "Calc(select=[a, a AS a1])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "sink-insert-only" : "false",
+            "table-sink-class" : "DEFAULT",
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `a1` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, a1])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file

Reply via email to