twalthr commented on a change in pull request #18858:
URL: https://github.com/apache/flink/pull/18858#discussion_r811703667
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
##########
@@ -220,185 +260,298 @@ private Object toLiteralValue(
return ByteString.ofBase64(valueNode.asText());
case CHAR:
case VARCHAR:
- return SerdeContext.get(ctx)
- .getRexBuilder()
- .makeLiteral(valueNode.asText())
- .getValue();
+ return
serdeContext.getRexBuilder().makeLiteral(valueNode.asText()).getValue();
case SYMBOL:
- JsonNode classNode = literalNode.get(FIELD_NAME_CLASS);
- return serializableToCalcite(
- SerializableSymbol.of(classNode.asText(),
valueNode.asText()));
- case ROW:
- case MULTISET:
- ArrayNode valuesNode = (ArrayNode) valueNode;
- List<RexNode> list = new ArrayList<>();
- for (int i = 0; i < valuesNode.size(); ++i) {
- list.add(deserialize(valuesNode.get(i), codec, ctx));
- }
- return list;
+ final JsonNode symbolNode =
literalNode.required(FIELD_NAME_SYMBOL);
+ final SerializableSymbol symbol =
+ SerializableSymbol.of(symbolNode.asText(),
valueNode.asText());
+ return serializableToCalcite(symbol);
default:
throw new TableException("Unknown literal: " + valueNode);
}
}
- @SuppressWarnings({"rawtypes", "unchecked", "UnstableApiUsage"})
- private Sarg<?> toSarg(
- JsonNode jsonNode,
- SqlTypeName sqlTypeName,
- ObjectCodec codec,
- DeserializationContext ctx)
+ private static RexNode deserializeFieldAccess(JsonNode jsonNode,
SerdeContext serdeContext)
throws IOException {
- ArrayNode rangesNode = (ArrayNode) jsonNode.get(FIELD_NAME_RANGES);
- com.google.common.collect.ImmutableRangeSet.Builder builder =
- com.google.common.collect.ImmutableRangeSet.builder();
- for (JsonNode rangeNode : rangesNode) {
- com.google.common.collect.Range<?> range =
com.google.common.collect.Range.all();
- if (rangeNode.has(FIELD_NAME_BOUND_LOWER)) {
- JsonNode lowerNode = rangeNode.get(FIELD_NAME_BOUND_LOWER);
- Comparable<?> boundValue =
- checkNotNull(
- (Comparable<?>) toLiteralValue(lowerNode,
sqlTypeName, codec, ctx));
- com.google.common.collect.BoundType boundType =
- com.google.common.collect.BoundType.valueOf(
-
lowerNode.get(FIELD_NAME_BOUND_TYPE).asText().toUpperCase());
- com.google.common.collect.Range r =
- boundType == com.google.common.collect.BoundType.OPEN
- ?
com.google.common.collect.Range.greaterThan(boundValue)
- :
com.google.common.collect.Range.atLeast(boundValue);
- range = range.intersection(r);
- }
- if (rangeNode.has(FIELD_NAME_BOUND_UPPER)) {
- JsonNode upperNode = rangeNode.get(FIELD_NAME_BOUND_UPPER);
- Comparable<?> boundValue =
- checkNotNull(
- (Comparable<?>) toLiteralValue(upperNode,
sqlTypeName, codec, ctx));
- com.google.common.collect.BoundType boundType =
- com.google.common.collect.BoundType.valueOf(
-
upperNode.get(FIELD_NAME_BOUND_TYPE).asText().toUpperCase());
- com.google.common.collect.Range r =
- boundType == com.google.common.collect.BoundType.OPEN
- ?
com.google.common.collect.Range.lessThan(boundValue)
- :
com.google.common.collect.Range.atMost(boundValue);
- range = range.intersection(r);
- }
- if (range.hasUpperBound() || range.hasLowerBound()) {
- builder.add(range);
- }
- }
- boolean containsNull =
jsonNode.get(FIELD_NAME_CONTAINS_NULL).booleanValue();
- return Sarg.of(containsNull, builder.build());
+ final String fieldName = jsonNode.required(FIELD_NAME_NAME).asText();
+ final JsonNode exprNode = jsonNode.required(FIELD_NAME_EXPR);
+ final RexNode refExpr = deserialize(exprNode, serdeContext);
+ return serdeContext.getRexBuilder().makeFieldAccess(refExpr,
fieldName, true);
}
- private RexNode deserializeFieldAccess(
- JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx)
throws IOException {
- String fieldName = jsonNode.get(FIELD_NAME_NAME).asText();
- JsonNode exprNode = jsonNode.get(FIELD_NAME_EXPR);
- RexNode refExpr = deserialize(exprNode, codec, ctx);
- return SerdeContext.get(ctx).getRexBuilder().makeFieldAccess(refExpr,
fieldName, true);
+ private static RexNode deserializeCorrelVariable(JsonNode jsonNode,
SerdeContext serdeContext) {
+ final String correl = jsonNode.required(FIELD_NAME_CORREL).asText();
+ final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE);
+ final RelDataType fieldType =
+ RelDataTypeJsonDeserializer.deserialize(logicalTypeNode,
serdeContext);
+ return serdeContext.getRexBuilder().makeCorrel(fieldType, new
CorrelationId(correl));
}
- private RexNode deserializeCorrelVariable(
- JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx)
throws IOException {
- String correl = jsonNode.get(FIELD_NAME_CORREL).asText();
- JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
- RelDataType fieldType = ctx.readValue(typeNode.traverse(codec),
RelDataType.class);
- return SerdeContext.get(ctx)
- .getRexBuilder()
- .makeCorrel(fieldType, new CorrelationId(correl));
+ private static RexNode deserializePatternFieldRef(
+ JsonNode jsonNode, SerdeContext serdeContext) {
+ final int inputIndex =
jsonNode.required(FIELD_NAME_INPUT_INDEX).intValue();
+ final String alpha = jsonNode.required(FIELD_NAME_ALPHA).asText();
+ final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE);
+ final RelDataType fieldType =
+ RelDataTypeJsonDeserializer.deserialize(logicalTypeNode,
serdeContext);
+ return serdeContext.getRexBuilder().makePatternFieldRef(alpha,
fieldType, inputIndex);
}
- private RexNode deserializeCall(
- JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx)
throws IOException {
- RexBuilder rexBuilder = SerdeContext.get(ctx).getRexBuilder();
- SqlOperator operator = toOperator(jsonNode.get(FIELD_NAME_OPERATOR),
SerdeContext.get(ctx));
- ArrayNode operandNodes = (ArrayNode) jsonNode.get(FIELD_NAME_OPERANDS);
- List<RexNode> rexOperands = new ArrayList<>();
+ private static RexNode deserializeCall(JsonNode jsonNode, SerdeContext
serdeContext)
+ throws IOException {
+ final SqlOperator operator = deserializeSqlOperator(jsonNode,
serdeContext);
+ final ArrayNode operandNodes = (ArrayNode)
jsonNode.get(FIELD_NAME_OPERANDS);
+ final List<RexNode> rexOperands = new ArrayList<>();
for (JsonNode node : operandNodes) {
- rexOperands.add(deserialize(node, codec, ctx));
+ rexOperands.add(deserialize(node, serdeContext));
}
final RelDataType callType;
if (jsonNode.has(FIELD_NAME_TYPE)) {
- JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
- callType = ctx.readValue(typeNode.traverse(codec),
RelDataType.class);
+ final JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
+ callType = RelDataTypeJsonDeserializer.deserialize(typeNode,
serdeContext);
} else {
- callType = rexBuilder.deriveReturnType(operator, rexOperands);
+ callType = serdeContext.getRexBuilder().deriveReturnType(operator,
rexOperands);
}
- return rexBuilder.makeCall(callType, operator, rexOperands);
+ return serdeContext.getRexBuilder().makeCall(callType, operator,
rexOperands);
}
- private SqlOperator toOperator(JsonNode jsonNode, SerdeContext ctx) throws
IOException {
- String name = jsonNode.get(FIELD_NAME_NAME).asText();
- SqlKind sqlKind =
SqlKind.valueOf(jsonNode.get(FIELD_NAME_KIND).asText());
- SqlSyntax sqlSyntax =
SqlSyntax.valueOf(jsonNode.get(FIELD_NAME_SYNTAX).asText());
- List<SqlOperator> operators = new ArrayList<>();
- ctx.getOperatorTable()
- .lookupOperatorOverloads(
- new SqlIdentifier(name, new SqlParserPos(0, 0)),
- null, // category
- sqlSyntax,
- operators,
- SqlNameMatchers.liberal());
- for (SqlOperator operator : operators) {
- // in case different operator has the same kind, check with both
name and kind.
- if (operator.kind == sqlKind) {
- return operator;
- }
+ private static SqlOperator deserializeSqlOperator(
+ JsonNode jsonNode, SerdeContext serdeContext) {
+ final SqlSyntax syntax;
+ if (jsonNode.has(FIELD_NAME_SYNTAX)) {
+ syntax =
+ serializableToCalcite(
+ SqlSyntax.class,
jsonNode.required(FIELD_NAME_SYNTAX).asText());
+ } else {
+ syntax = SqlSyntax.FUNCTION;
}
- // try to find operator from std operator table.
- SqlStdOperatorTable.instance()
- .lookupOperatorOverloads(
- new SqlIdentifier(name, new SqlParserPos(0, 0)),
- null, // category
- sqlSyntax,
- operators,
- SqlNameMatchers.liberal());
- for (SqlOperator operator : operators) {
- // in case different operator has the same kind, check with both
name and kind.
- if (operator.kind == sqlKind) {
- return operator;
- }
+ if (jsonNode.has(FIELD_NAME_INTERNAL_NAME)) {
+ return deserializeInternalFunction(
+ jsonNode.required(FIELD_NAME_INTERNAL_NAME).asText(),
syntax, serdeContext);
+ } else if (jsonNode.has(FIELD_NAME_CATALOG_NAME)) {
+ return deserializeCatalogFunction(jsonNode, syntax, serdeContext);
+ } else if (jsonNode.has(FIELD_NAME_CLASS)) {
+ return deserializeFunctionClass(jsonNode, serdeContext);
+ } else if (jsonNode.has(FIELD_NAME_SYSTEM_NAME)) {
+ return deserializeSystemFunction(
+ jsonNode.required(FIELD_NAME_SYSTEM_NAME).asText(),
syntax, serdeContext);
+ } else {
+ throw new TableException("Invalid function call.");
}
+ }
- // built-in function
- // TODO supports other module's built-in function
- if (jsonNode.has(FIELD_NAME_BUILT_IN) &&
jsonNode.get(FIELD_NAME_BUILT_IN).booleanValue()) {
- Optional<FunctionDefinition> function =
CoreModule.INSTANCE.getFunctionDefinition(name);
- Preconditions.checkArgument(function.isPresent());
- return BridgingSqlFunction.of(
- ctx.getFlinkContext(),
- ctx.getTypeFactory(),
-
ContextResolvedFunction.permanent(FunctionIdentifier.of(name), function.get()));
+ private static SqlOperator deserializeSystemFunction(
+ String systemName, SqlSyntax syntax, SerdeContext serdeContext) {
+ // This method covers both temporary system functions and permanent
system
+ // functions from a module
+ final Optional<SqlOperator> systemOperator =
+ lookupOptionalSqlOperator(
+ FunctionIdentifier.of(systemName), syntax,
serdeContext, true);
+ if (systemOperator.isPresent()) {
+ return systemOperator.get();
}
+ throw new TableException(
+ String.format(
+ "Could not lookup system function '%s'. "
+ + "Make sure it has been registered before as
temporary "
+ + "functions are not contained in the
persisted plan. "
+ + "If the function was provided by a module,
make sure to reloaded "
+ + "all used modules in the correct order.",
+ systemName));
+ }
- if (jsonNode.has(FIELD_NAME_FUNCTION_KIND) &&
jsonNode.has(FIELD_NAME_INSTANCE)) {
- FunctionKind functionKind =
- FunctionKind.valueOf(
-
jsonNode.get(FIELD_NAME_FUNCTION_KIND).asText().toUpperCase());
- String instanceStr = jsonNode.get(FIELD_NAME_INSTANCE).asText();
- if (functionKind != FunctionKind.SCALAR) {
- throw new TableException("Unknown function kind: " +
functionKind);
- }
- if (jsonNode.has(FIELD_NAME_BRIDGING)
- && jsonNode.get(FIELD_NAME_BRIDGING).booleanValue()) {
- FunctionDefinition function =
- EncodingUtils.decodeStringToObject(instanceStr,
ctx.getClassLoader());
+ private static SqlOperator deserializeInternalFunction(
+ String internalName, SqlSyntax syntax, SerdeContext serdeContext) {
+ // Try $FUNC$1
+ final Optional<SqlOperator> internalOperator =
+ lookupOptionalSqlOperator(
+ FunctionIdentifier.of(internalName), syntax,
serdeContext, false);
+ if (internalOperator.isPresent()) {
+ return internalOperator.get();
+ }
+ // Try FUNC
+ final String publicName =
BuiltInSqlOperator.extractNameFromQualifiedName(internalName);
+ final Optional<SqlOperator> latestOperator =
+ lookupOptionalSqlOperator(
+ FunctionIdentifier.of(publicName), syntax,
serdeContext, true);
+ if (latestOperator.isPresent()) {
+ return latestOperator.get();
+ }
+ throw new TableException(
+ String.format(
+ "Could not resolve internal system function '%s'. "
+ + "This is a bug, please file an issue.",
+ internalName));
+ }
+
+ private static SqlOperator deserializeFunctionClass(
+ JsonNode jsonNode, SerdeContext serdeContext) {
+ final String className = jsonNode.required(FIELD_NAME_CLASS).asText();
+ final Class<?> functionClass = loadClass(className, serdeContext,
"function");
+ final UserDefinedFunction functionInstance =
+ UserDefinedFunctionHelper.instantiateFunction(functionClass);
+
+ final ContextResolvedFunction resolvedFunction;
+ // This can never be a system function
+ // because we never serialize classes for system functions
+ if (jsonNode.has(FIELD_NAME_CATALOG_NAME)) {
+ final ObjectIdentifier objectIdentifier =
+ ObjectIdentifierJsonDeserializer.deserialize(
+
jsonNode.required(FIELD_NAME_CATALOG_NAME).asText(), serdeContext);
+ resolvedFunction =
+ ContextResolvedFunction.permanent(
+ FunctionIdentifier.of(objectIdentifier),
functionInstance);
+ } else {
+ resolvedFunction =
ContextResolvedFunction.anonymous(functionInstance);
+ }
+
+ switch (functionInstance.getKind()) {
+ case SCALAR:
+ case TABLE:
return BridgingSqlFunction.of(
- ctx.getFlinkContext(),
- ctx.getTypeFactory(),
-
ContextResolvedFunction.permanent(FunctionIdentifier.of(name), function));
- } else {
- String displayName =
jsonNode.get(FIELD_NAME_DISPLAY_NAME).asText();
- ScalarFunction function =
- EncodingUtils.decodeStringToObject(instanceStr,
ctx.getClassLoader());
- return new ScalarSqlFunction(
- FunctionIdentifier.of(name),
- displayName,
- function,
- ctx.getTypeFactory(),
- JavaScalaConversionUtil.toScala(Optional.empty()));
+ serdeContext.getFlinkContext(),
+ serdeContext.getTypeFactory(),
+ resolvedFunction);
+ case AGGREGATE:
+ return BridgingSqlAggFunction.of(
+ serdeContext.getFlinkContext(),
+ serdeContext.getTypeFactory(),
+ resolvedFunction);
+ default:
+ throw new TableException(
+ String.format(
+ "Unsupported anonymous function kind '%s' for
class '%s'.",
+ functionInstance.getKind(), className));
+ }
+ }
+
+ private static SqlOperator deserializeCatalogFunction(
+ JsonNode jsonNode, SqlSyntax syntax, SerdeContext serdeContext) {
+ final CatalogPlanRestore restoreStrategy =
+
serdeContext.getConfiguration().get(PLAN_RESTORE_CATALOG_OBJECTS);
+ final FunctionIdentifier identifier =
+ FunctionIdentifier.of(
+ ObjectIdentifierJsonDeserializer.deserialize(
+
jsonNode.required(FIELD_NAME_CATALOG_NAME).asText(), serdeContext));
+
+ switch (restoreStrategy) {
+ case ALL:
+ {
+ final Optional<SqlOperator> lookupOperator =
+ lookupOptionalSqlOperator(identifier, syntax,
serdeContext, false);
+ if (lookupOperator.isPresent()) {
+ return lookupOperator.get();
+ } else if (jsonNode.has(FIELD_NAME_CLASS)) {
+ return deserializeFunctionClass(jsonNode,
serdeContext);
+ }
+ throw missingFunctionFromCatalog(identifier, false);
+ }
+ case ALL_ENFORCED:
+ {
+ if (jsonNode.has(FIELD_NAME_CLASS)) {
+ return deserializeFunctionClass(jsonNode,
serdeContext);
+ }
+ final Optional<SqlOperator> lookupOperator =
+ lookupOptionalSqlOperator(identifier, syntax,
serdeContext, false);
+ if
(lookupOperator.map(RexNodeJsonDeserializer::isTemporary).orElse(false)) {
+ return lookupOperator.get();
+ }
+ throw lookupDisabled(identifier);
+ }
+ case IDENTIFIER:
+ final Optional<SqlOperator> lookupOperator =
+ lookupOptionalSqlOperator(identifier, syntax,
serdeContext, true);
+ if (lookupOperator.isPresent()) {
+ return lookupOperator.get();
+ } else {
+ throw missingFunctionFromCatalog(identifier, true);
+ }
+ default:
+ throw new TableException("Unsupported restore strategy: " +
restoreStrategy);
+ }
+ }
+
+ private static boolean isTemporary(SqlOperator sqlOperator) {
+ if (sqlOperator instanceof BridgingSqlFunction) {
+ return ((BridgingSqlFunction)
sqlOperator).getResolvedFunction().isTemporary();
+ } else if (sqlOperator instanceof BridgingSqlAggFunction) {
+ return ((BridgingSqlAggFunction)
sqlOperator).getResolvedFunction().isTemporary();
+ }
+ return false;
+ }
+
+ private static Optional<SqlOperator> lookupOptionalSqlOperator(
+ FunctionIdentifier identifier,
+ SqlSyntax syntax,
+ SerdeContext serdeContext,
+ boolean throwOnError) {
+ final List<SqlOperator> foundOperators = new ArrayList<>();
+ try {
+ serdeContext
+ .getOperatorTable()
+ .lookupOperatorOverloads(
+ new SqlIdentifier(identifier.toList(), new
SqlParserPos(0, 0)),
+ null, // category
+ syntax,
+ foundOperators,
+ SqlNameMatchers.liberal());
+ if (foundOperators.size() != 1) {
+ return Optional.empty();
+ }
+ return Optional.of(foundOperators.get(0));
+ } catch (Throwable t) {
+ if (throwOnError) {
+ throw new TableException(
+ String.format("Error during lookup of function '%s'.",
identifier), t);
}
+ return Optional.empty();
+ }
+ }
+
+ private static TableException lookupDisabled(FunctionIdentifier
identifier) {
Review comment:
This is tested in
`TestRestoreIdentifier.TestRestoreAllEnforced.withConstantCatalogFunction`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]