slinkydeveloper commented on a change in pull request #18858:
URL: https://github.com/apache/flink/pull/18858#discussion_r811879731



##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
##########
@@ -42,7 +44,10 @@ import scala.collection.JavaConverters._
   * @param displayName    name to be displayed in operator name
   * @param scalarFunction scalar function to be called
   * @param typeFactory    type factory for converting Flink's between 
Calcite's types
+  * @deprecated Use [[BuiltInFunctionDefinitions]] that translates to 
[[BridgingSqlFunction]].
   */
+@Deprecated
+@deprecated

Review comment:
       Why two deprecation annotations (both `@Deprecated` and `@deprecated`)?

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java
##########
@@ -56,44 +42,42 @@
 import org.apache.calcite.util.Sarg;
 import org.apache.calcite.util.TimeString;
 import org.apache.calcite.util.TimestampString;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-import java.util.Random;
+import java.util.stream.Stream;
 
-import static org.junit.Assert.assertEquals;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.configuredSerdeContext;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.toJson;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.toObject;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for serialization/deserialization of {@link RexNode}. */
-@RunWith(Parameterized.class)
 public class RexNodeSerdeTest {

Review comment:
       Have you tried to use `@Execution(CONCURRENT)`?
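  With JUnit 5 that would be roughly the following (untested sketch; concurrent execution typically also needs `junit.jupiter.execution.parallel.enabled=true` in the platform configuration):

```java
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

@Execution(ExecutionMode.CONCURRENT)
public class RexNodeSerdeTest {
    // existing parameterized tests unchanged
}
```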

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
##########
@@ -177,8 +177,9 @@ public String toString() {
     // Shared with BuiltInSqlOperator and BuiltInSqlFunction in planner
     // 
--------------------------------------------------------------------------------------------
 
+    // note that for SQL operators the name can contain spaces and dollar signs
     private static final Pattern INTERNAL_NAME_PATTERN =
-            Pattern.compile("\\$[A-Z0-9_]+\\$[1-9][0-9]*");
+            Pattern.compile("\\$[A-Z0-9_ $]+\\$[1-9][0-9]*");

Review comment:
       Rather than a literal space, use `\s` in the character class.
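  i.e. something along these lines (untested; only the literal space in the character class is replaced):

```java
private static final Pattern INTERNAL_NAME_PATTERN =
        Pattern.compile("\\$[A-Z0-9_\\s$]+\\$[1-9][0-9]*");
```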

##########
File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/CatalogManagerMocks.java
##########
@@ -33,7 +35,15 @@
     public static final String DEFAULT_DATABASE = 
EnvironmentSettings.DEFAULT_BUILTIN_DATABASE;
 
     public static CatalogManager createEmptyCatalogManager() {
-        final CatalogManager catalogManager = preparedCatalogManager().build();
+        return createEmptyCatalogManager(null);
+    }
+
+    public static CatalogManager createEmptyCatalogManager(@Nullable Catalog 
catalog) {

Review comment:
       If it has a catalog, then it's not empty anymore, is it? Maybe rename it
to `createCatalogManager`?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/BuiltInSqlOperator.java
##########
@@ -74,4 +74,16 @@ static String toQualifiedName(SqlOperator operator) {
         }
         return qualifyFunctionName(operator.getName(), DEFAULT_VERSION);
     }
+
+    static String extractNameFromQualifiedName(String qualifiedName) {
+        // supports all various kinds of qualified names
+        // $FUNC$1 => FUNC
+        // $IS NULL$1 => IS NULL
+        // $$CALCITE_INTERNAL$1 => $CALCITE_INTERNAL
+        int versionPos = qualifiedName.length() - 1;
+        while (Character.isDigit(qualifiedName.charAt(versionPos))) {
+            versionPos--;
+        }
+        return qualifiedName.substring(1, versionPos);
+    }

Review comment:
       I'm not a fan of this parsing logic, as it breaks on various string corner
cases, e.g. UTF-16 surrogate pairs (think of a function name in Chinese). Just
looking at this commit, I don't understand where this function is used. Is it
used only for built-ins, or can some UDFs go through it? If UDFs can, I suggest
replacing this logic with code-point handling; otherwise ignore this.
  
  In any case, I think it reads better if the loop matches on the `'$'`
character rather than checking for digits.
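  A rough sketch of what I mean by matching on `'$'` (untested; keeps the method name and examples from the diff):

```java
static String extractNameFromQualifiedName(String qualifiedName) {
    // $FUNC$1 => FUNC, $IS NULL$1 => IS NULL, $$CALCITE_INTERNAL$1 => $CALCITE_INTERNAL
    // the version suffix is purely numeric, so the last '$' is always the separator
    final int versionSeparator = qualifiedName.lastIndexOf('$');
    return qualifiedName.substring(1, versionSeparator);
}
```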

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
##########
@@ -220,185 +260,298 @@ private Object toLiteralValue(
                 return ByteString.ofBase64(valueNode.asText());
             case CHAR:
             case VARCHAR:
-                return SerdeContext.get(ctx)
-                        .getRexBuilder()
-                        .makeLiteral(valueNode.asText())
-                        .getValue();
+                return 
serdeContext.getRexBuilder().makeLiteral(valueNode.asText()).getValue();
             case SYMBOL:
-                JsonNode classNode = literalNode.get(FIELD_NAME_CLASS);
-                return serializableToCalcite(
-                        SerializableSymbol.of(classNode.asText(), 
valueNode.asText()));
-            case ROW:
-            case MULTISET:
-                ArrayNode valuesNode = (ArrayNode) valueNode;
-                List<RexNode> list = new ArrayList<>();
-                for (int i = 0; i < valuesNode.size(); ++i) {
-                    list.add(deserialize(valuesNode.get(i), codec, ctx));
-                }
-                return list;
+                final JsonNode symbolNode = 
literalNode.required(FIELD_NAME_SYMBOL);
+                final SerializableSymbol symbol =
+                        SerializableSymbol.of(symbolNode.asText(), 
valueNode.asText());
+                return serializableToCalcite(symbol);
             default:
                 throw new TableException("Unknown literal: " + valueNode);
         }
     }
 
-    @SuppressWarnings({"rawtypes", "unchecked", "UnstableApiUsage"})
-    private Sarg<?> toSarg(
-            JsonNode jsonNode,
-            SqlTypeName sqlTypeName,
-            ObjectCodec codec,
-            DeserializationContext ctx)
+    private static RexNode deserializeFieldAccess(JsonNode jsonNode, 
SerdeContext serdeContext)
             throws IOException {
-        ArrayNode rangesNode = (ArrayNode) jsonNode.get(FIELD_NAME_RANGES);
-        com.google.common.collect.ImmutableRangeSet.Builder builder =
-                com.google.common.collect.ImmutableRangeSet.builder();
-        for (JsonNode rangeNode : rangesNode) {
-            com.google.common.collect.Range<?> range = 
com.google.common.collect.Range.all();
-            if (rangeNode.has(FIELD_NAME_BOUND_LOWER)) {
-                JsonNode lowerNode = rangeNode.get(FIELD_NAME_BOUND_LOWER);
-                Comparable<?> boundValue =
-                        checkNotNull(
-                                (Comparable<?>) toLiteralValue(lowerNode, 
sqlTypeName, codec, ctx));
-                com.google.common.collect.BoundType boundType =
-                        com.google.common.collect.BoundType.valueOf(
-                                
lowerNode.get(FIELD_NAME_BOUND_TYPE).asText().toUpperCase());
-                com.google.common.collect.Range r =
-                        boundType == com.google.common.collect.BoundType.OPEN
-                                ? 
com.google.common.collect.Range.greaterThan(boundValue)
-                                : 
com.google.common.collect.Range.atLeast(boundValue);
-                range = range.intersection(r);
-            }
-            if (rangeNode.has(FIELD_NAME_BOUND_UPPER)) {
-                JsonNode upperNode = rangeNode.get(FIELD_NAME_BOUND_UPPER);
-                Comparable<?> boundValue =
-                        checkNotNull(
-                                (Comparable<?>) toLiteralValue(upperNode, 
sqlTypeName, codec, ctx));
-                com.google.common.collect.BoundType boundType =
-                        com.google.common.collect.BoundType.valueOf(
-                                
upperNode.get(FIELD_NAME_BOUND_TYPE).asText().toUpperCase());
-                com.google.common.collect.Range r =
-                        boundType == com.google.common.collect.BoundType.OPEN
-                                ? 
com.google.common.collect.Range.lessThan(boundValue)
-                                : 
com.google.common.collect.Range.atMost(boundValue);
-                range = range.intersection(r);
-            }
-            if (range.hasUpperBound() || range.hasLowerBound()) {
-                builder.add(range);
-            }
-        }
-        boolean containsNull = 
jsonNode.get(FIELD_NAME_CONTAINS_NULL).booleanValue();
-        return Sarg.of(containsNull, builder.build());
+        final String fieldName = jsonNode.required(FIELD_NAME_NAME).asText();
+        final JsonNode exprNode = jsonNode.required(FIELD_NAME_EXPR);
+        final RexNode refExpr = deserialize(exprNode, serdeContext);
+        return serdeContext.getRexBuilder().makeFieldAccess(refExpr, 
fieldName, true);
     }
 
-    private RexNode deserializeFieldAccess(
-            JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx) 
throws IOException {
-        String fieldName = jsonNode.get(FIELD_NAME_NAME).asText();
-        JsonNode exprNode = jsonNode.get(FIELD_NAME_EXPR);
-        RexNode refExpr = deserialize(exprNode, codec, ctx);
-        return SerdeContext.get(ctx).getRexBuilder().makeFieldAccess(refExpr, 
fieldName, true);
+    private static RexNode deserializeCorrelVariable(JsonNode jsonNode, 
SerdeContext serdeContext) {
+        final String correl = jsonNode.required(FIELD_NAME_CORREL).asText();
+        final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE);
+        final RelDataType fieldType =
+                RelDataTypeJsonDeserializer.deserialize(logicalTypeNode, 
serdeContext);
+        return serdeContext.getRexBuilder().makeCorrel(fieldType, new 
CorrelationId(correl));
     }
 
-    private RexNode deserializeCorrelVariable(
-            JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx) 
throws IOException {
-        String correl = jsonNode.get(FIELD_NAME_CORREL).asText();
-        JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
-        RelDataType fieldType = ctx.readValue(typeNode.traverse(codec), 
RelDataType.class);
-        return SerdeContext.get(ctx)
-                .getRexBuilder()
-                .makeCorrel(fieldType, new CorrelationId(correl));
+    private static RexNode deserializePatternFieldRef(
+            JsonNode jsonNode, SerdeContext serdeContext) {
+        final int inputIndex = 
jsonNode.required(FIELD_NAME_INPUT_INDEX).intValue();
+        final String alpha = jsonNode.required(FIELD_NAME_ALPHA).asText();
+        final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE);
+        final RelDataType fieldType =
+                RelDataTypeJsonDeserializer.deserialize(logicalTypeNode, 
serdeContext);
+        return serdeContext.getRexBuilder().makePatternFieldRef(alpha, 
fieldType, inputIndex);
     }
 
-    private RexNode deserializeCall(
-            JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx) 
throws IOException {
-        RexBuilder rexBuilder = SerdeContext.get(ctx).getRexBuilder();
-        SqlOperator operator = toOperator(jsonNode.get(FIELD_NAME_OPERATOR), 
SerdeContext.get(ctx));
-        ArrayNode operandNodes = (ArrayNode) jsonNode.get(FIELD_NAME_OPERANDS);
-        List<RexNode> rexOperands = new ArrayList<>();
+    private static RexNode deserializeCall(JsonNode jsonNode, SerdeContext 
serdeContext)
+            throws IOException {
+        final SqlOperator operator = deserializeSqlOperator(jsonNode, 
serdeContext);
+        final ArrayNode operandNodes = (ArrayNode) 
jsonNode.get(FIELD_NAME_OPERANDS);
+        final List<RexNode> rexOperands = new ArrayList<>();
         for (JsonNode node : operandNodes) {
-            rexOperands.add(deserialize(node, codec, ctx));
+            rexOperands.add(deserialize(node, serdeContext));
         }
         final RelDataType callType;
         if (jsonNode.has(FIELD_NAME_TYPE)) {
-            JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
-            callType = ctx.readValue(typeNode.traverse(codec), 
RelDataType.class);
+            final JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
+            callType = RelDataTypeJsonDeserializer.deserialize(typeNode, 
serdeContext);
         } else {
-            callType = rexBuilder.deriveReturnType(operator, rexOperands);
+            callType = serdeContext.getRexBuilder().deriveReturnType(operator, 
rexOperands);
         }
-        return rexBuilder.makeCall(callType, operator, rexOperands);
+        return serdeContext.getRexBuilder().makeCall(callType, operator, 
rexOperands);
     }
 
-    private SqlOperator toOperator(JsonNode jsonNode, SerdeContext ctx) throws 
IOException {
-        String name = jsonNode.get(FIELD_NAME_NAME).asText();
-        SqlKind sqlKind = 
SqlKind.valueOf(jsonNode.get(FIELD_NAME_KIND).asText());
-        SqlSyntax sqlSyntax = 
SqlSyntax.valueOf(jsonNode.get(FIELD_NAME_SYNTAX).asText());
-        List<SqlOperator> operators = new ArrayList<>();
-        ctx.getOperatorTable()
-                .lookupOperatorOverloads(
-                        new SqlIdentifier(name, new SqlParserPos(0, 0)),
-                        null, // category
-                        sqlSyntax,
-                        operators,
-                        SqlNameMatchers.liberal());
-        for (SqlOperator operator : operators) {
-            // in case different operator has the same kind, check with both 
name and kind.
-            if (operator.kind == sqlKind) {
-                return operator;
-            }
+    private static SqlOperator deserializeSqlOperator(
+            JsonNode jsonNode, SerdeContext serdeContext) {
+        final SqlSyntax syntax;
+        if (jsonNode.has(FIELD_NAME_SYNTAX)) {
+            syntax =
+                    serializableToCalcite(
+                            SqlSyntax.class, 
jsonNode.required(FIELD_NAME_SYNTAX).asText());
+        } else {
+            syntax = SqlSyntax.FUNCTION;
         }
 
-        // try to find operator from std operator table.
-        SqlStdOperatorTable.instance()
-                .lookupOperatorOverloads(
-                        new SqlIdentifier(name, new SqlParserPos(0, 0)),
-                        null, // category
-                        sqlSyntax,
-                        operators,
-                        SqlNameMatchers.liberal());
-        for (SqlOperator operator : operators) {
-            // in case different operator has the same kind, check with both 
name and kind.
-            if (operator.kind == sqlKind) {
-                return operator;
-            }
+        if (jsonNode.has(FIELD_NAME_INTERNAL_NAME)) {
+            return deserializeInternalFunction(
+                    jsonNode.required(FIELD_NAME_INTERNAL_NAME).asText(), 
syntax, serdeContext);
+        } else if (jsonNode.has(FIELD_NAME_CATALOG_NAME)) {
+            return deserializeCatalogFunction(jsonNode, syntax, serdeContext);
+        } else if (jsonNode.has(FIELD_NAME_CLASS)) {
+            return deserializeFunctionClass(jsonNode, serdeContext);
+        } else if (jsonNode.has(FIELD_NAME_SYSTEM_NAME)) {
+            return deserializeSystemFunction(
+                    jsonNode.required(FIELD_NAME_SYSTEM_NAME).asText(), 
syntax, serdeContext);
+        } else {
+            throw new TableException("Invalid function call.");
         }
+    }
 
-        // built-in function
-        // TODO supports other module's built-in function
-        if (jsonNode.has(FIELD_NAME_BUILT_IN) && 
jsonNode.get(FIELD_NAME_BUILT_IN).booleanValue()) {
-            Optional<FunctionDefinition> function = 
CoreModule.INSTANCE.getFunctionDefinition(name);
-            Preconditions.checkArgument(function.isPresent());
-            return BridgingSqlFunction.of(
-                    ctx.getFlinkContext(),
-                    ctx.getTypeFactory(),
-                    
ContextResolvedFunction.permanent(FunctionIdentifier.of(name), function.get()));
+    private static SqlOperator deserializeSystemFunction(
+            String systemName, SqlSyntax syntax, SerdeContext serdeContext) {
+        // This method covers both temporary system functions and permanent 
system
+        // functions from a module
+        final Optional<SqlOperator> systemOperator =
+                lookupOptionalSqlOperator(
+                        FunctionIdentifier.of(systemName), syntax, 
serdeContext, true);
+        if (systemOperator.isPresent()) {
+            return systemOperator.get();
         }
+        throw new TableException(

Review comment:
       Extract the important exceptions into separate methods.
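  e.g. in the same style as `missingIdentifier()` in `ContextResolvedTableJsonDeserializer` (the method name and message below are just illustrative):

```java
private static TableException missingSystemFunction(String systemName) {
    return new TableException(
            String.format(
                    "Could not resolve system function '%s' when restoring the plan.",
                    systemName));
}
```

  and then simply `throw missingSystemFunction(systemName);` at the call site.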

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java
##########
@@ -92,121 +519,129 @@ public void testRexNodeSerde(RexNode rexNode) throws 
IOException {
                         Arrays.asList("f1", "f2", "f3"));
 
         return Stream.of(
-                
rexBuilder.makeNullLiteral(FACTORY.createSqlType(SqlTypeName.VARCHAR)),
-                rexBuilder.makeLiteral(true),
-                rexBuilder.makeExactLiteral(
-                        new BigDecimal(Byte.MAX_VALUE), 
FACTORY.createSqlType(SqlTypeName.TINYINT)),
-                rexBuilder.makeExactLiteral(
-                        new BigDecimal(Short.MAX_VALUE),
-                        FACTORY.createSqlType(SqlTypeName.SMALLINT)),
-                rexBuilder.makeExactLiteral(
-                        new BigDecimal(Integer.MAX_VALUE),
-                        FACTORY.createSqlType(SqlTypeName.INTEGER)),
-                rexBuilder.makeExactLiteral(
-                        new BigDecimal(Long.MAX_VALUE), 
FACTORY.createSqlType(SqlTypeName.BIGINT)),
-                rexBuilder.makeExactLiteral(
-                        BigDecimal.valueOf(Double.MAX_VALUE),
-                        FACTORY.createSqlType(SqlTypeName.DOUBLE)),
-                rexBuilder.makeApproxLiteral(
-                        BigDecimal.valueOf(Float.MAX_VALUE),
-                        FACTORY.createSqlType(SqlTypeName.FLOAT)),
-                rexBuilder.makeExactLiteral(new 
BigDecimal("23.1234567890123456789012345678")),
-                rexBuilder.makeIntervalLiteral(
-                        BigDecimal.valueOf(100),
-                        new SqlIntervalQualifier(
-                                TimeUnit.YEAR,
-                                4,
-                                TimeUnit.YEAR,
-                                RelDataType.PRECISION_NOT_SPECIFIED,
-                                SqlParserPos.ZERO)),
-                rexBuilder.makeIntervalLiteral(
-                        BigDecimal.valueOf(3),
-                        new SqlIntervalQualifier(
-                                TimeUnit.YEAR,
-                                2,
-                                TimeUnit.MONTH,
-                                RelDataType.PRECISION_NOT_SPECIFIED,
-                                SqlParserPos.ZERO)),
-                rexBuilder.makeIntervalLiteral(
-                        BigDecimal.valueOf(3),
-                        new SqlIntervalQualifier(
-                                TimeUnit.DAY, 2, TimeUnit.SECOND, 6, 
SqlParserPos.ZERO)),
-                rexBuilder.makeIntervalLiteral(
-                        BigDecimal.valueOf(3),
-                        new SqlIntervalQualifier(
-                                TimeUnit.SECOND, 2, TimeUnit.SECOND, 6, 
SqlParserPos.ZERO)),
-                rexBuilder.makeDateLiteral(DateString.fromDaysSinceEpoch(10)),
-                rexBuilder.makeDateLiteral(new DateString("2000-12-12")),
-                rexBuilder.makeTimeLiteral(TimeString.fromMillisOfDay(1234), 
3),
-                rexBuilder.makeTimeLiteral(TimeString.fromMillisOfDay(123456), 
6),
-                rexBuilder.makeTimeLiteral(new 
TimeString("01:01:01.000000001"), 9),
-                
rexBuilder.makeTimestampLiteral(TimestampString.fromMillisSinceEpoch(1234), 3),
-                
rexBuilder.makeTimestampLiteral(TimestampString.fromMillisSinceEpoch(123456789),
 9),
-                rexBuilder.makeTimestampLiteral(
-                        new TimestampString("0001-01-01 01:01:01.000000001"), 
9),
-                rexBuilder.makeTimestampLiteral(new 
TimestampString("2000-12-12 12:30:57.1234"), 4),
-                rexBuilder.makeBinaryLiteral(ByteString.EMPTY),
-                
rexBuilder.makeBinaryLiteral(ByteString.ofBase64("SGVsbG8gV29ybGQh")),
-                rexBuilder.makeLiteral(""),
-                rexBuilder.makeLiteral("abc"),
-                rexBuilder.makeFlag(SqlTrimFunction.Flag.BOTH),
-                rexBuilder.makeFlag(TimeUnitRange.DAY),
-                rexBuilder.makeLiteral(
-                        Arrays.<Object>asList(1, 2L),
-                        FACTORY.createStructType(
-                                StructKind.PEEK_FIELDS_NO_EXPAND,
-                                Arrays.asList(
-                                        
FACTORY.createSqlType(SqlTypeName.INTEGER),
-                                        
FACTORY.createSqlType(SqlTypeName.BIGINT)),
-                                Arrays.asList("f1", "f2")),
-                        false),
-                rexBuilder.makeSearchArgumentLiteral(
-                        Sarg.of(
-                                false,
-                                ImmutableRangeSet.of(
-                                        Range.closed(
-                                                BigDecimal.valueOf(1), 
BigDecimal.valueOf(10)))),
-                        FACTORY.createSqlType(SqlTypeName.INTEGER)),
-                rexBuilder.makeSearchArgumentLiteral(
-                        Sarg.of(
-                                false,
-                                ImmutableRangeSet.of(
-                                        Range.range(
-                                                BigDecimal.valueOf(1),
-                                                BoundType.OPEN,
-                                                BigDecimal.valueOf(10),
-                                                BoundType.CLOSED))),
-                        FACTORY.createSqlType(SqlTypeName.INTEGER)),
-                rexBuilder.makeSearchArgumentLiteral(
-                        Sarg.of(
-                                false,
-                                TreeRangeSet.create(
-                                        Arrays.asList(
-                                                Range.closed(
-                                                        BigDecimal.valueOf(1),
-                                                        BigDecimal.valueOf(1)),
-                                                Range.closed(
-                                                        BigDecimal.valueOf(3),
-                                                        BigDecimal.valueOf(3)),
-                                                Range.closed(
-                                                        BigDecimal.valueOf(6),
-                                                        
BigDecimal.valueOf(6))))),
-                        FACTORY.createSqlType(SqlTypeName.INTEGER)),
-                
rexBuilder.makeInputRef(FACTORY.createSqlType(SqlTypeName.BIGINT), 0),
-                rexBuilder.makeCorrel(inputType, new CorrelationId("$cor1")),
-                rexBuilder.makeFieldAccess(
-                        rexBuilder.makeCorrel(inputType, new 
CorrelationId("$cor2")), "f2", true),
-                // cast($1 as smallint)
-                rexBuilder.makeCast(
-                        FACTORY.createSqlType(SqlTypeName.SMALLINT),
-                        
rexBuilder.makeInputRef(FACTORY.createSqlType(SqlTypeName.INTEGER), 1)),
-                // $1 in (1, 3, 5)
-                rexBuilder.makeIn(
-                        
rexBuilder.makeInputRef(FACTORY.createSqlType(SqlTypeName.INTEGER), 1),
-                        Arrays.asList(
-                                rexBuilder.makeExactLiteral(new BigDecimal(1)),
-                                rexBuilder.makeExactLiteral(new BigDecimal(3)),
-                                rexBuilder.makeExactLiteral(new 
BigDecimal(5)))),
+                //

Review comment:
       Why these comments?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonDeserializer.java
##########
@@ -185,11 +186,12 @@ static ValidationException missingIdentifier() {
     static ValidationException lookupDisabled(ObjectIdentifier 
objectIdentifier) {
         return new ValidationException(
                 String.format(
-                        "The table '%s' does not contain any '%s' field, "
-                                + "but lookup is disabled because option '%s' 
== '%s'. "
-                                + "Either enable the catalog lookup with '%s' 
== '%s' | '%s' or regenerate the plan with '%s' != '%s'.",
+                        "The persisted plan does not include all required 
catalog metadata for table '%s'. "

Review comment:
       BTW I'm already changing these here: 
https://github.com/apache/flink/pull/18770

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
##########
@@ -106,78 +118,108 @@ public RexNodeJsonDeserializer() {
     @Override
     public RexNode deserialize(JsonParser jsonParser, DeserializationContext 
ctx)
             throws IOException {
-        JsonNode jsonNode = jsonParser.readValueAsTree();
-        return deserialize(jsonNode, jsonParser.getCodec(), ctx);
+        final JsonNode jsonNode = jsonParser.readValueAsTree();
+        final SerdeContext serdeContext = SerdeContext.get(ctx);
+        return deserialize(jsonNode, serdeContext);
     }
 
-    private RexNode deserialize(JsonNode jsonNode, ObjectCodec codec, 
DeserializationContext ctx)
+    private static RexNode deserialize(JsonNode jsonNode, SerdeContext 
serdeContext)
             throws IOException {
-        String kind = jsonNode.get(FIELD_NAME_KIND).asText().toUpperCase();
+        final String kind = jsonNode.required(FIELD_NAME_KIND).asText();
         switch (kind) {
-            case SQL_KIND_INPUT_REF:
-                return deserializeInputRef(jsonNode, codec, ctx);
-            case SQL_KIND_LITERAL:
-                return deserializeLiteral(jsonNode, codec, ctx);
-            case SQL_KIND_FIELD_ACCESS:
-                return deserializeFieldAccess(jsonNode, codec, ctx);
-            case SQL_KIND_CORREL_VARIABLE:
-                return deserializeCorrelVariable(jsonNode, codec, ctx);
-            case SQL_KIND_REX_CALL:
-                return deserializeCall(jsonNode, codec, ctx);
-            case SQL_KIND_PATTERN_INPUT_REF:
-                return deserializePatternInputRef(jsonNode, codec, ctx);
+            case KIND_INPUT_REF:
+                return deserializeInputRef(jsonNode, serdeContext);
+            case KIND_LITERAL:
+                return deserializeLiteral(jsonNode, serdeContext);
+            case KIND_FIELD_ACCESS:
+                return deserializeFieldAccess(jsonNode, serdeContext);
+            case KIND_CORREL_VARIABLE:
+                return deserializeCorrelVariable(jsonNode, serdeContext);
+            case KIND_PATTERN_INPUT_REF:
+                return deserializePatternFieldRef(jsonNode, serdeContext);
+            case KIND_CALL:
+                return deserializeCall(jsonNode, serdeContext);
             default:
                 throw new TableException("Cannot convert to RexNode: " + 
jsonNode.toPrettyString());
         }
     }
 
-    private RexNode deserializeInputRef(
-            JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx) 
throws IOException {
-        int inputIndex = jsonNode.get(FIELD_NAME_INPUT_INDEX).intValue();
-        JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
-        RelDataType fieldType = ctx.readValue(typeNode.traverse(codec), 
RelDataType.class);
-        return SerdeContext.get(ctx).getRexBuilder().makeInputRef(fieldType, 
inputIndex);
+    private static RexNode deserializeInputRef(JsonNode jsonNode, SerdeContext 
serdeContext) {
+        final int inputIndex = 
jsonNode.required(FIELD_NAME_INPUT_INDEX).intValue();
+        final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE);
+        final RelDataType fieldType =
+                RelDataTypeJsonDeserializer.deserialize(logicalTypeNode, 
serdeContext);
+        return serdeContext.getRexBuilder().makeInputRef(fieldType, 
inputIndex);
     }
 
-    private RexNode deserializePatternInputRef(
-            JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx) 
throws IOException {
-        int inputIndex = jsonNode.get(FIELD_NAME_INPUT_INDEX).intValue();
-        String alpha = jsonNode.get(FIELD_NAME_ALPHA).asText();
-        JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
-        RelDataType fieldType = ctx.readValue(typeNode.traverse(codec), 
RelDataType.class);
-        return SerdeContext.get(ctx)
-                .getRexBuilder()
-                .makePatternFieldRef(alpha, fieldType, inputIndex);
-    }
-
-    private RexNode deserializeLiteral(
-            JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx) 
throws IOException {
-        RexBuilder rexBuilder = SerdeContext.get(ctx).getRexBuilder();
-        JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
-        RelDataType literalType = ctx.readValue(typeNode.traverse(codec), 
RelDataType.class);
+    private static RexNode deserializeLiteral(JsonNode jsonNode, SerdeContext 
serdeContext) {
+        final JsonNode logicalTypeNode = jsonNode.get(FIELD_NAME_TYPE);
+        final RelDataType relDataType =
+                RelDataTypeJsonDeserializer.deserialize(logicalTypeNode, 
serdeContext);
         if (jsonNode.has(FIELD_NAME_SARG)) {
-            Sarg<?> sarg =
-                    toSarg(jsonNode.get(FIELD_NAME_SARG), 
literalType.getSqlTypeName(), codec, ctx);
-            return rexBuilder.makeSearchArgumentLiteral(sarg, literalType);
+            return deserializeSarg(jsonNode.required(FIELD_NAME_SARG), 
relDataType, serdeContext);
         } else if (jsonNode.has(FIELD_NAME_VALUE)) {
-            JsonNode literalNode = jsonNode.get(FIELD_NAME_VALUE);
-            if (literalNode.isNull()) {
-                return rexBuilder.makeNullLiteral(literalType);
+            final Object value =
+                    deserializeLiteralValue(jsonNode, 
relDataType.getSqlTypeName(), serdeContext);
+            if (value == null) {
+                return 
serdeContext.getRexBuilder().makeNullLiteral(relDataType);
             }
-            Object literal = toLiteralValue(jsonNode, 
literalType.getSqlTypeName(), codec, ctx);
-            return rexBuilder.makeLiteral(literal, literalType, true);
+            return serdeContext.getRexBuilder().makeLiteral(value, 
relDataType, true);
         } else {
             throw new TableException("Unknown literal: " + 
jsonNode.toPrettyString());
         }
     }
 
-    private Object toLiteralValue(
-            JsonNode literalNode,
-            SqlTypeName sqlTypeName,
-            ObjectCodec codec,
-            DeserializationContext ctx)
-            throws IOException {
-        JsonNode valueNode = literalNode.get(FIELD_NAME_VALUE);
+    @SuppressWarnings({"UnstableApiUsage", "rawtypes", "unchecked"})
+    private static RexNode deserializeSarg(
+            JsonNode sargNode, RelDataType relDataType, SerdeContext 
serdeContext) {
+        final RexBuilder rexBuilder = serdeContext.getRexBuilder();
+        final ArrayNode rangesNode = (ArrayNode) 
sargNode.required(FIELD_NAME_RANGES);
+        final Builder builder = builder();
+        for (JsonNode rangeNode : rangesNode) {
+            Range range = all();
+            if (rangeNode.has(FIELD_NAME_BOUND_LOWER)) {
+                final JsonNode lowerNode = 
rangeNode.required(FIELD_NAME_BOUND_LOWER);
+                final Comparable<?> boundValue =
+                        (Comparable<?>)
+                                deserializeLiteralValue(
+                                        lowerNode, 
relDataType.getSqlTypeName(), serdeContext);
+                assert boundValue != null;
+                final BoundType boundType =
+                        serializableToCalcite(
+                                BoundType.class,
+                                
lowerNode.required(FIELD_NAME_BOUND_TYPE).asText());
+                final Range<?> r =
+                        boundType == BoundType.OPEN ? greaterThan(boundValue) 
: atLeast(boundValue);
+                range = range.intersection(r);
+            }
+            if (rangeNode.has(FIELD_NAME_BOUND_UPPER)) {
+                final JsonNode upperNode = 
rangeNode.required(FIELD_NAME_BOUND_UPPER);
+                final Comparable<?> boundValue =
+                        (Comparable<?>)
+                                deserializeLiteralValue(
+                                        upperNode, 
relDataType.getSqlTypeName(), serdeContext);
+                assert boundValue != null;
+                final BoundType boundType =
+                        serializableToCalcite(
+                                BoundType.class,
+                                
upperNode.required(FIELD_NAME_BOUND_TYPE).asText());
+                final Range<?> r =
+                        boundType == BoundType.OPEN ? lessThan(boundValue) : 
atMost(boundValue);
+                range = range.intersection(r);
+            }
+            if (range.hasUpperBound() || range.hasLowerBound()) {
+                builder.add(range);
+            }
+        }
+        final boolean containsNull = 
sargNode.required(FIELD_NAME_CONTAINS_NULL).booleanValue();
+        return rexBuilder.makeSearchArgumentLiteral(
+                Sarg.of(containsNull, builder.build()), relDataType);
+    }
+
+    private static Object deserializeLiteralValue(

Review comment:
       Annotate the return type with `@Nullable` to make it clear that this method
also handles (and returns) null literals.
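  i.e. something like this (sketch only; the parameter list is inferred from the call sites in this diff):

```java
// Returns null for null literals.
private static @Nullable Object deserializeLiteralValue(
        JsonNode literalNode, SqlTypeName sqlTypeName, SerdeContext serdeContext) {
    // ... body unchanged ...
}
```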

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonDeserializer.java
##########
@@ -172,9 +172,10 @@ private boolean isLookupEnabled(CatalogPlanRestore 
planRestoreOption) {
     static ValidationException missingIdentifier() {
         return new ValidationException(
                 String.format(
-                        "The table cannot be deserialized, as no identifier is 
present within the JSON, "
-                                + "but lookup is forced by '%s' == '%s'. "
-                                + "Either allow restoring table from the 
catalog with '%s' == '%s' | '%s' or make sure you don't use anonymous tables 
when generating the plan.",
+                        "The table cannot be deserialized as no identifier is 
present in the persisted plan."

Review comment:
       All the messages here should be covered by tests, see
https://github.com/apache/flink/blob/567440115bcacb5aceaf3304e486281c7da8c14f/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableSerdeTest.java#L336

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
##########
@@ -335,34 +359,185 @@ private void serialize(RexCall call, JsonGenerator gen, 
SerializerProvider seria
         gen.writeEndObject();
     }
 
-    private void serialize(SqlOperator operator, JsonGenerator gen) throws 
IOException {
-        gen.writeFieldName(FIELD_NAME_OPERATOR);
-        gen.writeStartObject();
-        gen.writeStringField(FIELD_NAME_NAME, operator.getName());
-        gen.writeStringField(FIELD_NAME_KIND, operator.kind.name());
-        gen.writeStringField(FIELD_NAME_SYNTAX, operator.getSyntax().name());
-        // TODO if a udf is registered with class name, class name is recorded 
enough
-        if (operator instanceof ScalarSqlFunction) {
-            ScalarSqlFunction scalarSqlFunc = (ScalarSqlFunction) operator;
-            gen.writeStringField(FIELD_NAME_DISPLAY_NAME, 
scalarSqlFunc.displayName());
-            gen.writeStringField(FIELD_NAME_FUNCTION_KIND, 
FunctionKind.SCALAR.name());
+    private static void serializeSqlOperator(
+            SqlOperator operator,
+            JsonGenerator gen,
+            SerializerProvider serializerProvider,
+            boolean serializeCatalogObjects)
+            throws IOException {
+        if (operator.getSyntax() != SqlSyntax.FUNCTION) {
             gen.writeStringField(
-                    FIELD_NAME_INSTANCE,
-                    
EncodingUtils.encodeObjectToString(scalarSqlFunc.scalarFunction()));
-        } else if (operator instanceof BridgingSqlFunction) {
-            BridgingSqlFunction bridgingSqlFunc = (BridgingSqlFunction) 
operator;
-            FunctionDefinition functionDefinition = 
bridgingSqlFunc.getDefinition();
-            if (functionDefinition instanceof BuiltInFunctionDefinition) {
-                // just record the flag, we can find it by name
-                gen.writeBooleanField(FIELD_NAME_BUILT_IN, true);
-            } else {
-                gen.writeStringField(FIELD_NAME_FUNCTION_KIND, 
functionDefinition.getKind().name());
-                gen.writeStringField(
-                        FIELD_NAME_INSTANCE,
-                        
EncodingUtils.encodeObjectToString(functionDefinition));
-                gen.writeBooleanField(FIELD_NAME_BRIDGING, true);
-            }
+                    FIELD_NAME_SYNTAX, 
calciteToSerializable(operator.getSyntax()).getValue());
         }
-        gen.writeEndObject();
+        if (operator instanceof BridgingSqlFunction) {
+            final BridgingSqlFunction function = (BridgingSqlFunction) 
operator;
+            serializeBridgingSqlFunction(
+                    function.getName(),
+                    function.getResolvedFunction(),
+                    gen,
+                    serializerProvider,
+                    serializeCatalogObjects);
+        } else if (operator instanceof BridgingSqlAggFunction) {
+            final BridgingSqlAggFunction function = (BridgingSqlAggFunction) 
operator;
+            serializeBridgingSqlFunction(
+                    function.getName(),
+                    function.getResolvedFunction(),
+                    gen,
+                    serializerProvider,
+                    serializeCatalogObjects);
+        } else if (operator instanceof ScalarSqlFunction
+                || operator instanceof TableSqlFunction
+                || operator instanceof AggSqlFunction) {
+            throw legacyException(operator.toString());
+        } else {
+            // We assume that all regular SqlOperators are internal. Only the 
function definitions
+            // stack is exposed to the user and can thus be external.
+            gen.writeStringField(
+                    FIELD_NAME_INTERNAL_NAME, 
BuiltInSqlOperator.toQualifiedName(operator));
+        }
+    }
+
+    private static void serializeBridgingSqlFunction(
+            String summaryName,
+            ContextResolvedFunction resolvedFunction,
+            JsonGenerator gen,
+            SerializerProvider serializerProvider,
+            boolean serializeCatalogObjects)
+            throws IOException {
+        final FunctionDefinition definition = resolvedFunction.getDefinition();
+        if (definition instanceof ScalarFunctionDefinition
+                || definition instanceof TableFunctionDefinition
+                || definition instanceof TableAggregateFunctionDefinition
+                || definition instanceof AggregateFunctionDefinition) {
+            throw legacyException(summaryName);
+        }
+
+        if (definition instanceof BuiltInFunctionDefinition) {
+            final BuiltInFunctionDefinition builtInFunction =
+                    (BuiltInFunctionDefinition) definition;
+            gen.writeStringField(FIELD_NAME_INTERNAL_NAME, 
builtInFunction.getQualifiedName());
+        } else if (resolvedFunction.isAnonymous()) {
+            serializeInlineFunction(summaryName, definition, gen);
+        } else if (resolvedFunction.isTemporary()) {
+            serializeTemporaryFunction(resolvedFunction, gen, 
serializerProvider);
+        } else {
+            assert resolvedFunction.isPermanent();
+            serializePermanentFunction(
+                    resolvedFunction, gen, serializerProvider, 
serializeCatalogObjects);
+        }
+    }
+
+    private static void serializeInlineFunction(
+            String summaryName, FunctionDefinition definition, JsonGenerator 
gen)
+            throws IOException {
+        if (!(definition instanceof UserDefinedFunction)
+                || !isClassNameSerializable((UserDefinedFunction) definition)) 
{
+            throw cannotSerializeInlineFunction(summaryName);
+        }
+        gen.writeStringField(FIELD_NAME_CLASS, 
definition.getClass().getName());
+    }
+
+    private static void serializeTemporaryFunction(
+            ContextResolvedFunction resolvedFunction,
+            JsonGenerator gen,
+            SerializerProvider serializerProvider)
+            throws IOException {
+        final FunctionIdentifier identifier =
+                resolvedFunction
+                        .getIdentifier()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalArgumentException(
+                                                "Non-anonymous temporary 
function should own a function identifier."));

Review comment:
       Since we reached this point, the function is supposed to be temporary and
thus to have an identifier, right? Replace this with an assert or just
`Optional#get`?
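  i.e. roughly (untested):

```java
assert resolvedFunction.getIdentifier().isPresent();
final FunctionIdentifier identifier = resolvedFunction.getIdentifier().get();
```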

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
##########
@@ -220,185 +260,298 @@ private Object toLiteralValue(
                 return ByteString.ofBase64(valueNode.asText());
             case CHAR:
             case VARCHAR:
-                return SerdeContext.get(ctx)
-                        .getRexBuilder()
-                        .makeLiteral(valueNode.asText())
-                        .getValue();
+                return 
serdeContext.getRexBuilder().makeLiteral(valueNode.asText()).getValue();
             case SYMBOL:
-                JsonNode classNode = literalNode.get(FIELD_NAME_CLASS);
-                return serializableToCalcite(
-                        SerializableSymbol.of(classNode.asText(), 
valueNode.asText()));
-            case ROW:
-            case MULTISET:
-                ArrayNode valuesNode = (ArrayNode) valueNode;
-                List<RexNode> list = new ArrayList<>();
-                for (int i = 0; i < valuesNode.size(); ++i) {
-                    list.add(deserialize(valuesNode.get(i), codec, ctx));
-                }
-                return list;
+                final JsonNode symbolNode = 
literalNode.required(FIELD_NAME_SYMBOL);
+                final SerializableSymbol symbol =
+                        SerializableSymbol.of(symbolNode.asText(), 
valueNode.asText());
+                return serializableToCalcite(symbol);
             default:
                 throw new TableException("Unknown literal: " + valueNode);
         }
     }
 
-    @SuppressWarnings({"rawtypes", "unchecked", "UnstableApiUsage"})
-    private Sarg<?> toSarg(
-            JsonNode jsonNode,
-            SqlTypeName sqlTypeName,
-            ObjectCodec codec,
-            DeserializationContext ctx)
+    private static RexNode deserializeFieldAccess(JsonNode jsonNode, 
SerdeContext serdeContext)
             throws IOException {
-        ArrayNode rangesNode = (ArrayNode) jsonNode.get(FIELD_NAME_RANGES);
-        com.google.common.collect.ImmutableRangeSet.Builder builder =
-                com.google.common.collect.ImmutableRangeSet.builder();
-        for (JsonNode rangeNode : rangesNode) {
-            com.google.common.collect.Range<?> range = 
com.google.common.collect.Range.all();
-            if (rangeNode.has(FIELD_NAME_BOUND_LOWER)) {
-                JsonNode lowerNode = rangeNode.get(FIELD_NAME_BOUND_LOWER);
-                Comparable<?> boundValue =
-                        checkNotNull(
-                                (Comparable<?>) toLiteralValue(lowerNode, 
sqlTypeName, codec, ctx));
-                com.google.common.collect.BoundType boundType =
-                        com.google.common.collect.BoundType.valueOf(
-                                
lowerNode.get(FIELD_NAME_BOUND_TYPE).asText().toUpperCase());
-                com.google.common.collect.Range r =
-                        boundType == com.google.common.collect.BoundType.OPEN
-                                ? 
com.google.common.collect.Range.greaterThan(boundValue)
-                                : 
com.google.common.collect.Range.atLeast(boundValue);
-                range = range.intersection(r);
-            }
-            if (rangeNode.has(FIELD_NAME_BOUND_UPPER)) {
-                JsonNode upperNode = rangeNode.get(FIELD_NAME_BOUND_UPPER);
-                Comparable<?> boundValue =
-                        checkNotNull(
-                                (Comparable<?>) toLiteralValue(upperNode, 
sqlTypeName, codec, ctx));
-                com.google.common.collect.BoundType boundType =
-                        com.google.common.collect.BoundType.valueOf(
-                                
upperNode.get(FIELD_NAME_BOUND_TYPE).asText().toUpperCase());
-                com.google.common.collect.Range r =
-                        boundType == com.google.common.collect.BoundType.OPEN
-                                ? 
com.google.common.collect.Range.lessThan(boundValue)
-                                : 
com.google.common.collect.Range.atMost(boundValue);
-                range = range.intersection(r);
-            }
-            if (range.hasUpperBound() || range.hasLowerBound()) {
-                builder.add(range);
-            }
-        }
-        boolean containsNull = 
jsonNode.get(FIELD_NAME_CONTAINS_NULL).booleanValue();
-        return Sarg.of(containsNull, builder.build());
+        final String fieldName = jsonNode.required(FIELD_NAME_NAME).asText();
+        final JsonNode exprNode = jsonNode.required(FIELD_NAME_EXPR);
+        final RexNode refExpr = deserialize(exprNode, serdeContext);

Review comment:
       Just as a brain exercise: when **serializing**, is it ever possible that we
have a circular `RexNode` graph?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
##########
@@ -106,78 +118,108 @@ public RexNodeJsonDeserializer() {
     @Override
     public RexNode deserialize(JsonParser jsonParser, DeserializationContext 
ctx)
             throws IOException {
-        JsonNode jsonNode = jsonParser.readValueAsTree();
-        return deserialize(jsonNode, jsonParser.getCodec(), ctx);
+        final JsonNode jsonNode = jsonParser.readValueAsTree();
+        final SerdeContext serdeContext = SerdeContext.get(ctx);
+        return deserialize(jsonNode, serdeContext);
     }
 
-    private RexNode deserialize(JsonNode jsonNode, ObjectCodec codec, 
DeserializationContext ctx)
+    private static RexNode deserialize(JsonNode jsonNode, SerdeContext 
serdeContext)
             throws IOException {
-        String kind = jsonNode.get(FIELD_NAME_KIND).asText().toUpperCase();
+        final String kind = jsonNode.required(FIELD_NAME_KIND).asText();
         switch (kind) {
-            case SQL_KIND_INPUT_REF:
-                return deserializeInputRef(jsonNode, codec, ctx);
-            case SQL_KIND_LITERAL:
-                return deserializeLiteral(jsonNode, codec, ctx);
-            case SQL_KIND_FIELD_ACCESS:
-                return deserializeFieldAccess(jsonNode, codec, ctx);
-            case SQL_KIND_CORREL_VARIABLE:
-                return deserializeCorrelVariable(jsonNode, codec, ctx);
-            case SQL_KIND_REX_CALL:
-                return deserializeCall(jsonNode, codec, ctx);
-            case SQL_KIND_PATTERN_INPUT_REF:
-                return deserializePatternInputRef(jsonNode, codec, ctx);
+            case KIND_INPUT_REF:
+                return deserializeInputRef(jsonNode, serdeContext);
+            case KIND_LITERAL:
+                return deserializeLiteral(jsonNode, serdeContext);
+            case KIND_FIELD_ACCESS:
+                return deserializeFieldAccess(jsonNode, serdeContext);
+            case KIND_CORREL_VARIABLE:
+                return deserializeCorrelVariable(jsonNode, serdeContext);
+            case KIND_PATTERN_INPUT_REF:
+                return deserializePatternFieldRef(jsonNode, serdeContext);
+            case KIND_CALL:
+                return deserializeCall(jsonNode, serdeContext);
             default:
                 throw new TableException("Cannot convert to RexNode: " + 
jsonNode.toPrettyString());
         }
     }
 
-    private RexNode deserializeInputRef(
-            JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx) 
throws IOException {
-        int inputIndex = jsonNode.get(FIELD_NAME_INPUT_INDEX).intValue();
-        JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
-        RelDataType fieldType = ctx.readValue(typeNode.traverse(codec), 
RelDataType.class);
-        return SerdeContext.get(ctx).getRexBuilder().makeInputRef(fieldType, 
inputIndex);
+    private static RexNode deserializeInputRef(JsonNode jsonNode, SerdeContext 
serdeContext) {
+        final int inputIndex = 
jsonNode.required(FIELD_NAME_INPUT_INDEX).intValue();
+        final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE);
+        final RelDataType fieldType =
+                RelDataTypeJsonDeserializer.deserialize(logicalTypeNode, 
serdeContext);
+        return serdeContext.getRexBuilder().makeInputRef(fieldType, 
inputIndex);
     }
 
-    private RexNode deserializePatternInputRef(
-            JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx) 
throws IOException {
-        int inputIndex = jsonNode.get(FIELD_NAME_INPUT_INDEX).intValue();
-        String alpha = jsonNode.get(FIELD_NAME_ALPHA).asText();
-        JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
-        RelDataType fieldType = ctx.readValue(typeNode.traverse(codec), 
RelDataType.class);
-        return SerdeContext.get(ctx)
-                .getRexBuilder()
-                .makePatternFieldRef(alpha, fieldType, inputIndex);
-    }
-
-    private RexNode deserializeLiteral(
-            JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx) 
throws IOException {
-        RexBuilder rexBuilder = SerdeContext.get(ctx).getRexBuilder();
-        JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
-        RelDataType literalType = ctx.readValue(typeNode.traverse(codec), 
RelDataType.class);
+    private static RexNode deserializeLiteral(JsonNode jsonNode, SerdeContext 
serdeContext) {
+        final JsonNode logicalTypeNode = jsonNode.get(FIELD_NAME_TYPE);

Review comment:
       Isn't this field required as well?
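  i.e. the same pattern as the other branches:

```java
final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE);
```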

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
##########
@@ -106,78 +118,108 @@ public RexNodeJsonDeserializer() {
     @Override
     public RexNode deserialize(JsonParser jsonParser, DeserializationContext 
ctx)
             throws IOException {
-        JsonNode jsonNode = jsonParser.readValueAsTree();
-        return deserialize(jsonNode, jsonParser.getCodec(), ctx);
+        final JsonNode jsonNode = jsonParser.readValueAsTree();
+        final SerdeContext serdeContext = SerdeContext.get(ctx);
+        return deserialize(jsonNode, serdeContext);
     }
 
-    private RexNode deserialize(JsonNode jsonNode, ObjectCodec codec, 
DeserializationContext ctx)
+    private static RexNode deserialize(JsonNode jsonNode, SerdeContext 
serdeContext)
             throws IOException {
-        String kind = jsonNode.get(FIELD_NAME_KIND).asText().toUpperCase();
+        final String kind = jsonNode.required(FIELD_NAME_KIND).asText();
         switch (kind) {
-            case SQL_KIND_INPUT_REF:
-                return deserializeInputRef(jsonNode, codec, ctx);
-            case SQL_KIND_LITERAL:
-                return deserializeLiteral(jsonNode, codec, ctx);
-            case SQL_KIND_FIELD_ACCESS:
-                return deserializeFieldAccess(jsonNode, codec, ctx);
-            case SQL_KIND_CORREL_VARIABLE:
-                return deserializeCorrelVariable(jsonNode, codec, ctx);
-            case SQL_KIND_REX_CALL:
-                return deserializeCall(jsonNode, codec, ctx);
-            case SQL_KIND_PATTERN_INPUT_REF:
-                return deserializePatternInputRef(jsonNode, codec, ctx);
+            case KIND_INPUT_REF:
+                return deserializeInputRef(jsonNode, serdeContext);
+            case KIND_LITERAL:
+                return deserializeLiteral(jsonNode, serdeContext);
+            case KIND_FIELD_ACCESS:
+                return deserializeFieldAccess(jsonNode, serdeContext);
+            case KIND_CORREL_VARIABLE:
+                return deserializeCorrelVariable(jsonNode, serdeContext);
+            case KIND_PATTERN_INPUT_REF:
+                return deserializePatternFieldRef(jsonNode, serdeContext);
+            case KIND_CALL:
+                return deserializeCall(jsonNode, serdeContext);
             default:
                 throw new TableException("Cannot convert to RexNode: " + 
jsonNode.toPrettyString());
         }
     }
 
-    private RexNode deserializeInputRef(
-            JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx) 
throws IOException {
-        int inputIndex = jsonNode.get(FIELD_NAME_INPUT_INDEX).intValue();
-        JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
-        RelDataType fieldType = ctx.readValue(typeNode.traverse(codec), 
RelDataType.class);
-        return SerdeContext.get(ctx).getRexBuilder().makeInputRef(fieldType, 
inputIndex);
+    private static RexNode deserializeInputRef(JsonNode jsonNode, SerdeContext 
serdeContext) {
+        final int inputIndex = 
jsonNode.required(FIELD_NAME_INPUT_INDEX).intValue();
+        final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE);
+        final RelDataType fieldType =
+                RelDataTypeJsonDeserializer.deserialize(logicalTypeNode, 
serdeContext);
+        return serdeContext.getRexBuilder().makeInputRef(fieldType, 
inputIndex);
     }
 
-    private RexNode deserializePatternInputRef(
-            JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx) 
throws IOException {
-        int inputIndex = jsonNode.get(FIELD_NAME_INPUT_INDEX).intValue();
-        String alpha = jsonNode.get(FIELD_NAME_ALPHA).asText();
-        JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
-        RelDataType fieldType = ctx.readValue(typeNode.traverse(codec), 
RelDataType.class);
-        return SerdeContext.get(ctx)
-                .getRexBuilder()
-                .makePatternFieldRef(alpha, fieldType, inputIndex);
-    }
-
-    private RexNode deserializeLiteral(
-            JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx) 
throws IOException {
-        RexBuilder rexBuilder = SerdeContext.get(ctx).getRexBuilder();
-        JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
-        RelDataType literalType = ctx.readValue(typeNode.traverse(codec), 
RelDataType.class);
+    private static RexNode deserializeLiteral(JsonNode jsonNode, SerdeContext 
serdeContext) {
+        final JsonNode logicalTypeNode = jsonNode.get(FIELD_NAME_TYPE);
+        final RelDataType relDataType =
+                RelDataTypeJsonDeserializer.deserialize(logicalTypeNode, 
serdeContext);
         if (jsonNode.has(FIELD_NAME_SARG)) {
-            Sarg<?> sarg =
-                    toSarg(jsonNode.get(FIELD_NAME_SARG), 
literalType.getSqlTypeName(), codec, ctx);
-            return rexBuilder.makeSearchArgumentLiteral(sarg, literalType);
+            return deserializeSarg(jsonNode.required(FIELD_NAME_SARG), 
relDataType, serdeContext);
         } else if (jsonNode.has(FIELD_NAME_VALUE)) {
-            JsonNode literalNode = jsonNode.get(FIELD_NAME_VALUE);
-            if (literalNode.isNull()) {
-                return rexBuilder.makeNullLiteral(literalType);
+            final Object value =
+                    deserializeLiteralValue(jsonNode, 
relDataType.getSqlTypeName(), serdeContext);
+            if (value == null) {
+                return 
serdeContext.getRexBuilder().makeNullLiteral(relDataType);
             }
-            Object literal = toLiteralValue(jsonNode, 
literalType.getSqlTypeName(), codec, ctx);
-            return rexBuilder.makeLiteral(literal, literalType, true);
+            return serdeContext.getRexBuilder().makeLiteral(value, 
relDataType, true);
         } else {
             throw new TableException("Unknown literal: " + 
jsonNode.toPrettyString());
         }
     }
 
-    private Object toLiteralValue(
-            JsonNode literalNode,
-            SqlTypeName sqlTypeName,
-            ObjectCodec codec,
-            DeserializationContext ctx)
-            throws IOException {
-        JsonNode valueNode = literalNode.get(FIELD_NAME_VALUE);
+    @SuppressWarnings({"UnstableApiUsage", "rawtypes", "unchecked"})
+    private static RexNode deserializeSarg(
+            JsonNode sargNode, RelDataType relDataType, SerdeContext 
serdeContext) {
+        final RexBuilder rexBuilder = serdeContext.getRexBuilder();
+        final ArrayNode rangesNode = (ArrayNode) 
sargNode.required(FIELD_NAME_RANGES);
+        final Builder builder = builder();
+        for (JsonNode rangeNode : rangesNode) {
+            Range range = all();
+            if (rangeNode.has(FIELD_NAME_BOUND_LOWER)) {
+                final JsonNode lowerNode = 
rangeNode.required(FIELD_NAME_BOUND_LOWER);
+                final Comparable<?> boundValue =
+                        (Comparable<?>)
+                                deserializeLiteralValue(
+                                        lowerNode, 
relDataType.getSqlTypeName(), serdeContext);
+                assert boundValue != null;
+                final BoundType boundType =
+                        serializableToCalcite(
+                                BoundType.class,
+                                
lowerNode.required(FIELD_NAME_BOUND_TYPE).asText());
+                final Range<?> r =
+                        boundType == BoundType.OPEN ? greaterThan(boundValue) 
: atLeast(boundValue);
+                range = range.intersection(r);
+            }
+            if (rangeNode.has(FIELD_NAME_BOUND_UPPER)) {
+                final JsonNode upperNode = 
rangeNode.required(FIELD_NAME_BOUND_UPPER);
+                final Comparable<?> boundValue =
+                        (Comparable<?>)
+                                deserializeLiteralValue(
+                                        upperNode, 
relDataType.getSqlTypeName(), serdeContext);
+                assert boundValue != null;
+                final BoundType boundType =
+                        serializableToCalcite(
+                                BoundType.class,
+                                
upperNode.required(FIELD_NAME_BOUND_TYPE).asText());
+                final Range<?> r =
+                        boundType == BoundType.OPEN ? lessThan(boundValue) : 
atMost(boundValue);
+                range = range.intersection(r);
+            }
+            if (range.hasUpperBound() || range.hasLowerBound()) {
+                builder.add(range);
+            }
+        }
+        final boolean containsNull = 
sargNode.required(FIELD_NAME_CONTAINS_NULL).booleanValue();
+        return rexBuilder.makeSearchArgumentLiteral(
+                Sarg.of(containsNull, builder.build()), relDataType);
+    }
+
+    private static Object deserializeLiteralValue(
+            JsonNode literalNode, SqlTypeName sqlTypeName, SerdeContext 
serdeContext) {

Review comment:
       Please take the `literalValue` `JsonNode` here, rather than extracting it 
within this function; that keeps it consistent with all our other deser 
functions and makes clear what this function is doing.
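
       A minimal, self-contained sketch of the suggested calling convention 
(class and method names are illustrative only, and plain Jackson is used here 
just to keep the snippet standalone instead of Flink's shaded Jackson):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Illustrative-only sketch: the caller extracts the value node and hands it to the
 * helper, so the helper's signature makes explicit that it only converts a value.
 */
public final class LiteralValueDeserSketch {

    // Stand-in for deserializeLiteralValue: receives the already-extracted value node.
    static long deserializeBigintValue(JsonNode valueNode) {
        return valueNode.longValue();
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode literalNode = mapper.readTree("{\"type\": \"BIGINT\", \"value\": 42}");
        // Call site: extract the value field here and pass only the value node down.
        long value = deserializeBigintValue(literalNode.required("value"));
        System.out.println(value); // prints 42
    }
}
```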

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java
##########
@@ -18,58 +18,485 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import 
org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation;
+import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanRestore;
+import org.apache.flink.table.catalog.ContextResolvedFunction;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.module.Module;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeStrategies;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 
-import com.google.common.collect.BoundType;
-import com.google.common.collect.ImmutableRangeSet;
-import com.google.common.collect.Range;
-import com.google.common.collect.TreeRangeSet;
-import org.apache.calcite.avatica.util.ByteString;
-import org.apache.calcite.avatica.util.TimeUnit;
-import org.apache.calcite.avatica.util.TimeUnitRange;
-import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.StructKind;
 import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.SqlIntervalQualifier;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.fun.SqlTrimFunction;
-import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.util.DateString;
-import org.apache.calcite.util.Sarg;
-import org.apache.calcite.util.TimeString;
-import org.apache.calcite.util.TimestampString;
+import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Stream;
 
-import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.configuredSerdeContext;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.assertThatJsonContains;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.assertThatJsonDoesNotContain;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.testJsonRoundTrip;
 import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.toJson;
-import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.toObject;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.createObjectReader;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.createObjectWriter;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.getObjectMapper;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_CLASS;
+import static org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_CATALOG;
+import static 
org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_DATABASE;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for serialization/deserialization of {@link RexNode}. */
 public class RexNodeSerdeTest {
 
     private static final FlinkTypeFactory FACTORY = 
FlinkTypeFactory.INSTANCE();
+    private static final String FUNCTION_NAME = "MyFunc";
+    private static final FunctionIdentifier FUNCTION_SYS_ID = 
FunctionIdentifier.of(FUNCTION_NAME);
+    private static final FunctionIdentifier FUNCTION_CAT_ID =
+            FunctionIdentifier.of(
+                    ObjectIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE, 
FUNCTION_NAME));
+    private static final UnresolvedIdentifier UNRESOLVED_FUNCTION_CAT_ID =
+            UnresolvedIdentifier.of(FUNCTION_CAT_ID.toList());
+    private static final SerializableScalarFunction SER_UDF_IMPL = new 
SerializableScalarFunction();
+    private static final Class<SerializableScalarFunction> SER_UDF_CLASS =
+            SerializableScalarFunction.class;
+    private static final OtherSerializableScalarFunction SER_UDF_IMPL_OTHER =
+            new OtherSerializableScalarFunction();
+    private static final Class<OtherSerializableScalarFunction> 
SER_UDF_CLASS_OTHER =
+            OtherSerializableScalarFunction.class;
+    private static final NonSerializableScalarFunction NON_SER_UDF_IMPL =
+            new NonSerializableScalarFunction(true);
+    private static final NonSerializableFunctionDefinition 
NON_SER_FUNCTION_DEF_IMPL =
+            new NonSerializableFunctionDefinition();
+    private static final ContextResolvedFunction PERMANENT_FUNCTION =
+            ContextResolvedFunction.permanent(FUNCTION_CAT_ID, SER_UDF_IMPL);
 
     @ParameterizedTest
     @MethodSource("testRexNodeSerde")
     public void testRexNodeSerde(RexNode rexNode) throws IOException {
-        final SerdeContext serdeContext = configuredSerdeContext();
-        final String json = toJson(serdeContext, rexNode);
-        final RexNode actual = toObject(serdeContext, json, RexNode.class);
-
+        final RexNode actual = testJsonRoundTrip(rexNode, RexNode.class);
         assertThat(actual).isEqualTo(rexNode);
     }
 
+    @Test
+    public void testInlineFunction() throws IOException {
+        final SerdeContext serdeContext = contradictingSerdeContext();
+
+        // Serializable function
+        testJsonRoundTrip(
+                createFunctionCall(serdeContext, 
ContextResolvedFunction.anonymous(SER_UDF_IMPL)),
+                RexNode.class);
+
+        // Non-serializable function due to fields
+        assertThatThrownBy(
+                        () ->
+                                toJson(
+                                        serdeContext,
+                                        createFunctionCall(
+                                                serdeContext,
+                                                
ContextResolvedFunction.anonymous(
+                                                        NON_SER_UDF_IMPL))))
+                .satisfies(
+                        anyCauseMatches(
+                                TableException.class,
+                                "The function's implementation class must not 
be stateful"));
+    }
+
+    @Test
+    public void testSystemFunction() throws Throwable {
+        final SerdeContext serdeContext = contradictingSerdeContext();
+
+        final ThrowingCallable callable =
+                () ->
+                        testJsonRoundTrip(
+                                serdeContext,
+                                createFunctionCall(
+                                        serdeContext,
+                                        ContextResolvedFunction.permanent(
+                                                FUNCTION_SYS_ID, 
NON_SER_UDF_IMPL)),
+                                RexNode.class);
+
+        // Missing function
+        assertThatThrownBy(callable)
+                .satisfies(
+                        anyCauseMatches(
+                                TableException.class,
+                                "Could not lookup system function '" + 
FUNCTION_NAME + "'."));
+
+        // Module provided permanent function
+        serdeContext
+                .getFlinkContext()
+                .getModuleManager()
+                .loadModule("myModule", FunctionProvidingModule.INSTANCE);
+        callable.call();
+    }
+
+    @Test
+    public void testTemporarySystemFunction() throws Throwable {
+        final SerdeContext serdeContext = contradictingSerdeContext();
+
+        final ThrowingCallable callable =
+                () ->
+                        testJsonRoundTrip(
+                                serdeContext,
+                                createFunctionCall(
+                                        serdeContext,
+                                        ContextResolvedFunction.temporary(
+                                                FUNCTION_SYS_ID, 
NON_SER_UDF_IMPL)),
+                                RexNode.class);
+
+        // Missing function
+        assertThatThrownBy(callable)
+                .satisfies(
+                        anyCauseMatches(
+                                TableException.class,
+                                "Could not lookup system function '" + 
FUNCTION_NAME + "'."));
+
+        // Registered temporary function
+        registerTemporaryFunction(serdeContext);
+        callable.call();
+    }
+
+    @Test
+    public void testTemporaryCatalogFunction() throws Throwable {
+        final SerdeContext serdeContext = contradictingSerdeContext();
+
+        final ThrowingCallable callable =
+                () ->
+                        testJsonRoundTrip(
+                                serdeContext,
+                                createFunctionCall(
+                                        serdeContext,
+                                        ContextResolvedFunction.temporary(
+                                                FUNCTION_CAT_ID, 
NON_SER_FUNCTION_DEF_IMPL)),
+                                RexNode.class);
+
+        // Missing function
+        assertThatThrownBy(callable)
+                .satisfies(
+                        anyCauseMatches(
+                                TableException.class,
+                                "The persisted plan does not include all 
required "
+                                        + "catalog metadata for function '"
+                                        + FUNCTION_CAT_ID.asSummaryString()
+                                        + "'."));
+
+        // Registered temporary function
+        registerTemporaryFunction(serdeContext);
+        callable.call();
+    }
+
+    @Nested
+    @DisplayName("Test CatalogPlanCompilation == IDENTIFIER")
+    class TestCompileIdentifier {
+
+        private final CatalogPlanCompilation compilation = 
CatalogPlanCompilation.IDENTIFIER;
+
+        @Nested
+        @DisplayName("and CatalogPlanRestore == IDENTIFIER")
+        class TestRestoreIdentifier {
+
+            private final CatalogPlanRestore restore = 
CatalogPlanRestore.IDENTIFIER;

Review comment:
       Maybe avoid recreating the ctx every time and just create one here?
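
       Something along these lines, purely as a layout sketch (a plain String 
stands in for the real SerdeContext; names are illustrative only):

```java
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

// Illustrative-only layout: the context for one option combination is built once as a
// field of the nested class and reused by its tests, instead of being rebuilt inside
// every test method.
class SharedContextLayoutSketch {

    // Stand-in for the real context factory (e.g. a configured serde context builder).
    private static String createContext(String planCompilation, String planRestore) {
        return planCompilation + "/" + planRestore;
    }

    @Nested
    class CompileIdentifierRestoreIdentifier {

        // Created once per test instance for this combination and shared by the tests below.
        private final String ctx = createContext("IDENTIFIER", "IDENTIFIER");

        @Test
        void usesSharedContext() {
            assertThat(ctx).isEqualTo("IDENTIFIER/IDENTIFIER");
        }
    }
}
```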



