This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f8d2bdb19eb [FLINK-26756][table-planner] Fix the deserialization error
for match recognize
f8d2bdb19eb is described below
commit f8d2bdb19eb954ef384a78b5e21991a4327c23db
Author: godfreyhe <[email protected]>
AuthorDate: Mon Mar 21 15:40:26 2022 +0800
[FLINK-26756][table-planner] Fix the deserialization error for match
recognize
This closes #19179
---
.../nodes/exec/serde/RexNodeJsonDeserializer.java | 46 ++++++++++++++++++++++
.../nodes/exec/serde/RexNodeJsonSerializer.java | 14 +++++--
.../table/planner/utils/JsonPlanTestBase.java | 6 ++-
.../testMatch.out | 4 +-
4 files changed, 63 insertions(+), 7 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
index 8e52495ea3f..cbb7df05680 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
@@ -46,8 +46,10 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.validate.SqlNameMatchers;
@@ -91,6 +93,7 @@ import static
org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSe
import static
org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_OPERANDS;
import static
org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_RANGES;
import static
org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_SARG;
+import static
org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_SQL_KIND;
import static
org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_SYMBOL;
import static
org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_SYNTAX;
import static
org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_SYSTEM_NAME;
@@ -340,6 +343,9 @@ final class RexNodeJsonDeserializer extends
StdDeserializer<RexNode> {
} else if (jsonNode.has(FIELD_NAME_SYSTEM_NAME)) {
return deserializeSystemFunction(
jsonNode.required(FIELD_NAME_SYSTEM_NAME).asText(),
syntax, serdeContext);
+ } else if (jsonNode.has(FIELD_NAME_SQL_KIND)) {
+ return deserializeInternalFunction(
+ syntax,
SqlKind.valueOf(jsonNode.get(FIELD_NAME_SQL_KIND).asText()));
} else {
throw new TableException("Invalid function call.");
}
@@ -375,6 +381,13 @@ final class RexNodeJsonDeserializer extends
StdDeserializer<RexNode> {
if (latestOperator.isPresent()) {
return latestOperator.get();
}
+
+ Optional<SqlOperator> sqlStdOperator =
+ lookupOptionalSqlStdOperator(publicName, syntax, null);
+ if (sqlStdOperator.isPresent()) {
+ return sqlStdOperator.get();
+ }
+
throw new TableException(
String.format(
"Could not resolve internal system function '%s'. "
@@ -382,6 +395,19 @@ final class RexNodeJsonDeserializer extends
StdDeserializer<RexNode> {
internalName));
}
+ private static SqlOperator deserializeInternalFunction(SqlSyntax syntax,
SqlKind sqlKind) {
+ final Optional<SqlOperator> stdOperator =
lookupOptionalSqlStdOperator("", syntax, sqlKind);
+ if (stdOperator.isPresent()) {
+ return stdOperator.get();
+ }
+
+ throw new TableException(
+ String.format(
+ "Could not resolve internal system function '%s'. "
+ + "This is a bug, please file an issue.",
+ sqlKind.name()));
+ }
+
private static SqlOperator deserializeFunctionClass(
JsonNode jsonNode, SerdeContext serdeContext) {
final String className = jsonNode.required(FIELD_NAME_CLASS).asText();
@@ -506,6 +532,26 @@ final class RexNodeJsonDeserializer extends
StdDeserializer<RexNode> {
}
}
+ private static Optional<SqlOperator> lookupOptionalSqlStdOperator(
+ String operatorName, SqlSyntax syntax, @Nullable SqlKind sqlKind) {
+ List<SqlOperator> foundOperators = new ArrayList<>();
+ // try to find operator from std operator table.
+ SqlStdOperatorTable.instance()
+ .lookupOperatorOverloads(
+ new SqlIdentifier(operatorName, new SqlParserPos(0,
0)),
+ null, // category
+ syntax,
+ foundOperators,
+ SqlNameMatchers.liberal());
+ if (foundOperators.size() == 1) {
+ return Optional.of(foundOperators.get(0));
+ }
+ // in case different operator has the same kind, check with both name
and kind.
+ return foundOperators.stream()
+ .filter(o -> sqlKind != null && o.getKind() == sqlKind)
+ .findFirst();
+ }
+
private static TableException missingSystemFunction(String systemName) {
return new TableException(
String.format(
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
index 7e38317e729..5e788723cd3 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
@@ -116,6 +116,7 @@ final class RexNodeJsonSerializer extends
StdSerializer<RexNode> {
static final String FIELD_NAME_SYSTEM_NAME = "systemName";
static final String FIELD_NAME_CATALOG_NAME = "catalogName";
static final String FIELD_NAME_SYNTAX = "syntax";
+ static final String FIELD_NAME_SQL_KIND = "sqlKind";
static final String FIELD_NAME_CLASS = "class";
RexNodeJsonSerializer() {
@@ -393,10 +394,15 @@ final class RexNodeJsonSerializer extends
StdSerializer<RexNode> {
|| operator instanceof AggSqlFunction) {
throw legacyException(operator.toString());
} else {
- // We assume that all regular SqlOperators are internal. Only the
function definitions
- // stack is exposed to the user and can thus be external.
- gen.writeStringField(
- FIELD_NAME_INTERNAL_NAME,
BuiltInSqlOperator.toQualifiedName(operator));
+ if (operator.getName().isEmpty()) {
+ gen.writeStringField(FIELD_NAME_SQL_KIND,
operator.getKind().name());
+ } else {
+ // We assume that all regular SqlOperators are internal. Only
the function
+ // definitions
+ // stack is exposed to the user and can thus be external.
+ gen.writeStringField(
+ FIELD_NAME_INTERNAL_NAME,
BuiltInSqlOperator.toQualifiedName(operator));
+ }
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java
index 5a34ae5eff6..c3bf34a7a06 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.internal.CompiledPlanUtils;
@@ -72,7 +73,10 @@ public abstract class JsonPlanTestBase extends
AbstractTestBase {
protected TableResult compileSqlAndExecutePlan(String sql) {
CompiledPlan compiledPlan = tableEnv.compilePlanSql(sql);
checkTransformationUids(compiledPlan);
- return compiledPlan.execute();
+ // try to execute the string json plan to validate to ser/de result
+ String jsonPlan = compiledPlan.asJsonString();
+ CompiledPlan newCompiledPlan =
tableEnv.loadPlan(PlanReference.fromJsonString(jsonPlan));
+ return newCompiledPlan.execute();
}
protected void checkTransformationUids(CompiledPlan compiledPlan) {
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testMatch.out
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testMatch.out
index c7bbd666aba..62fc80c1cbc 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testMatch.out
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testMatch.out
@@ -130,11 +130,11 @@
"pattern" : {
"kind" : "CALL",
"syntax" : "BINARY",
- "internalName" : "$$1",
+ "sqlKind" : "PATTERN_CONCAT",
"operands" : [ {
"kind" : "CALL",
"syntax" : "BINARY",
- "internalName" : "$$1",
+ "sqlKind" : "PATTERN_CONCAT",
"operands" : [ {
"kind" : "LITERAL",
"value" : "A\"",