This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f8d2bdb19eb [FLINK-26756][table-planner] Fix the deserialization error for match recognize
f8d2bdb19eb is described below

commit f8d2bdb19eb954ef384a78b5e21991a4327c23db
Author: godfreyhe <[email protected]>
AuthorDate: Mon Mar 21 15:40:26 2022 +0800

    [FLINK-26756][table-planner] Fix the deserialization error for match recognize
    
    This closes #19179
---
 .../nodes/exec/serde/RexNodeJsonDeserializer.java  | 46 ++++++++++++++++++++++
 .../nodes/exec/serde/RexNodeJsonSerializer.java    | 14 +++++--
 .../table/planner/utils/JsonPlanTestBase.java      |  6 ++-
 .../testMatch.out                                  |  4 +-
 4 files changed, 63 insertions(+), 7 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
index 8e52495ea3f..cbb7df05680 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
@@ -46,8 +46,10 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.validate.SqlNameMatchers;
@@ -91,6 +93,7 @@ import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSe
 import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_OPERANDS;
 import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_RANGES;
 import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_SARG;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_SQL_KIND;
 import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_SYMBOL;
 import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_SYNTAX;
 import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_SYSTEM_NAME;
@@ -340,6 +343,9 @@ final class RexNodeJsonDeserializer extends 
StdDeserializer<RexNode> {
         } else if (jsonNode.has(FIELD_NAME_SYSTEM_NAME)) {
             return deserializeSystemFunction(
                     jsonNode.required(FIELD_NAME_SYSTEM_NAME).asText(), 
syntax, serdeContext);
+        } else if (jsonNode.has(FIELD_NAME_SQL_KIND)) {
+            return deserializeInternalFunction(
+                    syntax, SqlKind.valueOf(jsonNode.get(FIELD_NAME_SQL_KIND).asText()));
         } else {
             throw new TableException("Invalid function call.");
         }
@@ -375,6 +381,13 @@ final class RexNodeJsonDeserializer extends 
StdDeserializer<RexNode> {
         if (latestOperator.isPresent()) {
             return latestOperator.get();
         }
+
+        Optional<SqlOperator> sqlStdOperator =
+                lookupOptionalSqlStdOperator(publicName, syntax, null);
+        if (sqlStdOperator.isPresent()) {
+            return sqlStdOperator.get();
+        }
+
         throw new TableException(
                 String.format(
                         "Could not resolve internal system function '%s'. "
@@ -382,6 +395,19 @@ final class RexNodeJsonDeserializer extends 
StdDeserializer<RexNode> {
                         internalName));
     }
 
+    private static SqlOperator deserializeInternalFunction(SqlSyntax syntax, SqlKind sqlKind) {
+        final Optional<SqlOperator> stdOperator = lookupOptionalSqlStdOperator("", syntax, sqlKind);
+        if (stdOperator.isPresent()) {
+            return stdOperator.get();
+        }
+
+        throw new TableException(
+                String.format(
+                        "Could not resolve internal system function '%s'. "
+                                + "This is a bug, please file an issue.",
+                        sqlKind.name()));
+    }
+
     private static SqlOperator deserializeFunctionClass(
             JsonNode jsonNode, SerdeContext serdeContext) {
         final String className = jsonNode.required(FIELD_NAME_CLASS).asText();
@@ -506,6 +532,26 @@ final class RexNodeJsonDeserializer extends 
StdDeserializer<RexNode> {
         }
     }
 
+    private static Optional<SqlOperator> lookupOptionalSqlStdOperator(
+            String operatorName, SqlSyntax syntax, @Nullable SqlKind sqlKind) {
+        List<SqlOperator> foundOperators = new ArrayList<>();
+        // try to find operator from std operator table.
+        SqlStdOperatorTable.instance()
+                .lookupOperatorOverloads(
+                        new SqlIdentifier(operatorName, new SqlParserPos(0, 0)),
+                        null, // category
+                        syntax,
+                        foundOperators,
+                        SqlNameMatchers.liberal());
+        if (foundOperators.size() == 1) {
+            return Optional.of(foundOperators.get(0));
+        }
+        // in case different operator has the same kind, check with both name and kind.
+        return foundOperators.stream()
+                .filter(o -> sqlKind != null && o.getKind() == sqlKind)
+                .findFirst();
+    }
+
     private static TableException missingSystemFunction(String systemName) {
         return new TableException(
                 String.format(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
index 7e38317e729..5e788723cd3 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
@@ -116,6 +116,7 @@ final class RexNodeJsonSerializer extends 
StdSerializer<RexNode> {
     static final String FIELD_NAME_SYSTEM_NAME = "systemName";
     static final String FIELD_NAME_CATALOG_NAME = "catalogName";
     static final String FIELD_NAME_SYNTAX = "syntax";
+    static final String FIELD_NAME_SQL_KIND = "sqlKind";
     static final String FIELD_NAME_CLASS = "class";
 
     RexNodeJsonSerializer() {
@@ -393,10 +394,15 @@ final class RexNodeJsonSerializer extends 
StdSerializer<RexNode> {
                 || operator instanceof AggSqlFunction) {
             throw legacyException(operator.toString());
         } else {
-            // We assume that all regular SqlOperators are internal. Only the 
function definitions
-            // stack is exposed to the user and can thus be external.
-            gen.writeStringField(
-                    FIELD_NAME_INTERNAL_NAME, 
BuiltInSqlOperator.toQualifiedName(operator));
+            if (operator.getName().isEmpty()) {
+                gen.writeStringField(FIELD_NAME_SQL_KIND, operator.getKind().name());
+            } else {
+                // We assume that all regular SqlOperators are internal. Only the function
+                // definitions
+                // stack is exposed to the user and can thus be external.
+                gen.writeStringField(
+                        FIELD_NAME_INTERNAL_NAME, BuiltInSqlOperator.toQualifiedName(operator));
+            }
         }
     }
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java
index 5a34ae5eff6..c3bf34a7a06 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.transformations.UnionTransformation;
 import org.apache.flink.table.api.CompiledPlan;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.PlanReference;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.internal.CompiledPlanUtils;
@@ -72,7 +73,10 @@ public abstract class JsonPlanTestBase extends 
AbstractTestBase {
     protected TableResult compileSqlAndExecutePlan(String sql) {
         CompiledPlan compiledPlan = tableEnv.compilePlanSql(sql);
         checkTransformationUids(compiledPlan);
-        return compiledPlan.execute();
+        // try to execute the string json plan to validate to ser/de result
+        String jsonPlan = compiledPlan.asJsonString();
+        CompiledPlan newCompiledPlan = tableEnv.loadPlan(PlanReference.fromJsonString(jsonPlan));
+        return newCompiledPlan.execute();
     }
 
     protected void checkTransformationUids(CompiledPlan compiledPlan) {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testMatch.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testMatch.out
index c7bbd666aba..62fc80c1cbc 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testMatch.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testMatch.out
@@ -130,11 +130,11 @@
       "pattern" : {
         "kind" : "CALL",
         "syntax" : "BINARY",
-        "internalName" : "$$1",
+        "sqlKind" : "PATTERN_CONCAT",
         "operands" : [ {
           "kind" : "CALL",
           "syntax" : "BINARY",
-          "internalName" : "$$1",
+          "sqlKind" : "PATTERN_CONCAT",
           "operands" : [ {
             "kind" : "LITERAL",
             "value" : "A\"",

Reply via email to