slinkydeveloper commented on a change in pull request #18858:
URL: https://github.com/apache/flink/pull/18858#discussion_r811879731
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
##########
@@ -42,7 +44,10 @@ import scala.collection.JavaConverters._
* @param displayName name to be displayed in operator name
* @param scalarFunction scalar function to be called
* @param typeFactory type factory for converting Flink's between
Calcite's types
+ * @deprecated Use [[BuiltInFunctionDefinitions]] that translates to
[[BridgingSqlFunction]].
*/
+@Deprecated
+@deprecated
Review comment:
2 deprecated annotations?
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java
##########
@@ -56,44 +42,42 @@
import org.apache.calcite.util.Sarg;
import org.apache.calcite.util.TimeString;
import org.apache.calcite.util.TimestampString;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import java.io.IOException;
import java.math.BigDecimal;
import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-import java.util.Random;
+import java.util.stream.Stream;
-import static org.junit.Assert.assertEquals;
+import static
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.configuredSerdeContext;
+import static
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.toJson;
+import static
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.toObject;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for serialization/deserialization of {@link RexNode}. */
-@RunWith(Parameterized.class)
public class RexNodeSerdeTest {
Review comment:
Have you tried to use `@Execution(CONCURRENT)`?
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
##########
@@ -177,8 +177,9 @@ public String toString() {
// Shared with BuiltInSqlOperator and BuiltInSqlFunction in planner
//
--------------------------------------------------------------------------------------------
+ // note that for SQL operators the name can contain spaces and dollar signs
private static final Pattern INTERNAL_NAME_PATTERN =
- Pattern.compile("\\$[A-Z0-9_]+\\$[1-9][0-9]*");
+ Pattern.compile("\\$[A-Z0-9_ $]+\\$[1-9][0-9]*");
Review comment:
Rather than a literal space, use `\s`.
##########
File path:
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/CatalogManagerMocks.java
##########
@@ -33,7 +35,15 @@
public static final String DEFAULT_DATABASE =
EnvironmentSettings.DEFAULT_BUILTIN_DATABASE;
public static CatalogManager createEmptyCatalogManager() {
- final CatalogManager catalogManager = preparedCatalogManager().build();
+ return createEmptyCatalogManager(null);
+ }
+
+ public static CatalogManager createEmptyCatalogManager(@Nullable Catalog
catalog) {
Review comment:
If it has a catalog, then it's not empty anymore? Maybe rename it to
`createCatalogManager`?
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/BuiltInSqlOperator.java
##########
@@ -74,4 +74,16 @@ static String toQualifiedName(SqlOperator operator) {
}
return qualifyFunctionName(operator.getName(), DEFAULT_VERSION);
}
+
+ static String extractNameFromQualifiedName(String qualifiedName) {
+ // supports all various kinds of qualified names
+ // $FUNC$1 => FUNC
+ // $IS NULL$1 => IS NULL
+ // $$CALCITE_INTERNAL$1 => $CALCITE_INTERNAL
+ int versionPos = qualifiedName.length() - 1;
+ while (Character.isDigit(qualifiedName.charAt(versionPos))) {
+ versionPos--;
+ }
+ return qualifiedName.substring(1, versionPos);
+ }
Review comment:
I'm not much of a fan of this parsing logic, as it breaks on various
string corner cases, e.g. UTF-16 strings (think of a function name written
in Chinese). Just looking at this commit, I don't understand where this
function is used. Is it used only for built-ins, or can some UDFs go through
it as well? If the latter, I suggest replacing this logic with code-point
handling; otherwise, ignore this comment.
In any case, I think it would be nicer to read if the loop matched on the `'$'`
character rather than checking for digits.
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
##########
@@ -220,185 +260,298 @@ private Object toLiteralValue(
return ByteString.ofBase64(valueNode.asText());
case CHAR:
case VARCHAR:
- return SerdeContext.get(ctx)
- .getRexBuilder()
- .makeLiteral(valueNode.asText())
- .getValue();
+ return
serdeContext.getRexBuilder().makeLiteral(valueNode.asText()).getValue();
case SYMBOL:
- JsonNode classNode = literalNode.get(FIELD_NAME_CLASS);
- return serializableToCalcite(
- SerializableSymbol.of(classNode.asText(),
valueNode.asText()));
- case ROW:
- case MULTISET:
- ArrayNode valuesNode = (ArrayNode) valueNode;
- List<RexNode> list = new ArrayList<>();
- for (int i = 0; i < valuesNode.size(); ++i) {
- list.add(deserialize(valuesNode.get(i), codec, ctx));
- }
- return list;
+ final JsonNode symbolNode =
literalNode.required(FIELD_NAME_SYMBOL);
+ final SerializableSymbol symbol =
+ SerializableSymbol.of(symbolNode.asText(),
valueNode.asText());
+ return serializableToCalcite(symbol);
default:
throw new TableException("Unknown literal: " + valueNode);
}
}
- @SuppressWarnings({"rawtypes", "unchecked", "UnstableApiUsage"})
- private Sarg<?> toSarg(
- JsonNode jsonNode,
- SqlTypeName sqlTypeName,
- ObjectCodec codec,
- DeserializationContext ctx)
+ private static RexNode deserializeFieldAccess(JsonNode jsonNode,
SerdeContext serdeContext)
throws IOException {
- ArrayNode rangesNode = (ArrayNode) jsonNode.get(FIELD_NAME_RANGES);
- com.google.common.collect.ImmutableRangeSet.Builder builder =
- com.google.common.collect.ImmutableRangeSet.builder();
- for (JsonNode rangeNode : rangesNode) {
- com.google.common.collect.Range<?> range =
com.google.common.collect.Range.all();
- if (rangeNode.has(FIELD_NAME_BOUND_LOWER)) {
- JsonNode lowerNode = rangeNode.get(FIELD_NAME_BOUND_LOWER);
- Comparable<?> boundValue =
- checkNotNull(
- (Comparable<?>) toLiteralValue(lowerNode,
sqlTypeName, codec, ctx));
- com.google.common.collect.BoundType boundType =
- com.google.common.collect.BoundType.valueOf(
-
lowerNode.get(FIELD_NAME_BOUND_TYPE).asText().toUpperCase());
- com.google.common.collect.Range r =
- boundType == com.google.common.collect.BoundType.OPEN
- ?
com.google.common.collect.Range.greaterThan(boundValue)
- :
com.google.common.collect.Range.atLeast(boundValue);
- range = range.intersection(r);
- }
- if (rangeNode.has(FIELD_NAME_BOUND_UPPER)) {
- JsonNode upperNode = rangeNode.get(FIELD_NAME_BOUND_UPPER);
- Comparable<?> boundValue =
- checkNotNull(
- (Comparable<?>) toLiteralValue(upperNode,
sqlTypeName, codec, ctx));
- com.google.common.collect.BoundType boundType =
- com.google.common.collect.BoundType.valueOf(
-
upperNode.get(FIELD_NAME_BOUND_TYPE).asText().toUpperCase());
- com.google.common.collect.Range r =
- boundType == com.google.common.collect.BoundType.OPEN
- ?
com.google.common.collect.Range.lessThan(boundValue)
- :
com.google.common.collect.Range.atMost(boundValue);
- range = range.intersection(r);
- }
- if (range.hasUpperBound() || range.hasLowerBound()) {
- builder.add(range);
- }
- }
- boolean containsNull =
jsonNode.get(FIELD_NAME_CONTAINS_NULL).booleanValue();
- return Sarg.of(containsNull, builder.build());
+ final String fieldName = jsonNode.required(FIELD_NAME_NAME).asText();
+ final JsonNode exprNode = jsonNode.required(FIELD_NAME_EXPR);
+ final RexNode refExpr = deserialize(exprNode, serdeContext);
+ return serdeContext.getRexBuilder().makeFieldAccess(refExpr,
fieldName, true);
}
- private RexNode deserializeFieldAccess(
- JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx)
throws IOException {
- String fieldName = jsonNode.get(FIELD_NAME_NAME).asText();
- JsonNode exprNode = jsonNode.get(FIELD_NAME_EXPR);
- RexNode refExpr = deserialize(exprNode, codec, ctx);
- return SerdeContext.get(ctx).getRexBuilder().makeFieldAccess(refExpr,
fieldName, true);
+ private static RexNode deserializeCorrelVariable(JsonNode jsonNode,
SerdeContext serdeContext) {
+ final String correl = jsonNode.required(FIELD_NAME_CORREL).asText();
+ final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE);
+ final RelDataType fieldType =
+ RelDataTypeJsonDeserializer.deserialize(logicalTypeNode,
serdeContext);
+ return serdeContext.getRexBuilder().makeCorrel(fieldType, new
CorrelationId(correl));
}
- private RexNode deserializeCorrelVariable(
- JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx)
throws IOException {
- String correl = jsonNode.get(FIELD_NAME_CORREL).asText();
- JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
- RelDataType fieldType = ctx.readValue(typeNode.traverse(codec),
RelDataType.class);
- return SerdeContext.get(ctx)
- .getRexBuilder()
- .makeCorrel(fieldType, new CorrelationId(correl));
+ private static RexNode deserializePatternFieldRef(
+ JsonNode jsonNode, SerdeContext serdeContext) {
+ final int inputIndex =
jsonNode.required(FIELD_NAME_INPUT_INDEX).intValue();
+ final String alpha = jsonNode.required(FIELD_NAME_ALPHA).asText();
+ final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE);
+ final RelDataType fieldType =
+ RelDataTypeJsonDeserializer.deserialize(logicalTypeNode,
serdeContext);
+ return serdeContext.getRexBuilder().makePatternFieldRef(alpha,
fieldType, inputIndex);
}
- private RexNode deserializeCall(
- JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx)
throws IOException {
- RexBuilder rexBuilder = SerdeContext.get(ctx).getRexBuilder();
- SqlOperator operator = toOperator(jsonNode.get(FIELD_NAME_OPERATOR),
SerdeContext.get(ctx));
- ArrayNode operandNodes = (ArrayNode) jsonNode.get(FIELD_NAME_OPERANDS);
- List<RexNode> rexOperands = new ArrayList<>();
+ private static RexNode deserializeCall(JsonNode jsonNode, SerdeContext
serdeContext)
+ throws IOException {
+ final SqlOperator operator = deserializeSqlOperator(jsonNode,
serdeContext);
+ final ArrayNode operandNodes = (ArrayNode)
jsonNode.get(FIELD_NAME_OPERANDS);
+ final List<RexNode> rexOperands = new ArrayList<>();
for (JsonNode node : operandNodes) {
- rexOperands.add(deserialize(node, codec, ctx));
+ rexOperands.add(deserialize(node, serdeContext));
}
final RelDataType callType;
if (jsonNode.has(FIELD_NAME_TYPE)) {
- JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
- callType = ctx.readValue(typeNode.traverse(codec),
RelDataType.class);
+ final JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
+ callType = RelDataTypeJsonDeserializer.deserialize(typeNode,
serdeContext);
} else {
- callType = rexBuilder.deriveReturnType(operator, rexOperands);
+ callType = serdeContext.getRexBuilder().deriveReturnType(operator,
rexOperands);
}
- return rexBuilder.makeCall(callType, operator, rexOperands);
+ return serdeContext.getRexBuilder().makeCall(callType, operator,
rexOperands);
}
- private SqlOperator toOperator(JsonNode jsonNode, SerdeContext ctx) throws
IOException {
- String name = jsonNode.get(FIELD_NAME_NAME).asText();
- SqlKind sqlKind =
SqlKind.valueOf(jsonNode.get(FIELD_NAME_KIND).asText());
- SqlSyntax sqlSyntax =
SqlSyntax.valueOf(jsonNode.get(FIELD_NAME_SYNTAX).asText());
- List<SqlOperator> operators = new ArrayList<>();
- ctx.getOperatorTable()
- .lookupOperatorOverloads(
- new SqlIdentifier(name, new SqlParserPos(0, 0)),
- null, // category
- sqlSyntax,
- operators,
- SqlNameMatchers.liberal());
- for (SqlOperator operator : operators) {
- // in case different operator has the same kind, check with both
name and kind.
- if (operator.kind == sqlKind) {
- return operator;
- }
+ private static SqlOperator deserializeSqlOperator(
+ JsonNode jsonNode, SerdeContext serdeContext) {
+ final SqlSyntax syntax;
+ if (jsonNode.has(FIELD_NAME_SYNTAX)) {
+ syntax =
+ serializableToCalcite(
+ SqlSyntax.class,
jsonNode.required(FIELD_NAME_SYNTAX).asText());
+ } else {
+ syntax = SqlSyntax.FUNCTION;
}
- // try to find operator from std operator table.
- SqlStdOperatorTable.instance()
- .lookupOperatorOverloads(
- new SqlIdentifier(name, new SqlParserPos(0, 0)),
- null, // category
- sqlSyntax,
- operators,
- SqlNameMatchers.liberal());
- for (SqlOperator operator : operators) {
- // in case different operator has the same kind, check with both
name and kind.
- if (operator.kind == sqlKind) {
- return operator;
- }
+ if (jsonNode.has(FIELD_NAME_INTERNAL_NAME)) {
+ return deserializeInternalFunction(
+ jsonNode.required(FIELD_NAME_INTERNAL_NAME).asText(),
syntax, serdeContext);
+ } else if (jsonNode.has(FIELD_NAME_CATALOG_NAME)) {
+ return deserializeCatalogFunction(jsonNode, syntax, serdeContext);
+ } else if (jsonNode.has(FIELD_NAME_CLASS)) {
+ return deserializeFunctionClass(jsonNode, serdeContext);
+ } else if (jsonNode.has(FIELD_NAME_SYSTEM_NAME)) {
+ return deserializeSystemFunction(
+ jsonNode.required(FIELD_NAME_SYSTEM_NAME).asText(),
syntax, serdeContext);
+ } else {
+ throw new TableException("Invalid function call.");
}
+ }
- // built-in function
- // TODO supports other module's built-in function
- if (jsonNode.has(FIELD_NAME_BUILT_IN) &&
jsonNode.get(FIELD_NAME_BUILT_IN).booleanValue()) {
- Optional<FunctionDefinition> function =
CoreModule.INSTANCE.getFunctionDefinition(name);
- Preconditions.checkArgument(function.isPresent());
- return BridgingSqlFunction.of(
- ctx.getFlinkContext(),
- ctx.getTypeFactory(),
-
ContextResolvedFunction.permanent(FunctionIdentifier.of(name), function.get()));
+ private static SqlOperator deserializeSystemFunction(
+ String systemName, SqlSyntax syntax, SerdeContext serdeContext) {
+ // This method covers both temporary system functions and permanent
system
+ // functions from a module
+ final Optional<SqlOperator> systemOperator =
+ lookupOptionalSqlOperator(
+ FunctionIdentifier.of(systemName), syntax,
serdeContext, true);
+ if (systemOperator.isPresent()) {
+ return systemOperator.get();
}
+ throw new TableException(
Review comment:
Extract important exceptions into separate methods
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java
##########
@@ -92,121 +519,129 @@ public void testRexNodeSerde(RexNode rexNode) throws
IOException {
Arrays.asList("f1", "f2", "f3"));
return Stream.of(
-
rexBuilder.makeNullLiteral(FACTORY.createSqlType(SqlTypeName.VARCHAR)),
- rexBuilder.makeLiteral(true),
- rexBuilder.makeExactLiteral(
- new BigDecimal(Byte.MAX_VALUE),
FACTORY.createSqlType(SqlTypeName.TINYINT)),
- rexBuilder.makeExactLiteral(
- new BigDecimal(Short.MAX_VALUE),
- FACTORY.createSqlType(SqlTypeName.SMALLINT)),
- rexBuilder.makeExactLiteral(
- new BigDecimal(Integer.MAX_VALUE),
- FACTORY.createSqlType(SqlTypeName.INTEGER)),
- rexBuilder.makeExactLiteral(
- new BigDecimal(Long.MAX_VALUE),
FACTORY.createSqlType(SqlTypeName.BIGINT)),
- rexBuilder.makeExactLiteral(
- BigDecimal.valueOf(Double.MAX_VALUE),
- FACTORY.createSqlType(SqlTypeName.DOUBLE)),
- rexBuilder.makeApproxLiteral(
- BigDecimal.valueOf(Float.MAX_VALUE),
- FACTORY.createSqlType(SqlTypeName.FLOAT)),
- rexBuilder.makeExactLiteral(new
BigDecimal("23.1234567890123456789012345678")),
- rexBuilder.makeIntervalLiteral(
- BigDecimal.valueOf(100),
- new SqlIntervalQualifier(
- TimeUnit.YEAR,
- 4,
- TimeUnit.YEAR,
- RelDataType.PRECISION_NOT_SPECIFIED,
- SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
- BigDecimal.valueOf(3),
- new SqlIntervalQualifier(
- TimeUnit.YEAR,
- 2,
- TimeUnit.MONTH,
- RelDataType.PRECISION_NOT_SPECIFIED,
- SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
- BigDecimal.valueOf(3),
- new SqlIntervalQualifier(
- TimeUnit.DAY, 2, TimeUnit.SECOND, 6,
SqlParserPos.ZERO)),
- rexBuilder.makeIntervalLiteral(
- BigDecimal.valueOf(3),
- new SqlIntervalQualifier(
- TimeUnit.SECOND, 2, TimeUnit.SECOND, 6,
SqlParserPos.ZERO)),
- rexBuilder.makeDateLiteral(DateString.fromDaysSinceEpoch(10)),
- rexBuilder.makeDateLiteral(new DateString("2000-12-12")),
- rexBuilder.makeTimeLiteral(TimeString.fromMillisOfDay(1234),
3),
- rexBuilder.makeTimeLiteral(TimeString.fromMillisOfDay(123456),
6),
- rexBuilder.makeTimeLiteral(new
TimeString("01:01:01.000000001"), 9),
-
rexBuilder.makeTimestampLiteral(TimestampString.fromMillisSinceEpoch(1234), 3),
-
rexBuilder.makeTimestampLiteral(TimestampString.fromMillisSinceEpoch(123456789),
9),
- rexBuilder.makeTimestampLiteral(
- new TimestampString("0001-01-01 01:01:01.000000001"),
9),
- rexBuilder.makeTimestampLiteral(new
TimestampString("2000-12-12 12:30:57.1234"), 4),
- rexBuilder.makeBinaryLiteral(ByteString.EMPTY),
-
rexBuilder.makeBinaryLiteral(ByteString.ofBase64("SGVsbG8gV29ybGQh")),
- rexBuilder.makeLiteral(""),
- rexBuilder.makeLiteral("abc"),
- rexBuilder.makeFlag(SqlTrimFunction.Flag.BOTH),
- rexBuilder.makeFlag(TimeUnitRange.DAY),
- rexBuilder.makeLiteral(
- Arrays.<Object>asList(1, 2L),
- FACTORY.createStructType(
- StructKind.PEEK_FIELDS_NO_EXPAND,
- Arrays.asList(
-
FACTORY.createSqlType(SqlTypeName.INTEGER),
-
FACTORY.createSqlType(SqlTypeName.BIGINT)),
- Arrays.asList("f1", "f2")),
- false),
- rexBuilder.makeSearchArgumentLiteral(
- Sarg.of(
- false,
- ImmutableRangeSet.of(
- Range.closed(
- BigDecimal.valueOf(1),
BigDecimal.valueOf(10)))),
- FACTORY.createSqlType(SqlTypeName.INTEGER)),
- rexBuilder.makeSearchArgumentLiteral(
- Sarg.of(
- false,
- ImmutableRangeSet.of(
- Range.range(
- BigDecimal.valueOf(1),
- BoundType.OPEN,
- BigDecimal.valueOf(10),
- BoundType.CLOSED))),
- FACTORY.createSqlType(SqlTypeName.INTEGER)),
- rexBuilder.makeSearchArgumentLiteral(
- Sarg.of(
- false,
- TreeRangeSet.create(
- Arrays.asList(
- Range.closed(
- BigDecimal.valueOf(1),
- BigDecimal.valueOf(1)),
- Range.closed(
- BigDecimal.valueOf(3),
- BigDecimal.valueOf(3)),
- Range.closed(
- BigDecimal.valueOf(6),
-
BigDecimal.valueOf(6))))),
- FACTORY.createSqlType(SqlTypeName.INTEGER)),
-
rexBuilder.makeInputRef(FACTORY.createSqlType(SqlTypeName.BIGINT), 0),
- rexBuilder.makeCorrel(inputType, new CorrelationId("$cor1")),
- rexBuilder.makeFieldAccess(
- rexBuilder.makeCorrel(inputType, new
CorrelationId("$cor2")), "f2", true),
- // cast($1 as smallint)
- rexBuilder.makeCast(
- FACTORY.createSqlType(SqlTypeName.SMALLINT),
-
rexBuilder.makeInputRef(FACTORY.createSqlType(SqlTypeName.INTEGER), 1)),
- // $1 in (1, 3, 5)
- rexBuilder.makeIn(
-
rexBuilder.makeInputRef(FACTORY.createSqlType(SqlTypeName.INTEGER), 1),
- Arrays.asList(
- rexBuilder.makeExactLiteral(new BigDecimal(1)),
- rexBuilder.makeExactLiteral(new BigDecimal(3)),
- rexBuilder.makeExactLiteral(new
BigDecimal(5)))),
+ //
Review comment:
Why these comments?
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonDeserializer.java
##########
@@ -185,11 +186,12 @@ static ValidationException missingIdentifier() {
static ValidationException lookupDisabled(ObjectIdentifier
objectIdentifier) {
return new ValidationException(
String.format(
- "The table '%s' does not contain any '%s' field, "
- + "but lookup is disabled because option '%s'
== '%s'. "
- + "Either enable the catalog lookup with '%s'
== '%s' | '%s' or regenerate the plan with '%s' != '%s'.",
+ "The persisted plan does not include all required
catalog metadata for table '%s'. "
Review comment:
BTW I'm already changing these here:
https://github.com/apache/flink/pull/18770
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
##########
@@ -106,78 +118,108 @@ public RexNodeJsonDeserializer() {
@Override
public RexNode deserialize(JsonParser jsonParser, DeserializationContext
ctx)
throws IOException {
- JsonNode jsonNode = jsonParser.readValueAsTree();
- return deserialize(jsonNode, jsonParser.getCodec(), ctx);
+ final JsonNode jsonNode = jsonParser.readValueAsTree();
+ final SerdeContext serdeContext = SerdeContext.get(ctx);
+ return deserialize(jsonNode, serdeContext);
}
- private RexNode deserialize(JsonNode jsonNode, ObjectCodec codec,
DeserializationContext ctx)
+ private static RexNode deserialize(JsonNode jsonNode, SerdeContext
serdeContext)
throws IOException {
- String kind = jsonNode.get(FIELD_NAME_KIND).asText().toUpperCase();
+ final String kind = jsonNode.required(FIELD_NAME_KIND).asText();
switch (kind) {
- case SQL_KIND_INPUT_REF:
- return deserializeInputRef(jsonNode, codec, ctx);
- case SQL_KIND_LITERAL:
- return deserializeLiteral(jsonNode, codec, ctx);
- case SQL_KIND_FIELD_ACCESS:
- return deserializeFieldAccess(jsonNode, codec, ctx);
- case SQL_KIND_CORREL_VARIABLE:
- return deserializeCorrelVariable(jsonNode, codec, ctx);
- case SQL_KIND_REX_CALL:
- return deserializeCall(jsonNode, codec, ctx);
- case SQL_KIND_PATTERN_INPUT_REF:
- return deserializePatternInputRef(jsonNode, codec, ctx);
+ case KIND_INPUT_REF:
+ return deserializeInputRef(jsonNode, serdeContext);
+ case KIND_LITERAL:
+ return deserializeLiteral(jsonNode, serdeContext);
+ case KIND_FIELD_ACCESS:
+ return deserializeFieldAccess(jsonNode, serdeContext);
+ case KIND_CORREL_VARIABLE:
+ return deserializeCorrelVariable(jsonNode, serdeContext);
+ case KIND_PATTERN_INPUT_REF:
+ return deserializePatternFieldRef(jsonNode, serdeContext);
+ case KIND_CALL:
+ return deserializeCall(jsonNode, serdeContext);
default:
throw new TableException("Cannot convert to RexNode: " +
jsonNode.toPrettyString());
}
}
- private RexNode deserializeInputRef(
- JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx)
throws IOException {
- int inputIndex = jsonNode.get(FIELD_NAME_INPUT_INDEX).intValue();
- JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
- RelDataType fieldType = ctx.readValue(typeNode.traverse(codec),
RelDataType.class);
- return SerdeContext.get(ctx).getRexBuilder().makeInputRef(fieldType,
inputIndex);
+ private static RexNode deserializeInputRef(JsonNode jsonNode, SerdeContext
serdeContext) {
+ final int inputIndex =
jsonNode.required(FIELD_NAME_INPUT_INDEX).intValue();
+ final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE);
+ final RelDataType fieldType =
+ RelDataTypeJsonDeserializer.deserialize(logicalTypeNode,
serdeContext);
+ return serdeContext.getRexBuilder().makeInputRef(fieldType,
inputIndex);
}
- private RexNode deserializePatternInputRef(
- JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx)
throws IOException {
- int inputIndex = jsonNode.get(FIELD_NAME_INPUT_INDEX).intValue();
- String alpha = jsonNode.get(FIELD_NAME_ALPHA).asText();
- JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
- RelDataType fieldType = ctx.readValue(typeNode.traverse(codec),
RelDataType.class);
- return SerdeContext.get(ctx)
- .getRexBuilder()
- .makePatternFieldRef(alpha, fieldType, inputIndex);
- }
-
- private RexNode deserializeLiteral(
- JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx)
throws IOException {
- RexBuilder rexBuilder = SerdeContext.get(ctx).getRexBuilder();
- JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
- RelDataType literalType = ctx.readValue(typeNode.traverse(codec),
RelDataType.class);
+ private static RexNode deserializeLiteral(JsonNode jsonNode, SerdeContext
serdeContext) {
+ final JsonNode logicalTypeNode = jsonNode.get(FIELD_NAME_TYPE);
+ final RelDataType relDataType =
+ RelDataTypeJsonDeserializer.deserialize(logicalTypeNode,
serdeContext);
if (jsonNode.has(FIELD_NAME_SARG)) {
- Sarg<?> sarg =
- toSarg(jsonNode.get(FIELD_NAME_SARG),
literalType.getSqlTypeName(), codec, ctx);
- return rexBuilder.makeSearchArgumentLiteral(sarg, literalType);
+ return deserializeSarg(jsonNode.required(FIELD_NAME_SARG),
relDataType, serdeContext);
} else if (jsonNode.has(FIELD_NAME_VALUE)) {
- JsonNode literalNode = jsonNode.get(FIELD_NAME_VALUE);
- if (literalNode.isNull()) {
- return rexBuilder.makeNullLiteral(literalType);
+ final Object value =
+ deserializeLiteralValue(jsonNode,
relDataType.getSqlTypeName(), serdeContext);
+ if (value == null) {
+ return
serdeContext.getRexBuilder().makeNullLiteral(relDataType);
}
- Object literal = toLiteralValue(jsonNode,
literalType.getSqlTypeName(), codec, ctx);
- return rexBuilder.makeLiteral(literal, literalType, true);
+ return serdeContext.getRexBuilder().makeLiteral(value,
relDataType, true);
} else {
throw new TableException("Unknown literal: " +
jsonNode.toPrettyString());
}
}
- private Object toLiteralValue(
- JsonNode literalNode,
- SqlTypeName sqlTypeName,
- ObjectCodec codec,
- DeserializationContext ctx)
- throws IOException {
- JsonNode valueNode = literalNode.get(FIELD_NAME_VALUE);
+ @SuppressWarnings({"UnstableApiUsage", "rawtypes", "unchecked"})
+ private static RexNode deserializeSarg(
+ JsonNode sargNode, RelDataType relDataType, SerdeContext
serdeContext) {
+ final RexBuilder rexBuilder = serdeContext.getRexBuilder();
+ final ArrayNode rangesNode = (ArrayNode)
sargNode.required(FIELD_NAME_RANGES);
+ final Builder builder = builder();
+ for (JsonNode rangeNode : rangesNode) {
+ Range range = all();
+ if (rangeNode.has(FIELD_NAME_BOUND_LOWER)) {
+ final JsonNode lowerNode =
rangeNode.required(FIELD_NAME_BOUND_LOWER);
+ final Comparable<?> boundValue =
+ (Comparable<?>)
+ deserializeLiteralValue(
+ lowerNode,
relDataType.getSqlTypeName(), serdeContext);
+ assert boundValue != null;
+ final BoundType boundType =
+ serializableToCalcite(
+ BoundType.class,
+
lowerNode.required(FIELD_NAME_BOUND_TYPE).asText());
+ final Range<?> r =
+ boundType == BoundType.OPEN ? greaterThan(boundValue)
: atLeast(boundValue);
+ range = range.intersection(r);
+ }
+ if (rangeNode.has(FIELD_NAME_BOUND_UPPER)) {
+ final JsonNode upperNode =
rangeNode.required(FIELD_NAME_BOUND_UPPER);
+ final Comparable<?> boundValue =
+ (Comparable<?>)
+ deserializeLiteralValue(
+ upperNode,
relDataType.getSqlTypeName(), serdeContext);
+ assert boundValue != null;
+ final BoundType boundType =
+ serializableToCalcite(
+ BoundType.class,
+
upperNode.required(FIELD_NAME_BOUND_TYPE).asText());
+ final Range<?> r =
+ boundType == BoundType.OPEN ? lessThan(boundValue) :
atMost(boundValue);
+ range = range.intersection(r);
+ }
+ if (range.hasUpperBound() || range.hasLowerBound()) {
+ builder.add(range);
+ }
+ }
+ final boolean containsNull =
sargNode.required(FIELD_NAME_CONTAINS_NULL).booleanValue();
+ return rexBuilder.makeSearchArgumentLiteral(
+ Sarg.of(containsNull, builder.build()), relDataType);
+ }
+
+ private static Object deserializeLiteralValue(
Review comment:
Annotate return type with `@Nullable` to make it clear that this also
handles null literals
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonDeserializer.java
##########
@@ -172,9 +172,10 @@ private boolean isLookupEnabled(CatalogPlanRestore
planRestoreOption) {
static ValidationException missingIdentifier() {
return new ValidationException(
String.format(
- "The table cannot be deserialized, as no identifier is
present within the JSON, "
- + "but lookup is forced by '%s' == '%s'. "
- + "Either allow restoring table from the
catalog with '%s' == '%s' | '%s' or make sure you don't use anonymous tables
when generating the plan.",
+ "The table cannot be deserialized as no identifier is
present in the persisted plan."
Review comment:
All the messages here should be tested
https://github.com/apache/flink/blob/567440115bcacb5aceaf3304e486281c7da8c14f/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableSerdeTest.java#L336
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
##########
@@ -335,34 +359,185 @@ private void serialize(RexCall call, JsonGenerator gen,
SerializerProvider seria
gen.writeEndObject();
}
- private void serialize(SqlOperator operator, JsonGenerator gen) throws
IOException {
- gen.writeFieldName(FIELD_NAME_OPERATOR);
- gen.writeStartObject();
- gen.writeStringField(FIELD_NAME_NAME, operator.getName());
- gen.writeStringField(FIELD_NAME_KIND, operator.kind.name());
- gen.writeStringField(FIELD_NAME_SYNTAX, operator.getSyntax().name());
- // TODO if a udf is registered with class name, class name is recorded
enough
- if (operator instanceof ScalarSqlFunction) {
- ScalarSqlFunction scalarSqlFunc = (ScalarSqlFunction) operator;
- gen.writeStringField(FIELD_NAME_DISPLAY_NAME,
scalarSqlFunc.displayName());
- gen.writeStringField(FIELD_NAME_FUNCTION_KIND,
FunctionKind.SCALAR.name());
+ private static void serializeSqlOperator(
+ SqlOperator operator,
+ JsonGenerator gen,
+ SerializerProvider serializerProvider,
+ boolean serializeCatalogObjects)
+ throws IOException {
+ if (operator.getSyntax() != SqlSyntax.FUNCTION) {
gen.writeStringField(
- FIELD_NAME_INSTANCE,
-
EncodingUtils.encodeObjectToString(scalarSqlFunc.scalarFunction()));
- } else if (operator instanceof BridgingSqlFunction) {
- BridgingSqlFunction bridgingSqlFunc = (BridgingSqlFunction)
operator;
- FunctionDefinition functionDefinition =
bridgingSqlFunc.getDefinition();
- if (functionDefinition instanceof BuiltInFunctionDefinition) {
- // just record the flag, we can find it by name
- gen.writeBooleanField(FIELD_NAME_BUILT_IN, true);
- } else {
- gen.writeStringField(FIELD_NAME_FUNCTION_KIND,
functionDefinition.getKind().name());
- gen.writeStringField(
- FIELD_NAME_INSTANCE,
-
EncodingUtils.encodeObjectToString(functionDefinition));
- gen.writeBooleanField(FIELD_NAME_BRIDGING, true);
- }
+ FIELD_NAME_SYNTAX,
calciteToSerializable(operator.getSyntax()).getValue());
}
- gen.writeEndObject();
+ if (operator instanceof BridgingSqlFunction) {
+ final BridgingSqlFunction function = (BridgingSqlFunction)
operator;
+ serializeBridgingSqlFunction(
+ function.getName(),
+ function.getResolvedFunction(),
+ gen,
+ serializerProvider,
+ serializeCatalogObjects);
+ } else if (operator instanceof BridgingSqlAggFunction) {
+ final BridgingSqlAggFunction function = (BridgingSqlAggFunction)
operator;
+ serializeBridgingSqlFunction(
+ function.getName(),
+ function.getResolvedFunction(),
+ gen,
+ serializerProvider,
+ serializeCatalogObjects);
+ } else if (operator instanceof ScalarSqlFunction
+ || operator instanceof TableSqlFunction
+ || operator instanceof AggSqlFunction) {
+ throw legacyException(operator.toString());
+ } else {
+ // We assume that all regular SqlOperators are internal. Only the
function definitions
+ // stack is exposed to the user and can thus be external.
+ gen.writeStringField(
+ FIELD_NAME_INTERNAL_NAME,
BuiltInSqlOperator.toQualifiedName(operator));
+ }
+ }
+
+ private static void serializeBridgingSqlFunction(
+ String summaryName,
+ ContextResolvedFunction resolvedFunction,
+ JsonGenerator gen,
+ SerializerProvider serializerProvider,
+ boolean serializeCatalogObjects)
+ throws IOException {
+ final FunctionDefinition definition = resolvedFunction.getDefinition();
+ if (definition instanceof ScalarFunctionDefinition
+ || definition instanceof TableFunctionDefinition
+ || definition instanceof TableAggregateFunctionDefinition
+ || definition instanceof AggregateFunctionDefinition) {
+ throw legacyException(summaryName);
+ }
+
+ if (definition instanceof BuiltInFunctionDefinition) {
+ final BuiltInFunctionDefinition builtInFunction =
+ (BuiltInFunctionDefinition) definition;
+ gen.writeStringField(FIELD_NAME_INTERNAL_NAME,
builtInFunction.getQualifiedName());
+ } else if (resolvedFunction.isAnonymous()) {
+ serializeInlineFunction(summaryName, definition, gen);
+ } else if (resolvedFunction.isTemporary()) {
+ serializeTemporaryFunction(resolvedFunction, gen,
serializerProvider);
+ } else {
+ assert resolvedFunction.isPermanent();
+ serializePermanentFunction(
+ resolvedFunction, gen, serializerProvider,
serializeCatalogObjects);
+ }
+ }
+
+ private static void serializeInlineFunction(
+ String summaryName, FunctionDefinition definition, JsonGenerator
gen)
+ throws IOException {
+ if (!(definition instanceof UserDefinedFunction)
+ || !isClassNameSerializable((UserDefinedFunction) definition))
{
+ throw cannotSerializeInlineFunction(summaryName);
+ }
+ gen.writeStringField(FIELD_NAME_CLASS,
definition.getClass().getName());
+ }
+
+ private static void serializeTemporaryFunction(
+ ContextResolvedFunction resolvedFunction,
+ JsonGenerator gen,
+ SerializerProvider serializerProvider)
+ throws IOException {
+ final FunctionIdentifier identifier =
+ resolvedFunction
+ .getIdentifier()
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ "Non-anonymous temporary
function should own a function identifier."));
Review comment:
Because you reached this point, this is supposed to be a temporary function
with an identifier, right? Could you replace this with an assert or just
`Optional.get`?
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
##########
@@ -220,185 +260,298 @@ private Object toLiteralValue(
return ByteString.ofBase64(valueNode.asText());
case CHAR:
case VARCHAR:
- return SerdeContext.get(ctx)
- .getRexBuilder()
- .makeLiteral(valueNode.asText())
- .getValue();
+ return
serdeContext.getRexBuilder().makeLiteral(valueNode.asText()).getValue();
case SYMBOL:
- JsonNode classNode = literalNode.get(FIELD_NAME_CLASS);
- return serializableToCalcite(
- SerializableSymbol.of(classNode.asText(),
valueNode.asText()));
- case ROW:
- case MULTISET:
- ArrayNode valuesNode = (ArrayNode) valueNode;
- List<RexNode> list = new ArrayList<>();
- for (int i = 0; i < valuesNode.size(); ++i) {
- list.add(deserialize(valuesNode.get(i), codec, ctx));
- }
- return list;
+ final JsonNode symbolNode =
literalNode.required(FIELD_NAME_SYMBOL);
+ final SerializableSymbol symbol =
+ SerializableSymbol.of(symbolNode.asText(),
valueNode.asText());
+ return serializableToCalcite(symbol);
default:
throw new TableException("Unknown literal: " + valueNode);
}
}
- @SuppressWarnings({"rawtypes", "unchecked", "UnstableApiUsage"})
- private Sarg<?> toSarg(
- JsonNode jsonNode,
- SqlTypeName sqlTypeName,
- ObjectCodec codec,
- DeserializationContext ctx)
+ private static RexNode deserializeFieldAccess(JsonNode jsonNode,
SerdeContext serdeContext)
throws IOException {
- ArrayNode rangesNode = (ArrayNode) jsonNode.get(FIELD_NAME_RANGES);
- com.google.common.collect.ImmutableRangeSet.Builder builder =
- com.google.common.collect.ImmutableRangeSet.builder();
- for (JsonNode rangeNode : rangesNode) {
- com.google.common.collect.Range<?> range =
com.google.common.collect.Range.all();
- if (rangeNode.has(FIELD_NAME_BOUND_LOWER)) {
- JsonNode lowerNode = rangeNode.get(FIELD_NAME_BOUND_LOWER);
- Comparable<?> boundValue =
- checkNotNull(
- (Comparable<?>) toLiteralValue(lowerNode,
sqlTypeName, codec, ctx));
- com.google.common.collect.BoundType boundType =
- com.google.common.collect.BoundType.valueOf(
-
lowerNode.get(FIELD_NAME_BOUND_TYPE).asText().toUpperCase());
- com.google.common.collect.Range r =
- boundType == com.google.common.collect.BoundType.OPEN
- ?
com.google.common.collect.Range.greaterThan(boundValue)
- :
com.google.common.collect.Range.atLeast(boundValue);
- range = range.intersection(r);
- }
- if (rangeNode.has(FIELD_NAME_BOUND_UPPER)) {
- JsonNode upperNode = rangeNode.get(FIELD_NAME_BOUND_UPPER);
- Comparable<?> boundValue =
- checkNotNull(
- (Comparable<?>) toLiteralValue(upperNode,
sqlTypeName, codec, ctx));
- com.google.common.collect.BoundType boundType =
- com.google.common.collect.BoundType.valueOf(
-
upperNode.get(FIELD_NAME_BOUND_TYPE).asText().toUpperCase());
- com.google.common.collect.Range r =
- boundType == com.google.common.collect.BoundType.OPEN
- ?
com.google.common.collect.Range.lessThan(boundValue)
- :
com.google.common.collect.Range.atMost(boundValue);
- range = range.intersection(r);
- }
- if (range.hasUpperBound() || range.hasLowerBound()) {
- builder.add(range);
- }
- }
- boolean containsNull =
jsonNode.get(FIELD_NAME_CONTAINS_NULL).booleanValue();
- return Sarg.of(containsNull, builder.build());
+ final String fieldName = jsonNode.required(FIELD_NAME_NAME).asText();
+ final JsonNode exprNode = jsonNode.required(FIELD_NAME_EXPR);
+ final RexNode refExpr = deserialize(exprNode, serdeContext);
Review comment:
Just as a brain exercise, when **serializing**, is it ever possible that
we have some circular rex node graph?
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
##########
@@ -106,78 +118,108 @@ public RexNodeJsonDeserializer() {
@Override
public RexNode deserialize(JsonParser jsonParser, DeserializationContext
ctx)
throws IOException {
- JsonNode jsonNode = jsonParser.readValueAsTree();
- return deserialize(jsonNode, jsonParser.getCodec(), ctx);
+ final JsonNode jsonNode = jsonParser.readValueAsTree();
+ final SerdeContext serdeContext = SerdeContext.get(ctx);
+ return deserialize(jsonNode, serdeContext);
}
- private RexNode deserialize(JsonNode jsonNode, ObjectCodec codec,
DeserializationContext ctx)
+ private static RexNode deserialize(JsonNode jsonNode, SerdeContext
serdeContext)
throws IOException {
- String kind = jsonNode.get(FIELD_NAME_KIND).asText().toUpperCase();
+ final String kind = jsonNode.required(FIELD_NAME_KIND).asText();
switch (kind) {
- case SQL_KIND_INPUT_REF:
- return deserializeInputRef(jsonNode, codec, ctx);
- case SQL_KIND_LITERAL:
- return deserializeLiteral(jsonNode, codec, ctx);
- case SQL_KIND_FIELD_ACCESS:
- return deserializeFieldAccess(jsonNode, codec, ctx);
- case SQL_KIND_CORREL_VARIABLE:
- return deserializeCorrelVariable(jsonNode, codec, ctx);
- case SQL_KIND_REX_CALL:
- return deserializeCall(jsonNode, codec, ctx);
- case SQL_KIND_PATTERN_INPUT_REF:
- return deserializePatternInputRef(jsonNode, codec, ctx);
+ case KIND_INPUT_REF:
+ return deserializeInputRef(jsonNode, serdeContext);
+ case KIND_LITERAL:
+ return deserializeLiteral(jsonNode, serdeContext);
+ case KIND_FIELD_ACCESS:
+ return deserializeFieldAccess(jsonNode, serdeContext);
+ case KIND_CORREL_VARIABLE:
+ return deserializeCorrelVariable(jsonNode, serdeContext);
+ case KIND_PATTERN_INPUT_REF:
+ return deserializePatternFieldRef(jsonNode, serdeContext);
+ case KIND_CALL:
+ return deserializeCall(jsonNode, serdeContext);
default:
throw new TableException("Cannot convert to RexNode: " +
jsonNode.toPrettyString());
}
}
- private RexNode deserializeInputRef(
- JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx)
throws IOException {
- int inputIndex = jsonNode.get(FIELD_NAME_INPUT_INDEX).intValue();
- JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
- RelDataType fieldType = ctx.readValue(typeNode.traverse(codec),
RelDataType.class);
- return SerdeContext.get(ctx).getRexBuilder().makeInputRef(fieldType,
inputIndex);
+ private static RexNode deserializeInputRef(JsonNode jsonNode, SerdeContext
serdeContext) {
+ final int inputIndex =
jsonNode.required(FIELD_NAME_INPUT_INDEX).intValue();
+ final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE);
+ final RelDataType fieldType =
+ RelDataTypeJsonDeserializer.deserialize(logicalTypeNode,
serdeContext);
+ return serdeContext.getRexBuilder().makeInputRef(fieldType,
inputIndex);
}
- private RexNode deserializePatternInputRef(
- JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx)
throws IOException {
- int inputIndex = jsonNode.get(FIELD_NAME_INPUT_INDEX).intValue();
- String alpha = jsonNode.get(FIELD_NAME_ALPHA).asText();
- JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
- RelDataType fieldType = ctx.readValue(typeNode.traverse(codec),
RelDataType.class);
- return SerdeContext.get(ctx)
- .getRexBuilder()
- .makePatternFieldRef(alpha, fieldType, inputIndex);
- }
-
- private RexNode deserializeLiteral(
- JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx)
throws IOException {
- RexBuilder rexBuilder = SerdeContext.get(ctx).getRexBuilder();
- JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
- RelDataType literalType = ctx.readValue(typeNode.traverse(codec),
RelDataType.class);
+ private static RexNode deserializeLiteral(JsonNode jsonNode, SerdeContext
serdeContext) {
+ final JsonNode logicalTypeNode = jsonNode.get(FIELD_NAME_TYPE);
Review comment:
Isn't this required?
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
##########
@@ -106,78 +118,108 @@ public RexNodeJsonDeserializer() {
@Override
public RexNode deserialize(JsonParser jsonParser, DeserializationContext
ctx)
throws IOException {
- JsonNode jsonNode = jsonParser.readValueAsTree();
- return deserialize(jsonNode, jsonParser.getCodec(), ctx);
+ final JsonNode jsonNode = jsonParser.readValueAsTree();
+ final SerdeContext serdeContext = SerdeContext.get(ctx);
+ return deserialize(jsonNode, serdeContext);
}
- private RexNode deserialize(JsonNode jsonNode, ObjectCodec codec,
DeserializationContext ctx)
+ private static RexNode deserialize(JsonNode jsonNode, SerdeContext
serdeContext)
throws IOException {
- String kind = jsonNode.get(FIELD_NAME_KIND).asText().toUpperCase();
+ final String kind = jsonNode.required(FIELD_NAME_KIND).asText();
switch (kind) {
- case SQL_KIND_INPUT_REF:
- return deserializeInputRef(jsonNode, codec, ctx);
- case SQL_KIND_LITERAL:
- return deserializeLiteral(jsonNode, codec, ctx);
- case SQL_KIND_FIELD_ACCESS:
- return deserializeFieldAccess(jsonNode, codec, ctx);
- case SQL_KIND_CORREL_VARIABLE:
- return deserializeCorrelVariable(jsonNode, codec, ctx);
- case SQL_KIND_REX_CALL:
- return deserializeCall(jsonNode, codec, ctx);
- case SQL_KIND_PATTERN_INPUT_REF:
- return deserializePatternInputRef(jsonNode, codec, ctx);
+ case KIND_INPUT_REF:
+ return deserializeInputRef(jsonNode, serdeContext);
+ case KIND_LITERAL:
+ return deserializeLiteral(jsonNode, serdeContext);
+ case KIND_FIELD_ACCESS:
+ return deserializeFieldAccess(jsonNode, serdeContext);
+ case KIND_CORREL_VARIABLE:
+ return deserializeCorrelVariable(jsonNode, serdeContext);
+ case KIND_PATTERN_INPUT_REF:
+ return deserializePatternFieldRef(jsonNode, serdeContext);
+ case KIND_CALL:
+ return deserializeCall(jsonNode, serdeContext);
default:
throw new TableException("Cannot convert to RexNode: " +
jsonNode.toPrettyString());
}
}
- private RexNode deserializeInputRef(
- JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx)
throws IOException {
- int inputIndex = jsonNode.get(FIELD_NAME_INPUT_INDEX).intValue();
- JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
- RelDataType fieldType = ctx.readValue(typeNode.traverse(codec),
RelDataType.class);
- return SerdeContext.get(ctx).getRexBuilder().makeInputRef(fieldType,
inputIndex);
+ private static RexNode deserializeInputRef(JsonNode jsonNode, SerdeContext
serdeContext) {
+ final int inputIndex =
jsonNode.required(FIELD_NAME_INPUT_INDEX).intValue();
+ final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE);
+ final RelDataType fieldType =
+ RelDataTypeJsonDeserializer.deserialize(logicalTypeNode,
serdeContext);
+ return serdeContext.getRexBuilder().makeInputRef(fieldType,
inputIndex);
}
- private RexNode deserializePatternInputRef(
- JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx)
throws IOException {
- int inputIndex = jsonNode.get(FIELD_NAME_INPUT_INDEX).intValue();
- String alpha = jsonNode.get(FIELD_NAME_ALPHA).asText();
- JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
- RelDataType fieldType = ctx.readValue(typeNode.traverse(codec),
RelDataType.class);
- return SerdeContext.get(ctx)
- .getRexBuilder()
- .makePatternFieldRef(alpha, fieldType, inputIndex);
- }
-
- private RexNode deserializeLiteral(
- JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx)
throws IOException {
- RexBuilder rexBuilder = SerdeContext.get(ctx).getRexBuilder();
- JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
- RelDataType literalType = ctx.readValue(typeNode.traverse(codec),
RelDataType.class);
+ private static RexNode deserializeLiteral(JsonNode jsonNode, SerdeContext
serdeContext) {
+ final JsonNode logicalTypeNode = jsonNode.get(FIELD_NAME_TYPE);
+ final RelDataType relDataType =
+ RelDataTypeJsonDeserializer.deserialize(logicalTypeNode,
serdeContext);
if (jsonNode.has(FIELD_NAME_SARG)) {
- Sarg<?> sarg =
- toSarg(jsonNode.get(FIELD_NAME_SARG),
literalType.getSqlTypeName(), codec, ctx);
- return rexBuilder.makeSearchArgumentLiteral(sarg, literalType);
+ return deserializeSarg(jsonNode.required(FIELD_NAME_SARG),
relDataType, serdeContext);
} else if (jsonNode.has(FIELD_NAME_VALUE)) {
- JsonNode literalNode = jsonNode.get(FIELD_NAME_VALUE);
- if (literalNode.isNull()) {
- return rexBuilder.makeNullLiteral(literalType);
+ final Object value =
+ deserializeLiteralValue(jsonNode,
relDataType.getSqlTypeName(), serdeContext);
+ if (value == null) {
+ return
serdeContext.getRexBuilder().makeNullLiteral(relDataType);
}
- Object literal = toLiteralValue(jsonNode,
literalType.getSqlTypeName(), codec, ctx);
- return rexBuilder.makeLiteral(literal, literalType, true);
+ return serdeContext.getRexBuilder().makeLiteral(value,
relDataType, true);
} else {
throw new TableException("Unknown literal: " +
jsonNode.toPrettyString());
}
}
- private Object toLiteralValue(
- JsonNode literalNode,
- SqlTypeName sqlTypeName,
- ObjectCodec codec,
- DeserializationContext ctx)
- throws IOException {
- JsonNode valueNode = literalNode.get(FIELD_NAME_VALUE);
+ @SuppressWarnings({"UnstableApiUsage", "rawtypes", "unchecked"})
+ private static RexNode deserializeSarg(
+ JsonNode sargNode, RelDataType relDataType, SerdeContext
serdeContext) {
+ final RexBuilder rexBuilder = serdeContext.getRexBuilder();
+ final ArrayNode rangesNode = (ArrayNode)
sargNode.required(FIELD_NAME_RANGES);
+ final Builder builder = builder();
+ for (JsonNode rangeNode : rangesNode) {
+ Range range = all();
+ if (rangeNode.has(FIELD_NAME_BOUND_LOWER)) {
+ final JsonNode lowerNode =
rangeNode.required(FIELD_NAME_BOUND_LOWER);
+ final Comparable<?> boundValue =
+ (Comparable<?>)
+ deserializeLiteralValue(
+ lowerNode,
relDataType.getSqlTypeName(), serdeContext);
+ assert boundValue != null;
+ final BoundType boundType =
+ serializableToCalcite(
+ BoundType.class,
+
lowerNode.required(FIELD_NAME_BOUND_TYPE).asText());
+ final Range<?> r =
+ boundType == BoundType.OPEN ? greaterThan(boundValue)
: atLeast(boundValue);
+ range = range.intersection(r);
+ }
+ if (rangeNode.has(FIELD_NAME_BOUND_UPPER)) {
+ final JsonNode upperNode =
rangeNode.required(FIELD_NAME_BOUND_UPPER);
+ final Comparable<?> boundValue =
+ (Comparable<?>)
+ deserializeLiteralValue(
+ upperNode,
relDataType.getSqlTypeName(), serdeContext);
+ assert boundValue != null;
+ final BoundType boundType =
+ serializableToCalcite(
+ BoundType.class,
+
upperNode.required(FIELD_NAME_BOUND_TYPE).asText());
+ final Range<?> r =
+ boundType == BoundType.OPEN ? lessThan(boundValue) :
atMost(boundValue);
+ range = range.intersection(r);
+ }
+ if (range.hasUpperBound() || range.hasLowerBound()) {
+ builder.add(range);
+ }
+ }
+ final boolean containsNull =
sargNode.required(FIELD_NAME_CONTAINS_NULL).booleanValue();
+ return rexBuilder.makeSearchArgumentLiteral(
+ Sarg.of(containsNull, builder.build()), relDataType);
+ }
+
+ private static Object deserializeLiteralValue(
+ JsonNode literalNode, SqlTypeName sqlTypeName, SerdeContext
serdeContext) {
Review comment:
Please take the `literalValue` `JsonNode` as the parameter here, rather than
extracting it within this function, to make it consistent with all our other
deserialization functions and to make clear what this function is doing.
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java
##########
@@ -18,58 +18,485 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import
org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation;
+import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanRestore;
+import org.apache.flink.table.catalog.ContextResolvedFunction;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.module.Module;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeStrategies;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.collect.BoundType;
-import com.google.common.collect.ImmutableRangeSet;
-import com.google.common.collect.Range;
-import com.google.common.collect.TreeRangeSet;
-import org.apache.calcite.avatica.util.ByteString;
-import org.apache.calcite.avatica.util.TimeUnit;
-import org.apache.calcite.avatica.util.TimeUnitRange;
-import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.StructKind;
import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.fun.SqlTrimFunction;
-import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.util.DateString;
-import org.apache.calcite.util.Sarg;
-import org.apache.calcite.util.TimeString;
-import org.apache.calcite.util.TimestampString;
+import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
import java.util.stream.Stream;
-import static
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.configuredSerdeContext;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.assertThatJsonContains;
+import static
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.assertThatJsonDoesNotContain;
+import static
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.testJsonRoundTrip;
import static
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.toJson;
-import static
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.toObject;
+import static
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.createObjectReader;
+import static
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.createObjectWriter;
+import static
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.getObjectMapper;
+import static
org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_CLASS;
+import static org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_CATALOG;
+import static
org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_DATABASE;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for serialization/deserialization of {@link RexNode}. */
public class RexNodeSerdeTest {
private static final FlinkTypeFactory FACTORY =
FlinkTypeFactory.INSTANCE();
+ private static final String FUNCTION_NAME = "MyFunc";
+ private static final FunctionIdentifier FUNCTION_SYS_ID =
FunctionIdentifier.of(FUNCTION_NAME);
+ private static final FunctionIdentifier FUNCTION_CAT_ID =
+ FunctionIdentifier.of(
+ ObjectIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE,
FUNCTION_NAME));
+ private static final UnresolvedIdentifier UNRESOLVED_FUNCTION_CAT_ID =
+ UnresolvedIdentifier.of(FUNCTION_CAT_ID.toList());
+ private static final SerializableScalarFunction SER_UDF_IMPL = new
SerializableScalarFunction();
+ private static final Class<SerializableScalarFunction> SER_UDF_CLASS =
+ SerializableScalarFunction.class;
+ private static final OtherSerializableScalarFunction SER_UDF_IMPL_OTHER =
+ new OtherSerializableScalarFunction();
+ private static final Class<OtherSerializableScalarFunction>
SER_UDF_CLASS_OTHER =
+ OtherSerializableScalarFunction.class;
+ private static final NonSerializableScalarFunction NON_SER_UDF_IMPL =
+ new NonSerializableScalarFunction(true);
+ private static final NonSerializableFunctionDefinition
NON_SER_FUNCTION_DEF_IMPL =
+ new NonSerializableFunctionDefinition();
+ private static final ContextResolvedFunction PERMANENT_FUNCTION =
+ ContextResolvedFunction.permanent(FUNCTION_CAT_ID, SER_UDF_IMPL);
@ParameterizedTest
@MethodSource("testRexNodeSerde")
public void testRexNodeSerde(RexNode rexNode) throws IOException {
- final SerdeContext serdeContext = configuredSerdeContext();
- final String json = toJson(serdeContext, rexNode);
- final RexNode actual = toObject(serdeContext, json, RexNode.class);
-
+ final RexNode actual = testJsonRoundTrip(rexNode, RexNode.class);
assertThat(actual).isEqualTo(rexNode);
}
+ @Test
+ public void testInlineFunction() throws IOException {
+ final SerdeContext serdeContext = contradictingSerdeContext();
+
+ // Serializable function
+ testJsonRoundTrip(
+ createFunctionCall(serdeContext,
ContextResolvedFunction.anonymous(SER_UDF_IMPL)),
+ RexNode.class);
+
+ // Non-serializable function due to fields
+ assertThatThrownBy(
+ () ->
+ toJson(
+ serdeContext,
+ createFunctionCall(
+ serdeContext,
+
ContextResolvedFunction.anonymous(
+ NON_SER_UDF_IMPL))))
+ .satisfies(
+ anyCauseMatches(
+ TableException.class,
+ "The function's implementation class must not
be stateful"));
+ }
+
+ @Test
+ public void testSystemFunction() throws Throwable {
+ final SerdeContext serdeContext = contradictingSerdeContext();
+
+ final ThrowingCallable callable =
+ () ->
+ testJsonRoundTrip(
+ serdeContext,
+ createFunctionCall(
+ serdeContext,
+ ContextResolvedFunction.permanent(
+ FUNCTION_SYS_ID,
NON_SER_UDF_IMPL)),
+ RexNode.class);
+
+ // Missing function
+ assertThatThrownBy(callable)
+ .satisfies(
+ anyCauseMatches(
+ TableException.class,
+ "Could not lookup system function '" +
FUNCTION_NAME + "'."));
+
+ // Module provided permanent function
+ serdeContext
+ .getFlinkContext()
+ .getModuleManager()
+ .loadModule("myModule", FunctionProvidingModule.INSTANCE);
+ callable.call();
+ }
+
+ @Test
+ public void testTemporarySystemFunction() throws Throwable {
+ final SerdeContext serdeContext = contradictingSerdeContext();
+
+ final ThrowingCallable callable =
+ () ->
+ testJsonRoundTrip(
+ serdeContext,
+ createFunctionCall(
+ serdeContext,
+ ContextResolvedFunction.temporary(
+ FUNCTION_SYS_ID,
NON_SER_UDF_IMPL)),
+ RexNode.class);
+
+ // Missing function
+ assertThatThrownBy(callable)
+ .satisfies(
+ anyCauseMatches(
+ TableException.class,
+ "Could not lookup system function '" +
FUNCTION_NAME + "'."));
+
+ // Registered temporary function
+ registerTemporaryFunction(serdeContext);
+ callable.call();
+ }
+
+ @Test
+ public void testTemporaryCatalogFunction() throws Throwable {
+ final SerdeContext serdeContext = contradictingSerdeContext();
+
+ final ThrowingCallable callable =
+ () ->
+ testJsonRoundTrip(
+ serdeContext,
+ createFunctionCall(
+ serdeContext,
+ ContextResolvedFunction.temporary(
+ FUNCTION_CAT_ID,
NON_SER_FUNCTION_DEF_IMPL)),
+ RexNode.class);
+
+ // Missing function
+ assertThatThrownBy(callable)
+ .satisfies(
+ anyCauseMatches(
+ TableException.class,
+ "The persisted plan does not include all
required "
+ + "catalog metadata for function '"
+ + FUNCTION_CAT_ID.asSummaryString()
+ + "'."));
+
+ // Registered temporary function
+ registerTemporaryFunction(serdeContext);
+ callable.call();
+ }
+
+ @Nested
+ @DisplayName("Test CatalogPlanCompilation == IDENTIFIER")
+ class TestCompileIdentifier {
+
+ private final CatalogPlanCompilation compilation =
CatalogPlanCompilation.IDENTIFIER;
+
+ @Nested
+ @DisplayName("and CatalogPlanRestore == IDENTIFIER")
+ class TestRestoreIdentifier {
+
+ private final CatalogPlanRestore restore =
CatalogPlanRestore.IDENTIFIER;
Review comment:
Maybe avoid recreating the ctx every time and just create one here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]