matriv commented on a change in pull request #18858:
URL: https://github.com/apache/flink/pull/18858#discussion_r811301489



##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
##########
@@ -335,34 +359,185 @@ private void serialize(RexCall call, JsonGenerator gen, 
SerializerProvider seria
         gen.writeEndObject();
     }
 
-    private void serialize(SqlOperator operator, JsonGenerator gen) throws 
IOException {
-        gen.writeFieldName(FIELD_NAME_OPERATOR);
-        gen.writeStartObject();
-        gen.writeStringField(FIELD_NAME_NAME, operator.getName());
-        gen.writeStringField(FIELD_NAME_KIND, operator.kind.name());
-        gen.writeStringField(FIELD_NAME_SYNTAX, operator.getSyntax().name());
-        // TODO if a udf is registered with class name, class name is recorded 
enough
-        if (operator instanceof ScalarSqlFunction) {
-            ScalarSqlFunction scalarSqlFunc = (ScalarSqlFunction) operator;
-            gen.writeStringField(FIELD_NAME_DISPLAY_NAME, 
scalarSqlFunc.displayName());
-            gen.writeStringField(FIELD_NAME_FUNCTION_KIND, 
FunctionKind.SCALAR.name());
+    private static void serializeSqlOperator(
+            SqlOperator operator,
+            JsonGenerator gen,
+            SerializerProvider serializerProvider,
+            boolean serializeCatalogObjects)
+            throws IOException {
+        if (operator.getSyntax() != SqlSyntax.FUNCTION) {
             gen.writeStringField(
-                    FIELD_NAME_INSTANCE,
-                    
EncodingUtils.encodeObjectToString(scalarSqlFunc.scalarFunction()));
-        } else if (operator instanceof BridgingSqlFunction) {
-            BridgingSqlFunction bridgingSqlFunc = (BridgingSqlFunction) 
operator;
-            FunctionDefinition functionDefinition = 
bridgingSqlFunc.getDefinition();
-            if (functionDefinition instanceof BuiltInFunctionDefinition) {
-                // just record the flag, we can find it by name
-                gen.writeBooleanField(FIELD_NAME_BUILT_IN, true);
-            } else {
-                gen.writeStringField(FIELD_NAME_FUNCTION_KIND, 
functionDefinition.getKind().name());
-                gen.writeStringField(
-                        FIELD_NAME_INSTANCE,
-                        
EncodingUtils.encodeObjectToString(functionDefinition));
-                gen.writeBooleanField(FIELD_NAME_BRIDGING, true);
-            }
+                    FIELD_NAME_SYNTAX, 
calciteToSerializable(operator.getSyntax()).getValue());
         }
-        gen.writeEndObject();
+        if (operator instanceof BridgingSqlFunction) {
+            final BridgingSqlFunction function = (BridgingSqlFunction) 
operator;
+            serializeBridgingSqlFunction(
+                    function.getName(),
+                    function.getResolvedFunction(),
+                    gen,
+                    serializerProvider,
+                    serializeCatalogObjects);
+        } else if (operator instanceof BridgingSqlAggFunction) {
+            final BridgingSqlAggFunction function = (BridgingSqlAggFunction) 
operator;
+            serializeBridgingSqlFunction(
+                    function.getName(),
+                    function.getResolvedFunction(),
+                    gen,
+                    serializerProvider,
+                    serializeCatalogObjects);
+        } else if (operator instanceof ScalarSqlFunction
+                || operator instanceof TableSqlFunction
+                || operator instanceof AggSqlFunction) {
+            throw legacyException(operator.toString());
+        } else {
+            // We assume that all regular SqlOperators are internal. Only the 
function definitions
+            // stack is exposed to the user and can thus be external.
+            gen.writeStringField(
+                    FIELD_NAME_INTERNAL_NAME, 
BuiltInSqlOperator.toQualifiedName(operator));
+        }
+    }
+
+    private static void serializeBridgingSqlFunction(
+            String summaryName,
+            ContextResolvedFunction resolvedFunction,
+            JsonGenerator gen,
+            SerializerProvider serializerProvider,
+            boolean serializeCatalogObjects)
+            throws IOException {
+        final FunctionDefinition definition = resolvedFunction.getDefinition();
+        if (definition instanceof ScalarFunctionDefinition
+                || definition instanceof TableFunctionDefinition
+                || definition instanceof TableAggregateFunctionDefinition
+                || definition instanceof AggregateFunctionDefinition) {
+            throw legacyException(summaryName);
+        }
+
+        if (definition instanceof BuiltInFunctionDefinition) {
+            final BuiltInFunctionDefinition builtInFunction =
+                    (BuiltInFunctionDefinition) definition;
+            gen.writeStringField(FIELD_NAME_INTERNAL_NAME, 
builtInFunction.getQualifiedName());
+        } else if (resolvedFunction.isAnonymous()) {
+            serializeInlineFunction(summaryName, definition, gen);
+        } else if (resolvedFunction.isTemporary()) {
+            serializeTemporaryFunction(resolvedFunction, gen, 
serializerProvider);
+        } else {
+            assert resolvedFunction.isPermanent();
+            serializePermanentFunction(
+                    resolvedFunction, gen, serializerProvider, 
serializeCatalogObjects);
+        }
+    }
+
+    private static void serializeInlineFunction(
+            String summaryName, FunctionDefinition definition, JsonGenerator 
gen)
+            throws IOException {
+        if (!(definition instanceof UserDefinedFunction)
+                || !isClassNameSerializable((UserDefinedFunction) definition)) 
{
+            throw cannotSerializeInlineFunction(summaryName);
+        }
+        gen.writeStringField(FIELD_NAME_CLASS, 
definition.getClass().getName());
+    }
+
+    private static void serializeTemporaryFunction(
+            ContextResolvedFunction resolvedFunction,
+            JsonGenerator gen,
+            SerializerProvider serializerProvider)
+            throws IOException {
+        final FunctionIdentifier identifier =
+                resolvedFunction
+                        .getIdentifier()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalArgumentException(
+                                                "Non-anonymous temporary 
function should own a function identifier."));
+        if (identifier.getSimpleName().isPresent()) {
+            gen.writeStringField(FIELD_NAME_SYSTEM_NAME, 
identifier.getSimpleName().get());
+        } else {
+            assert identifier.getIdentifier().isPresent();
+            serializerProvider.defaultSerializeField(
+                    FIELD_NAME_CATALOG_NAME, identifier.getIdentifier().get(), 
gen);
+        }
+    }
+
+    private static void serializePermanentFunction(
+            ContextResolvedFunction resolvedFunction,
+            JsonGenerator gen,
+            SerializerProvider serializerProvider,
+            boolean serializeCatalogObjects)
+            throws IOException {
+        final FunctionIdentifier identifier =
+                resolvedFunction
+                        .getIdentifier()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalArgumentException(
+                                                "Permanent functions should 
own a function identifier."));
+
+        if (identifier.getSimpleName().isPresent()) {
+            // Module provided system function
+            gen.writeStringField(FIELD_NAME_SYSTEM_NAME, 
identifier.getSimpleName().get());
+        } else {
+            assert identifier.getIdentifier().isPresent();
+            serializeCatalogFunction(
+                    identifier.getIdentifier().get(),
+                    resolvedFunction.getDefinition(),
+                    gen,
+                    serializerProvider,
+                    serializeCatalogObjects);
+        }
+    }
+
+    private static void serializeCatalogFunction(
+            ObjectIdentifier objectIdentifier,
+            FunctionDefinition definition,
+            JsonGenerator gen,
+            SerializerProvider serializerProvider,
+            boolean serializeCatalogObjects)
+            throws IOException {
+        serializerProvider.defaultSerializeField(FIELD_NAME_CATALOG_NAME, 
objectIdentifier, gen);
+
+        if (!serializeCatalogObjects) {
+            return;
+        }
+
+        if (!(definition instanceof UserDefinedFunction)
+                || !isClassNameSerializable((UserDefinedFunction) definition)) 
{
+            throw cannotSerializePermanentCatalogFunction(objectIdentifier);
+        }
+
+        gen.writeStringField(FIELD_NAME_CLASS, 
definition.getClass().getName());
+    }
+
+    private static TableException legacyException(String summaryName) {

Review comment:
       Do we have tests for those (also below)?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonDeserializer.java
##########
@@ -172,9 +172,10 @@ private boolean isLookupEnabled(CatalogPlanRestore 
planRestoreOption) {
     static ValidationException missingIdentifier() {
         return new ValidationException(
                 String.format(
-                        "The table cannot be deserialized, as no identifier is 
present within the JSON, "
-                                + "but lookup is forced by '%s' == '%s'. "
-                                + "Either allow restoring table from the 
catalog with '%s' == '%s' | '%s' or make sure you don't use anonymous tables 
when generating the plan.",
+                        "The table cannot be deserialized as no identifier is 
present in the persisted plan."

Review comment:
       Unless I'm missing something, those cases (of exceptions) are not 
covered?
   Shouldn't we have tests for all those error messages?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
##########
@@ -237,33 +247,38 @@ private void serialize(
             case INTERVAL_MINUTE:
             case INTERVAL_MINUTE_SECOND:
             case INTERVAL_SECOND:
-                gen.writeNumberField(FIELD_NAME_VALUE, ((BigDecimal) 
value).longValue());
+            case DATE:
+            case TIME:
+            case TIMESTAMP:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                gen.writeStringField(FIELD_NAME_VALUE, value.toString());
+                break;
+            case BINARY:
+            case VARBINARY:
+                gen.writeStringField(FIELD_NAME_VALUE, ((ByteString) 
value).toBase64String());
+                break;
+            case CHAR:
+            case VARCHAR:
+                gen.writeStringField(FIELD_NAME_VALUE, ((NlsString) 
value).getValue());
                 break;
             case SYMBOL:
                 final SerializableSymbol symbol = 
calciteToSerializable((Enum<?>) value);
+                gen.writeStringField(FIELD_NAME_SYMBOL, symbol.getKind());
                 gen.writeStringField(FIELD_NAME_VALUE, symbol.getValue());
-                gen.writeStringField(FIELD_NAME_CLASS, symbol.getKind());
                 break;
             case SARG:
-                serialize((Sarg<?>) value, elementTypeName, gen, 
serializerProvider);
-                break;
-            case ROW:
-            case MULTISET:
-                gen.writeFieldName(FIELD_NAME_VALUE);
-                gen.writeStartArray();
-                for (RexLiteral v : (FlatLists.ComparableList<RexLiteral>) 
value) {
-                    serialize(v, gen, serializerProvider);
-                }
-                gen.writeEndArray();
+                serializeSargValue((Sarg<?>) value, elementTypeName, gen, 
serializerProvider);
                 break;
             default:
-                // TODO support ARRAY, MAP
-                throw new TableException("Unknown value: " + value + ", type: 
" + literalTypeName);
+                throw new TableException(

Review comment:
       You replaced the TODO for `array` &  `map` and removed support for `ROW` 
and `MULTISET`.
   Just double checking, don't have those as literals, right?

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
##########
@@ -0,0 +1,831 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import 
org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation;
+import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanRestore;
+import org.apache.flink.table.catalog.ContextResolvedFunction;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeStrategies;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+import com.google.common.collect.BoundType;
+import com.google.common.collect.ImmutableRangeSet;
+import com.google.common.collect.Range;
+import com.google.common.collect.TreeRangeSet;
+import org.apache.calcite.avatica.util.ByteString;
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.StructKind;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.fun.SqlTrimFunction;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.DateString;
+import org.apache.calcite.util.Sarg;
+import org.apache.calcite.util.TimeString;
+import org.apache.calcite.util.TimestampString;
+import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.assertThatJsonContains;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.assertThatJsonDoesNotContain;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.testJsonRoundTrip;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.toJson;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.createObjectReader;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.createObjectWriter;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.getObjectMapper;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_CLASS;
+import static org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_CATALOG;
+import static 
org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_DATABASE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for serialization/deserialization of {@link RexNode}. */
+public class RexNodeJsonSerdeTest {
+
+    private static final FlinkTypeFactory FACTORY = 
FlinkTypeFactory.INSTANCE();
+    private static final String FUNCTION_NAME = "MyFunc";
+    private static final FunctionIdentifier FUNCTION_SYS_ID = 
FunctionIdentifier.of(FUNCTION_NAME);
+    private static final FunctionIdentifier FUNCTION_CAT_ID =
+            FunctionIdentifier.of(
+                    ObjectIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE, 
FUNCTION_NAME));
+    private static final UnresolvedIdentifier UNRESOLVED_FUNCTION_CAT_ID =
+            UnresolvedIdentifier.of(FUNCTION_CAT_ID.toList());
+    private static final SerializableScalarFunction SER_UDF_IMPL = new 
SerializableScalarFunction();
+    private static final Class<SerializableScalarFunction> SER_UDF_CLASS =
+            SerializableScalarFunction.class;
+    private static final OtherSerializableScalarFunction SER_UDF_IMPL_OTHER =
+            new OtherSerializableScalarFunction();
+    private static final Class<OtherSerializableScalarFunction> 
SER_UDF_CLASS_OTHER =
+            OtherSerializableScalarFunction.class;
+    private static final NonSerializableScalarFunction NON_SER_UDF_IMPL =
+            new NonSerializableScalarFunction(true);
+    private static final NonSerializableFunctionDefinition 
NON_SER_FUNCTION_DEF_IMPL =
+            new NonSerializableFunctionDefinition();
+    private static final ContextResolvedFunction PERMANENT_FUNCTION =
+            ContextResolvedFunction.permanent(FUNCTION_CAT_ID, SER_UDF_IMPL);
+
+    @ParameterizedTest
+    @MethodSource("testRexNodeSerde")
+    public void testRexNodeSerde(RexNode rexNode) throws IOException {
+        final RexNode actual = testJsonRoundTrip(rexNode, RexNode.class);
+        assertThat(actual).isEqualTo(rexNode);
+    }
+
+    @Test
+    public void testInlineFunction() throws IOException {
+        final SerdeContext serdeContext = contradictingSerdeContext();
+
+        // Serializable function
+        testJsonRoundTrip(
+                createFunctionCall(serdeContext, 
ContextResolvedFunction.anonymous(SER_UDF_IMPL)),
+                RexNode.class);
+
+        // Non-serializable function due to fields
+        assertThatThrownBy(
+                        () ->
+                                toJson(
+                                        serdeContext,
+                                        createFunctionCall(
+                                                serdeContext,
+                                                
ContextResolvedFunction.anonymous(
+                                                        NON_SER_UDF_IMPL))))
+                .satisfies(
+                        anyCauseMatches(
+                                TableException.class,
+                                "The function's implementation class must not 
be stateful"));
+    }
+
+    @Test
+    public void testSystemFunction() throws Throwable {
+        final SerdeContext serdeContext = contradictingSerdeContext();
+
+        final ThrowingCallable callable =
+                () ->
+                        testJsonRoundTrip(
+                                serdeContext,
+                                createFunctionCall(
+                                        serdeContext,
+                                        ContextResolvedFunction.permanent(
+                                                FUNCTION_SYS_ID, 
NON_SER_UDF_IMPL)),
+                                RexNode.class);
+
+        // Missing function
+        assertThatThrownBy(callable)
+                .satisfies(
+                        anyCauseMatches(
+                                TableException.class,
+                                "Could not lookup system function '" + 
FUNCTION_NAME + "'."));
+
+        // Module provided permanent function
+        serdeContext
+                .getFlinkContext()
+                .getModuleManager()
+                .loadModule("myModule", FunctionProvidingModule.INSTANCE);
+        callable.call();
+    }
+
+    @Test
+    public void testTemporarySystemFunction() throws Throwable {
+        final SerdeContext serdeContext = contradictingSerdeContext();
+
+        final ThrowingCallable callable =
+                () ->
+                        testJsonRoundTrip(
+                                serdeContext,
+                                createFunctionCall(
+                                        serdeContext,
+                                        ContextResolvedFunction.temporary(
+                                                FUNCTION_SYS_ID, 
NON_SER_UDF_IMPL)),
+                                RexNode.class);
+
+        // Missing function
+        assertThatThrownBy(callable)
+                .satisfies(
+                        anyCauseMatches(
+                                TableException.class,
+                                "Could not lookup system function '" + 
FUNCTION_NAME + "'."));
+
+        // Registered temporary function
+        registerTemporaryFunction(serdeContext);
+        callable.call();
+    }
+
+    @Test
+    public void testTemporaryCatalogFunction() throws Throwable {
+        final SerdeContext serdeContext = contradictingSerdeContext();
+
+        final ThrowingCallable callable =
+                () ->
+                        testJsonRoundTrip(
+                                serdeContext,
+                                createFunctionCall(
+                                        serdeContext,
+                                        ContextResolvedFunction.temporary(
+                                                FUNCTION_CAT_ID, 
NON_SER_FUNCTION_DEF_IMPL)),
+                                RexNode.class);
+
+        // Missing function
+        assertThatThrownBy(callable)
+                .satisfies(
+                        anyCauseMatches(
+                                TableException.class,
+                                "The persisted plan does not include all 
required "
+                                        + "catalog metadata for function '"
+                                        + FUNCTION_CAT_ID.asSummaryString()
+                                        + "'."));
+
+        // Registered temporary function
+        registerTemporaryFunction(serdeContext);
+        callable.call();
+    }
+
+    @Nested
+    @DisplayName("Test CatalogPlanCompilation == IDENTIFIER")
+    class TestCompileIdentifier {
+
+        private final CatalogPlanCompilation compilation = 
CatalogPlanCompilation.IDENTIFIER;
+
+        @Nested
+        @DisplayName("and CatalogPlanRestore == IDENTIFIER")
+        class TestRestoreIdentifier {
+
+            private final CatalogPlanRestore restore = 
CatalogPlanRestore.IDENTIFIER;
+
+            @Test
+            void withConstantCatalogFunction() throws Exception {
+                final SerdeContext ctx = 
serdeContextWithPermanentFunction(compilation, restore);
+
+                final JsonNode json = serialize(ctx, PERMANENT_FUNCTION);
+                assertThatJsonDoesNotContain(json, FIELD_NAME_CLASS);
+
+                final ContextResolvedFunction actual = deserialize(ctx, json);
+                assertThat(actual).isEqualTo(PERMANENT_FUNCTION);
+            }
+
+            @Test
+            void withDroppedCatalogFunction() throws Exception {
+                final SerdeContext ctx = 
serdeContextWithPermanentFunction(compilation, restore);
+
+                final JsonNode json = serialize(ctx, PERMANENT_FUNCTION);
+
+                dropPermanentFunction(ctx);
+
+                assertThatThrownBy(() -> deserialize(ctx, json))
+                        .satisfies(
+                                anyCauseMatches(
+                                        TableException.class,
+                                        "Make sure a registered catalog 
contains the function"));
+            }
+
+            @Test
+            void withModifiedCatalogFunction() throws Exception {
+                final SerdeContext ctx = 
serdeContextWithPermanentFunction(compilation, restore);
+
+                final JsonNode json = serialize(ctx, PERMANENT_FUNCTION);
+
+                modifyPermanentFunction(ctx);
+
+                final ContextResolvedFunction actual = deserialize(ctx, json);
+                assertThat(actual)
+                        .isEqualTo(
+                                ContextResolvedFunction.permanent(
+                                        FUNCTION_CAT_ID, SER_UDF_IMPL_OTHER));
+            }
+        }
+
+        @Nested
+        @DisplayName("and CatalogPlanRestore == ALL_ENFORCED")
+        class TestRestoreAllEnforced {
+
+            private final CatalogPlanRestore restore = 
CatalogPlanRestore.ALL_ENFORCED;
+
+            @Test
+            void withConstantCatalogFunction() throws Exception {
+                final SerdeContext ctx = 
serdeContextWithPermanentFunction(compilation, restore);
+
+                final JsonNode json = serialize(ctx, PERMANENT_FUNCTION);
+                assertThatJsonDoesNotContain(json, FIELD_NAME_CLASS);
+
+                assertThatThrownBy(() -> deserialize(ctx, json))
+                        .satisfies(
+                                anyCauseMatches(
+                                        TableException.class,
+                                        "plan does not include all required 
catalog metadata"));
+            }
+
+            @Test
+            void withShadowingTemporaryFunction() throws Exception {
+                final SerdeContext ctx = 
serdeContextWithPermanentFunction(compilation, restore);
+
+                final JsonNode json = serialize(ctx, PERMANENT_FUNCTION);
+
+                // The temporary function can be used as a replacement for the 
disabled catalog
+                // lookup
+                registerTemporaryFunction(ctx);
+
+                final ContextResolvedFunction actual = deserialize(ctx, json);
+                assertThat(actual)
+                        .isEqualTo(
+                                ContextResolvedFunction.temporary(
+                                        FUNCTION_CAT_ID, 
NON_SER_FUNCTION_DEF_IMPL));
+            }
+        }
+
+        @Nested
+        @DisplayName("and CatalogPlanRestore == ALL")
+        class TestRestoreAll {
+
+            private final CatalogPlanRestore restore = CatalogPlanRestore.ALL;
+
+            @Test
+            void withConstantCatalogFunction() throws Exception {
+                final SerdeContext ctx = 
serdeContextWithPermanentFunction(compilation, restore);
+
+                final JsonNode json = serialize(ctx, PERMANENT_FUNCTION);
+                assertThatJsonDoesNotContain(json, FIELD_NAME_CLASS);
+
+                final ContextResolvedFunction actual = deserialize(ctx, json);
+                assertThat(actual).isEqualTo(PERMANENT_FUNCTION);
+            }
+
+            @Test
+            void withShadowingTemporaryFunction() throws Exception {
+                final SerdeContext ctx = 
serdeContextWithPermanentFunction(compilation, restore);
+
+                final JsonNode json = serialize(ctx, PERMANENT_FUNCTION);
+
+                // The temporary function can be used as a replacement for the 
disabled catalog
+                // lookup
+                registerTemporaryFunction(ctx);
+
+                final ContextResolvedFunction actual = deserialize(ctx, json);
+                assertThat(actual)
+                        .isEqualTo(
+                                ContextResolvedFunction.temporary(
+                                        FUNCTION_CAT_ID, 
NON_SER_FUNCTION_DEF_IMPL));
+            }
+
+            @Test
+            void withModifiedCatalogFunction() throws Exception {
+                final SerdeContext ctx = 
serdeContextWithPermanentFunction(compilation, restore);
+
+                final JsonNode json = serialize(ctx, PERMANENT_FUNCTION);
+
+                // Can replace the original function
+                modifyPermanentFunction(ctx);
+
+                final ContextResolvedFunction actual = deserialize(ctx, json);
+                assertThat(actual)
+                        .isEqualTo(
+                                ContextResolvedFunction.permanent(
+                                        FUNCTION_CAT_ID, SER_UDF_IMPL_OTHER));
+            }
+        }
+    }
+
+    @Nested
+    @DisplayName("Test CatalogPlanCompilation == ALL")
+    class TestCompileAll {
+
+        private final CatalogPlanCompilation compilation = 
CatalogPlanCompilation.ALL;
+
+        @Nested
+        @DisplayName("and CatalogPlanRestore == IDENTIFIER")
+        class TestRestoreIdentifier {
+
+            private final CatalogPlanRestore restore = 
CatalogPlanRestore.IDENTIFIER;
+
+            @Test
+            void withConstantCatalogFunction() throws Exception {
+                final SerdeContext ctx = 
serdeContextWithPermanentFunction(compilation, restore);
+
+                final JsonNode json = serialize(ctx, PERMANENT_FUNCTION);
+                assertThatJsonContains(json, FIELD_NAME_CLASS);
+
+                final ContextResolvedFunction actual = deserialize(ctx, json);
+                assertThat(actual).isEqualTo(PERMANENT_FUNCTION);
+            }
+
+            @Test
+            void withDroppedCatalogFunction() throws Exception {
+                final SerdeContext ctx = 
serdeContextWithPermanentFunction(compilation, restore);
+
+                final JsonNode json = serialize(ctx, PERMANENT_FUNCTION);
+
+                dropPermanentFunction(ctx);
+
+                assertThatThrownBy(() -> deserialize(ctx, json))
+                        .satisfies(
+                                anyCauseMatches(
+                                        TableException.class,
+                                        "Make sure a registered catalog 
contains the function"));
+            }
+
+            @Test
+            void withShadowingTemporaryFunction() throws Exception {
+                final SerdeContext ctx = 
serdeContextWithPermanentFunction(compilation, restore);
+
+                final JsonNode json = serialize(ctx, PERMANENT_FUNCTION);
+
+                // The temporary function can be used as a replacement for the 
disabled catalog
+                // lookup
+                registerTemporaryFunction(ctx);
+
+                final ContextResolvedFunction actual = deserialize(ctx, json);
+                assertThat(actual)
+                        .isEqualTo(
+                                ContextResolvedFunction.temporary(
+                                        FUNCTION_CAT_ID, 
NON_SER_FUNCTION_DEF_IMPL));
+            }
+        }
+
+        @Nested
+        @DisplayName("and CatalogPlanRestore == ALL_ENFORCED")
+        class TestRestoreAllEnforced {
+
+            private final CatalogPlanRestore restore = 
CatalogPlanRestore.ALL_ENFORCED;
+
+            @Test
+            void withConstantCatalogFunction() throws Exception {
+                final SerdeContext ctx = 
serdeContextWithPermanentFunction(compilation, restore);
+
+                // Dropping has no effect
+                dropPermanentFunction(ctx);
+
+                // Also shadowing the original one has no effect
+                registerTemporaryFunction(ctx);
+
+                final JsonNode json = serialize(ctx, PERMANENT_FUNCTION);
+                assertThatJsonContains(json, FIELD_NAME_CLASS);
+
+                final ContextResolvedFunction actual = deserialize(ctx, json);
+                assertThat(actual)
+                        .isEqualTo(
+                                
ContextResolvedFunction.permanent(FUNCTION_CAT_ID, SER_UDF_IMPL));
+            }
+        }
+
+        @Nested
+        @DisplayName("and CatalogPlanRestore == ALL")
+        class TestRestoreAll {
+
+            private final CatalogPlanRestore restore = CatalogPlanRestore.ALL;
+
+            @Test
+            void withConstantCatalogFunction() throws Exception {
+                final SerdeContext ctx = 
serdeContextWithPermanentFunction(compilation, restore);
+
+                final JsonNode json = serialize(ctx, PERMANENT_FUNCTION);
+                assertThatJsonContains(json, FIELD_NAME_CLASS);
+
+                final ContextResolvedFunction actual = deserialize(ctx, json);
+                // The serde is symmetric, because the function is still 
present in the catalog
+                assertThat(actual).isEqualTo(PERMANENT_FUNCTION);
+            }
+
+            @Test
+            void withDroppedCatalogFunction() throws Exception {
+                final SerdeContext ctx = 
serdeContextWithPermanentFunction(compilation, restore);
+
+                final JsonNode json = serialize(ctx, PERMANENT_FUNCTION);
+                assertThatJsonContains(json, FIELD_NAME_CLASS);
+
+                dropPermanentFunction(ctx);
+
+                final ContextResolvedFunction actual = deserialize(ctx, json);
+                assertThat(actual)
+                        .isEqualTo(
+                                
ContextResolvedFunction.permanent(FUNCTION_CAT_ID, SER_UDF_IMPL));
+            }
+
+            @Test
+            void withShadowingTemporaryFunction() throws Exception {
+                final SerdeContext ctx = 
serdeContextWithPermanentFunction(compilation, restore);
+
+                final JsonNode json = serialize(ctx, PERMANENT_FUNCTION);
+                assertThatJsonContains(json, FIELD_NAME_CLASS);
+
+                registerTemporaryFunction(ctx);
+
+                final ContextResolvedFunction actual = deserialize(ctx, json);
+                // The serde is not symmetric, the function is temporary after 
restore
+                assertThat(actual)
+                        .isEqualTo(
+                                ContextResolvedFunction.temporary(
+                                        FUNCTION_CAT_ID, 
NON_SER_FUNCTION_DEF_IMPL));
+            }
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Test data
+    // 
--------------------------------------------------------------------------------------------
+
+    @SuppressWarnings("UnstableApiUsage")
+    private static Stream<RexNode> testRexNodeSerde() {
+        final RexBuilder rexBuilder = new RexBuilder(FACTORY);
+        final RelDataType inputType =
+                FACTORY.createStructType(
+                        StructKind.PEEK_FIELDS_NO_EXPAND,
+                        Arrays.asList(
+                                FACTORY.createSqlType(SqlTypeName.INTEGER),
+                                FACTORY.createSqlType(SqlTypeName.BIGINT),
+                                FACTORY.createStructType(
+                                        StructKind.PEEK_FIELDS_NO_EXPAND,
+                                        Arrays.asList(
+                                                
FACTORY.createSqlType(SqlTypeName.VARCHAR),
+                                                
FACTORY.createSqlType(SqlTypeName.VARCHAR)),
+                                        Arrays.asList("n1", "n2"))),
+                        Arrays.asList("f1", "f2", "f3"));
+
+        return Stream.of(
+                
rexBuilder.makeNullLiteral(FACTORY.createSqlType(SqlTypeName.VARCHAR)),
+                rexBuilder.makeLiteral(true),
+                rexBuilder.makeExactLiteral(
+                        new BigDecimal(Byte.MAX_VALUE), 
FACTORY.createSqlType(SqlTypeName.TINYINT)),
+                rexBuilder.makeExactLiteral(
+                        new BigDecimal(Short.MAX_VALUE),
+                        FACTORY.createSqlType(SqlTypeName.SMALLINT)),
+                rexBuilder.makeExactLiteral(
+                        new BigDecimal(Integer.MAX_VALUE),
+                        FACTORY.createSqlType(SqlTypeName.INTEGER)),
+                rexBuilder.makeExactLiteral(
+                        new BigDecimal(Long.MAX_VALUE), 
FACTORY.createSqlType(SqlTypeName.BIGINT)),
+                rexBuilder.makeExactLiteral(
+                        BigDecimal.valueOf(Double.MAX_VALUE),
+                        FACTORY.createSqlType(SqlTypeName.DOUBLE)),
+                rexBuilder.makeApproxLiteral(
+                        BigDecimal.valueOf(Float.MAX_VALUE),
+                        FACTORY.createSqlType(SqlTypeName.FLOAT)),
+                rexBuilder.makeExactLiteral(new 
BigDecimal("23.1234567890123456789012345678")),
+                rexBuilder.makeIntervalLiteral(
+                        BigDecimal.valueOf(100),
+                        new SqlIntervalQualifier(
+                                TimeUnit.YEAR,
+                                4,
+                                TimeUnit.YEAR,
+                                RelDataType.PRECISION_NOT_SPECIFIED,
+                                SqlParserPos.ZERO)),
+                rexBuilder.makeIntervalLiteral(
+                        BigDecimal.valueOf(3),
+                        new SqlIntervalQualifier(
+                                TimeUnit.YEAR,
+                                2,
+                                TimeUnit.MONTH,
+                                RelDataType.PRECISION_NOT_SPECIFIED,
+                                SqlParserPos.ZERO)),
+                rexBuilder.makeIntervalLiteral(
+                        BigDecimal.valueOf(3),
+                        new SqlIntervalQualifier(
+                                TimeUnit.DAY, 2, TimeUnit.SECOND, 6, 
SqlParserPos.ZERO)),
+                rexBuilder.makeIntervalLiteral(
+                        BigDecimal.valueOf(3),
+                        new SqlIntervalQualifier(
+                                TimeUnit.SECOND, 2, TimeUnit.SECOND, 6, 
SqlParserPos.ZERO)),
+                rexBuilder.makeDateLiteral(DateString.fromDaysSinceEpoch(10)),
+                rexBuilder.makeDateLiteral(new DateString("2000-12-12")),
+                rexBuilder.makeTimeLiteral(TimeString.fromMillisOfDay(1234), 
3),
+                rexBuilder.makeTimeLiteral(TimeString.fromMillisOfDay(123456), 
6),
+                rexBuilder.makeTimeLiteral(new 
TimeString("01:01:01.000000001"), 9),
+                
rexBuilder.makeTimestampLiteral(TimestampString.fromMillisSinceEpoch(1234), 3),
+                
rexBuilder.makeTimestampLiteral(TimestampString.fromMillisSinceEpoch(123456789),
 9),
+                rexBuilder.makeTimestampLiteral(
+                        new TimestampString("0001-01-01 01:01:01.000000001"), 
9),
+                rexBuilder.makeTimestampLiteral(new 
TimestampString("2000-12-12 12:30:57.1234"), 4),
+                rexBuilder.makeBinaryLiteral(ByteString.EMPTY),
+                
rexBuilder.makeBinaryLiteral(ByteString.ofBase64("SGVsbG8gV29ybGQh")),
+                rexBuilder.makeLiteral(""),
+                rexBuilder.makeLiteral("abc"),
+                rexBuilder.makeFlag(SqlTrimFunction.Flag.BOTH),
+                rexBuilder.makeFlag(TimeUnitRange.DAY),
+                rexBuilder.makeSearchArgumentLiteral(
+                        Sarg.of(
+                                false,
+                                ImmutableRangeSet.of(
+                                        Range.closed(
+                                                BigDecimal.valueOf(1), 
BigDecimal.valueOf(10)))),
+                        FACTORY.createSqlType(SqlTypeName.INTEGER)),
+                rexBuilder.makeSearchArgumentLiteral(
+                        Sarg.of(
+                                false,
+                                ImmutableRangeSet.of(
+                                        Range.range(
+                                                BigDecimal.valueOf(1),
+                                                BoundType.OPEN,
+                                                BigDecimal.valueOf(10),
+                                                BoundType.CLOSED))),
+                        FACTORY.createSqlType(SqlTypeName.INTEGER)),
+                rexBuilder.makeSearchArgumentLiteral(
+                        Sarg.of(
+                                false,
+                                TreeRangeSet.create(
+                                        Arrays.asList(
+                                                Range.closed(
+                                                        BigDecimal.valueOf(1),
+                                                        BigDecimal.valueOf(1)),
+                                                Range.closed(
+                                                        BigDecimal.valueOf(3),
+                                                        BigDecimal.valueOf(3)),
+                                                Range.closed(
+                                                        BigDecimal.valueOf(6),
+                                                        
BigDecimal.valueOf(6))))),
+                        FACTORY.createSqlType(SqlTypeName.INTEGER)),
+                
rexBuilder.makeInputRef(FACTORY.createSqlType(SqlTypeName.BIGINT), 0),
+                rexBuilder.makeCorrel(inputType, new CorrelationId("$cor1")),
+                rexBuilder.makeFieldAccess(
+                        rexBuilder.makeCorrel(inputType, new 
CorrelationId("$cor2")), "f2", true),
+                // cast($1 as smallint)
+                rexBuilder.makeCast(
+                        FACTORY.createSqlType(SqlTypeName.SMALLINT),
+                        
rexBuilder.makeInputRef(FACTORY.createSqlType(SqlTypeName.INTEGER), 1)),
+                // $1 in (1, 3, 5)
+                rexBuilder.makeIn(
+                        
rexBuilder.makeInputRef(FACTORY.createSqlType(SqlTypeName.INTEGER), 1),
+                        Arrays.asList(
+                                rexBuilder.makeExactLiteral(new BigDecimal(1)),
+                                rexBuilder.makeExactLiteral(new BigDecimal(3)),
+                                rexBuilder.makeExactLiteral(new 
BigDecimal(5)))),
+                // null or $1 is null
+                rexBuilder.makeCall(
+                        SqlStdOperatorTable.OR,
+                        
rexBuilder.makeNullLiteral(FACTORY.createSqlType(SqlTypeName.INTEGER)),
+                        rexBuilder.makeCall(
+                                SqlStdOperatorTable.IS_NOT_NULL,
+                                rexBuilder.makeInputRef(
+                                        
FACTORY.createSqlType(SqlTypeName.INTEGER), 1))),
+                // $1 >= 10
+                rexBuilder.makeCall(
+                        SqlStdOperatorTable.GREATER_THAN_OR_EQUAL,
+                        
rexBuilder.makeInputRef(FACTORY.createSqlType(SqlTypeName.INTEGER), 1),
+                        rexBuilder.makeExactLiteral(new BigDecimal(10))),
+                // hash_code($1)
+                rexBuilder.makeCall(
+                        FlinkSqlOperatorTable.HASH_CODE,
+                        
rexBuilder.makeInputRef(FACTORY.createSqlType(SqlTypeName.INTEGER), 1)),
+                rexBuilder.makePatternFieldRef(
+                        "test", FACTORY.createSqlType(SqlTypeName.INTEGER), 
0));
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Helper methods / classes
+    // 
--------------------------------------------------------------------------------------------
+
+    private static RexNode createFunctionCall(
+            SerdeContext serdeContext, ContextResolvedFunction 
resolvedFunction) {
+        final BridgingSqlFunction nonSerializableFunction =
+                BridgingSqlFunction.of(
+                        serdeContext.getFlinkContext(),
+                        serdeContext.getTypeFactory(),
+                        resolvedFunction);
+        return serdeContext
+                .getRexBuilder()
+                .makeCall(
+                        nonSerializableFunction,
+                        serdeContext
+                                .getRexBuilder()
+                                .makeLiteral(
+                                        12,
+                                        serdeContext
+                                                .getTypeFactory()
+                                                
.createSqlType(SqlTypeName.INTEGER),
+                                        false));
+    }
+
+    private static SerdeContext serdeContext(
+            CatalogPlanCompilation planCompilationOption, CatalogPlanRestore 
planRestoreOption) {
+        final Configuration configuration = new Configuration();
+        configuration.set(TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS, 
planRestoreOption);
+        configuration.set(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS, 
planCompilationOption);
+
+        return JsonSerdeTestUtil.configuredSerdeContext(configuration);
+    }
+
+    private static SerdeContext serdeContextWithPermanentFunction(
+            CatalogPlanCompilation planCompilationOption, CatalogPlanRestore 
planRestoreOption) {
+        final SerdeContext serdeContext = serdeContext(planCompilationOption, 
planRestoreOption);
+        serdeContext
+                .getFlinkContext()
+                .getFunctionCatalog()
+                .registerCatalogFunction(UNRESOLVED_FUNCTION_CAT_ID, 
SER_UDF_CLASS, false);
+        return serdeContext;
+    }
+
+    private static void dropPermanentFunction(SerdeContext serdeContext) {
+        serdeContext
+                .getFlinkContext()
+                .getFunctionCatalog()
+                .dropCatalogFunction(UNRESOLVED_FUNCTION_CAT_ID, false);
+    }
+
+    private static void modifyPermanentFunction(SerdeContext serdeContext) {
+        dropPermanentFunction(serdeContext);
+        serdeContext
+                .getFlinkContext()
+                .getFunctionCatalog()
+                .registerCatalogFunction(UNRESOLVED_FUNCTION_CAT_ID, 
SER_UDF_CLASS_OTHER, false);
+    }
+
+    private static void registerTemporaryFunction(SerdeContext serdeContext) {
+        serdeContext
+                .getFlinkContext()
+                .getFunctionCatalog()
+                .registerTemporaryCatalogFunction(
+                        UNRESOLVED_FUNCTION_CAT_ID, NON_SER_FUNCTION_DEF_IMPL, 
false);
+    }
+
+    private JsonNode serialize(

Review comment:
       Checking the calls, we are only testing permanent functions, or am I 
missing something?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
##########
@@ -220,185 +260,298 @@ private Object toLiteralValue(
                 return ByteString.ofBase64(valueNode.asText());
             case CHAR:
             case VARCHAR:
-                return SerdeContext.get(ctx)
-                        .getRexBuilder()
-                        .makeLiteral(valueNode.asText())
-                        .getValue();
+                return 
serdeContext.getRexBuilder().makeLiteral(valueNode.asText()).getValue();
             case SYMBOL:
-                JsonNode classNode = literalNode.get(FIELD_NAME_CLASS);
-                return serializableToCalcite(
-                        SerializableSymbol.of(classNode.asText(), 
valueNode.asText()));
-            case ROW:
-            case MULTISET:
-                ArrayNode valuesNode = (ArrayNode) valueNode;
-                List<RexNode> list = new ArrayList<>();
-                for (int i = 0; i < valuesNode.size(); ++i) {
-                    list.add(deserialize(valuesNode.get(i), codec, ctx));
-                }
-                return list;
+                final JsonNode symbolNode = 
literalNode.required(FIELD_NAME_SYMBOL);
+                final SerializableSymbol symbol =
+                        SerializableSymbol.of(symbolNode.asText(), 
valueNode.asText());
+                return serializableToCalcite(symbol);
             default:
                 throw new TableException("Unknown literal: " + valueNode);
         }
     }
 
-    @SuppressWarnings({"rawtypes", "unchecked", "UnstableApiUsage"})
-    private Sarg<?> toSarg(
-            JsonNode jsonNode,
-            SqlTypeName sqlTypeName,
-            ObjectCodec codec,
-            DeserializationContext ctx)
+    private static RexNode deserializeFieldAccess(JsonNode jsonNode, 
SerdeContext serdeContext)
             throws IOException {
-        ArrayNode rangesNode = (ArrayNode) jsonNode.get(FIELD_NAME_RANGES);
-        com.google.common.collect.ImmutableRangeSet.Builder builder =
-                com.google.common.collect.ImmutableRangeSet.builder();
-        for (JsonNode rangeNode : rangesNode) {
-            com.google.common.collect.Range<?> range = 
com.google.common.collect.Range.all();
-            if (rangeNode.has(FIELD_NAME_BOUND_LOWER)) {
-                JsonNode lowerNode = rangeNode.get(FIELD_NAME_BOUND_LOWER);
-                Comparable<?> boundValue =
-                        checkNotNull(
-                                (Comparable<?>) toLiteralValue(lowerNode, 
sqlTypeName, codec, ctx));
-                com.google.common.collect.BoundType boundType =
-                        com.google.common.collect.BoundType.valueOf(
-                                
lowerNode.get(FIELD_NAME_BOUND_TYPE).asText().toUpperCase());
-                com.google.common.collect.Range r =
-                        boundType == com.google.common.collect.BoundType.OPEN
-                                ? 
com.google.common.collect.Range.greaterThan(boundValue)
-                                : 
com.google.common.collect.Range.atLeast(boundValue);
-                range = range.intersection(r);
-            }
-            if (rangeNode.has(FIELD_NAME_BOUND_UPPER)) {
-                JsonNode upperNode = rangeNode.get(FIELD_NAME_BOUND_UPPER);
-                Comparable<?> boundValue =
-                        checkNotNull(
-                                (Comparable<?>) toLiteralValue(upperNode, 
sqlTypeName, codec, ctx));
-                com.google.common.collect.BoundType boundType =
-                        com.google.common.collect.BoundType.valueOf(
-                                
upperNode.get(FIELD_NAME_BOUND_TYPE).asText().toUpperCase());
-                com.google.common.collect.Range r =
-                        boundType == com.google.common.collect.BoundType.OPEN
-                                ? 
com.google.common.collect.Range.lessThan(boundValue)
-                                : 
com.google.common.collect.Range.atMost(boundValue);
-                range = range.intersection(r);
-            }
-            if (range.hasUpperBound() || range.hasLowerBound()) {
-                builder.add(range);
-            }
-        }
-        boolean containsNull = 
jsonNode.get(FIELD_NAME_CONTAINS_NULL).booleanValue();
-        return Sarg.of(containsNull, builder.build());
+        final String fieldName = jsonNode.required(FIELD_NAME_NAME).asText();
+        final JsonNode exprNode = jsonNode.required(FIELD_NAME_EXPR);
+        final RexNode refExpr = deserialize(exprNode, serdeContext);
+        return serdeContext.getRexBuilder().makeFieldAccess(refExpr, 
fieldName, true);
     }
 
-    private RexNode deserializeFieldAccess(
-            JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx) 
throws IOException {
-        String fieldName = jsonNode.get(FIELD_NAME_NAME).asText();
-        JsonNode exprNode = jsonNode.get(FIELD_NAME_EXPR);
-        RexNode refExpr = deserialize(exprNode, codec, ctx);
-        return SerdeContext.get(ctx).getRexBuilder().makeFieldAccess(refExpr, 
fieldName, true);
+    private static RexNode deserializeCorrelVariable(JsonNode jsonNode, 
SerdeContext serdeContext) {
+        final String correl = jsonNode.required(FIELD_NAME_CORREL).asText();
+        final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE);
+        final RelDataType fieldType =
+                RelDataTypeJsonDeserializer.deserialize(logicalTypeNode, 
serdeContext);
+        return serdeContext.getRexBuilder().makeCorrel(fieldType, new 
CorrelationId(correl));
     }
 
-    private RexNode deserializeCorrelVariable(
-            JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx) 
throws IOException {
-        String correl = jsonNode.get(FIELD_NAME_CORREL).asText();
-        JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
-        RelDataType fieldType = ctx.readValue(typeNode.traverse(codec), 
RelDataType.class);
-        return SerdeContext.get(ctx)
-                .getRexBuilder()
-                .makeCorrel(fieldType, new CorrelationId(correl));
+    private static RexNode deserializePatternFieldRef(
+            JsonNode jsonNode, SerdeContext serdeContext) {
+        final int inputIndex = 
jsonNode.required(FIELD_NAME_INPUT_INDEX).intValue();
+        final String alpha = jsonNode.required(FIELD_NAME_ALPHA).asText();
+        final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE);
+        final RelDataType fieldType =
+                RelDataTypeJsonDeserializer.deserialize(logicalTypeNode, 
serdeContext);
+        return serdeContext.getRexBuilder().makePatternFieldRef(alpha, 
fieldType, inputIndex);
     }
 
-    private RexNode deserializeCall(
-            JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx) 
throws IOException {
-        RexBuilder rexBuilder = SerdeContext.get(ctx).getRexBuilder();
-        SqlOperator operator = toOperator(jsonNode.get(FIELD_NAME_OPERATOR), 
SerdeContext.get(ctx));
-        ArrayNode operandNodes = (ArrayNode) jsonNode.get(FIELD_NAME_OPERANDS);
-        List<RexNode> rexOperands = new ArrayList<>();
+    private static RexNode deserializeCall(JsonNode jsonNode, SerdeContext 
serdeContext)
+            throws IOException {
+        final SqlOperator operator = deserializeSqlOperator(jsonNode, 
serdeContext);
+        final ArrayNode operandNodes = (ArrayNode) 
jsonNode.get(FIELD_NAME_OPERANDS);
+        final List<RexNode> rexOperands = new ArrayList<>();
         for (JsonNode node : operandNodes) {
-            rexOperands.add(deserialize(node, codec, ctx));
+            rexOperands.add(deserialize(node, serdeContext));
         }
         final RelDataType callType;
         if (jsonNode.has(FIELD_NAME_TYPE)) {
-            JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
-            callType = ctx.readValue(typeNode.traverse(codec), 
RelDataType.class);
+            final JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
+            callType = RelDataTypeJsonDeserializer.deserialize(typeNode, 
serdeContext);
         } else {
-            callType = rexBuilder.deriveReturnType(operator, rexOperands);
+            callType = serdeContext.getRexBuilder().deriveReturnType(operator, 
rexOperands);
         }
-        return rexBuilder.makeCall(callType, operator, rexOperands);
+        return serdeContext.getRexBuilder().makeCall(callType, operator, 
rexOperands);
     }
 
-    private SqlOperator toOperator(JsonNode jsonNode, SerdeContext ctx) throws 
IOException {
-        String name = jsonNode.get(FIELD_NAME_NAME).asText();
-        SqlKind sqlKind = 
SqlKind.valueOf(jsonNode.get(FIELD_NAME_KIND).asText());
-        SqlSyntax sqlSyntax = 
SqlSyntax.valueOf(jsonNode.get(FIELD_NAME_SYNTAX).asText());
-        List<SqlOperator> operators = new ArrayList<>();
-        ctx.getOperatorTable()
-                .lookupOperatorOverloads(
-                        new SqlIdentifier(name, new SqlParserPos(0, 0)),
-                        null, // category
-                        sqlSyntax,
-                        operators,
-                        SqlNameMatchers.liberal());
-        for (SqlOperator operator : operators) {
-            // in case different operator has the same kind, check with both 
name and kind.
-            if (operator.kind == sqlKind) {
-                return operator;
-            }
+    private static SqlOperator deserializeSqlOperator(
+            JsonNode jsonNode, SerdeContext serdeContext) {
+        final SqlSyntax syntax;
+        if (jsonNode.has(FIELD_NAME_SYNTAX)) {
+            syntax =
+                    serializableToCalcite(
+                            SqlSyntax.class, 
jsonNode.required(FIELD_NAME_SYNTAX).asText());
+        } else {
+            syntax = SqlSyntax.FUNCTION;
         }
 
-        // try to find operator from std operator table.
-        SqlStdOperatorTable.instance()
-                .lookupOperatorOverloads(
-                        new SqlIdentifier(name, new SqlParserPos(0, 0)),
-                        null, // category
-                        sqlSyntax,
-                        operators,
-                        SqlNameMatchers.liberal());
-        for (SqlOperator operator : operators) {
-            // in case different operator has the same kind, check with both 
name and kind.
-            if (operator.kind == sqlKind) {
-                return operator;
-            }
+        if (jsonNode.has(FIELD_NAME_INTERNAL_NAME)) {
+            return deserializeInternalFunction(
+                    jsonNode.required(FIELD_NAME_INTERNAL_NAME).asText(), 
syntax, serdeContext);
+        } else if (jsonNode.has(FIELD_NAME_CATALOG_NAME)) {
+            return deserializeCatalogFunction(jsonNode, syntax, serdeContext);
+        } else if (jsonNode.has(FIELD_NAME_CLASS)) {
+            return deserializeFunctionClass(jsonNode, serdeContext);
+        } else if (jsonNode.has(FIELD_NAME_SYSTEM_NAME)) {
+            return deserializeSystemFunction(
+                    jsonNode.required(FIELD_NAME_SYSTEM_NAME).asText(), 
syntax, serdeContext);
+        } else {
+            throw new TableException("Invalid function call.");
         }
+    }
 
-        // built-in function
-        // TODO supports other module's built-in function
-        if (jsonNode.has(FIELD_NAME_BUILT_IN) && 
jsonNode.get(FIELD_NAME_BUILT_IN).booleanValue()) {
-            Optional<FunctionDefinition> function = 
CoreModule.INSTANCE.getFunctionDefinition(name);
-            Preconditions.checkArgument(function.isPresent());
-            return BridgingSqlFunction.of(
-                    ctx.getFlinkContext(),
-                    ctx.getTypeFactory(),
-                    
ContextResolvedFunction.permanent(FunctionIdentifier.of(name), function.get()));
+    private static SqlOperator deserializeSystemFunction(
+            String systemName, SqlSyntax syntax, SerdeContext serdeContext) {
+        // This method covers both temporary system functions and permanent 
system
+        // functions from a module
+        final Optional<SqlOperator> systemOperator =
+                lookupOptionalSqlOperator(
+                        FunctionIdentifier.of(systemName), syntax, 
serdeContext, true);
+        if (systemOperator.isPresent()) {
+            return systemOperator.get();
         }
+        throw new TableException(
+                String.format(
+                        "Could not lookup system function '%s'. "
+                                + "Make sure it has been registered before as 
temporary "
+                                + "functions are not contained in the 
persisted plan. "
+                                + "If the function was provided by a module, 
make sure to reloaded "
+                                + "all used modules in the correct order.",
+                        systemName));
+    }
 
-        if (jsonNode.has(FIELD_NAME_FUNCTION_KIND) && 
jsonNode.has(FIELD_NAME_INSTANCE)) {
-            FunctionKind functionKind =
-                    FunctionKind.valueOf(
-                            
jsonNode.get(FIELD_NAME_FUNCTION_KIND).asText().toUpperCase());
-            String instanceStr = jsonNode.get(FIELD_NAME_INSTANCE).asText();
-            if (functionKind != FunctionKind.SCALAR) {
-                throw new TableException("Unknown function kind: " + 
functionKind);
-            }
-            if (jsonNode.has(FIELD_NAME_BRIDGING)
-                    && jsonNode.get(FIELD_NAME_BRIDGING).booleanValue()) {
-                FunctionDefinition function =
-                        EncodingUtils.decodeStringToObject(instanceStr, 
ctx.getClassLoader());
+    private static SqlOperator deserializeInternalFunction(
+            String internalName, SqlSyntax syntax, SerdeContext serdeContext) {
+        // Try $FUNC$1
+        final Optional<SqlOperator> internalOperator =
+                lookupOptionalSqlOperator(
+                        FunctionIdentifier.of(internalName), syntax, 
serdeContext, false);
+        if (internalOperator.isPresent()) {
+            return internalOperator.get();
+        }
+        // Try FUNC
+        final String publicName = 
BuiltInSqlOperator.extractNameFromQualifiedName(internalName);
+        final Optional<SqlOperator> latestOperator =
+                lookupOptionalSqlOperator(
+                        FunctionIdentifier.of(publicName), syntax, 
serdeContext, true);
+        if (latestOperator.isPresent()) {
+            return latestOperator.get();
+        }
+        throw new TableException(
+                String.format(
+                        "Could not resolve internal system function '%s'. "
+                                + "This is a bug, please file an issue.",
+                        internalName));
+    }
+
+    private static SqlOperator deserializeFunctionClass(
+            JsonNode jsonNode, SerdeContext serdeContext) {
+        final String className = jsonNode.required(FIELD_NAME_CLASS).asText();
+        final Class<?> functionClass = loadClass(className, serdeContext, 
"function");
+        final UserDefinedFunction functionInstance =
+                UserDefinedFunctionHelper.instantiateFunction(functionClass);
+
+        final ContextResolvedFunction resolvedFunction;
+        // This can never be a system function
+        // because we never serialize classes for system functions
+        if (jsonNode.has(FIELD_NAME_CATALOG_NAME)) {
+            final ObjectIdentifier objectIdentifier =
+                    ObjectIdentifierJsonDeserializer.deserialize(
+                            
jsonNode.required(FIELD_NAME_CATALOG_NAME).asText(), serdeContext);
+            resolvedFunction =
+                    ContextResolvedFunction.permanent(
+                            FunctionIdentifier.of(objectIdentifier), 
functionInstance);
+        } else {
+            resolvedFunction = 
ContextResolvedFunction.anonymous(functionInstance);
+        }
+
+        switch (functionInstance.getKind()) {
+            case SCALAR:
+            case TABLE:
                 return BridgingSqlFunction.of(
-                        ctx.getFlinkContext(),
-                        ctx.getTypeFactory(),
-                        
ContextResolvedFunction.permanent(FunctionIdentifier.of(name), function));
-            } else {
-                String displayName = 
jsonNode.get(FIELD_NAME_DISPLAY_NAME).asText();
-                ScalarFunction function =
-                        EncodingUtils.decodeStringToObject(instanceStr, 
ctx.getClassLoader());
-                return new ScalarSqlFunction(
-                        FunctionIdentifier.of(name),
-                        displayName,
-                        function,
-                        ctx.getTypeFactory(),
-                        JavaScalaConversionUtil.toScala(Optional.empty()));
+                        serdeContext.getFlinkContext(),
+                        serdeContext.getTypeFactory(),
+                        resolvedFunction);
+            case AGGREGATE:
+                return BridgingSqlAggFunction.of(
+                        serdeContext.getFlinkContext(),
+                        serdeContext.getTypeFactory(),
+                        resolvedFunction);
+            default:
+                throw new TableException(
+                        String.format(
+                                "Unsupported anonymous function kind '%s' for 
class '%s'.",
+                                functionInstance.getKind(), className));
+        }
+    }
+
+    private static SqlOperator deserializeCatalogFunction(
+            JsonNode jsonNode, SqlSyntax syntax, SerdeContext serdeContext) {
+        final CatalogPlanRestore restoreStrategy =
+                
serdeContext.getConfiguration().get(PLAN_RESTORE_CATALOG_OBJECTS);
+        final FunctionIdentifier identifier =
+                FunctionIdentifier.of(
+                        ObjectIdentifierJsonDeserializer.deserialize(
+                                
jsonNode.required(FIELD_NAME_CATALOG_NAME).asText(), serdeContext));
+
+        switch (restoreStrategy) {
+            case ALL:
+                {
+                    final Optional<SqlOperator> lookupOperator =
+                            lookupOptionalSqlOperator(identifier, syntax, 
serdeContext, false);
+                    if (lookupOperator.isPresent()) {
+                        return lookupOperator.get();
+                    } else if (jsonNode.has(FIELD_NAME_CLASS)) {
+                        return deserializeFunctionClass(jsonNode, 
serdeContext);
+                    }
+                    throw missingFunctionFromCatalog(identifier, false);
+                }
+            case ALL_ENFORCED:
+                {
+                    if (jsonNode.has(FIELD_NAME_CLASS)) {
+                        return deserializeFunctionClass(jsonNode, 
serdeContext);
+                    }
+                    final Optional<SqlOperator> lookupOperator =
+                            lookupOptionalSqlOperator(identifier, syntax, 
serdeContext, false);
+                    if 
(lookupOperator.map(RexNodeJsonDeserializer::isTemporary).orElse(false)) {
+                        return lookupOperator.get();
+                    }
+                    throw lookupDisabled(identifier);
+                }
+            case IDENTIFIER:
+                final Optional<SqlOperator> lookupOperator =
+                        lookupOptionalSqlOperator(identifier, syntax, 
serdeContext, true);
+                if (lookupOperator.isPresent()) {
+                    return lookupOperator.get();
+                } else {
+                    throw missingFunctionFromCatalog(identifier, true);
+                }
+            default:
+                throw new TableException("Unsupported restore strategy: " + 
restoreStrategy);
+        }
+    }
+
+    private static boolean isTemporary(SqlOperator sqlOperator) {
+        if (sqlOperator instanceof BridgingSqlFunction) {
+            return ((BridgingSqlFunction) 
sqlOperator).getResolvedFunction().isTemporary();
+        } else if (sqlOperator instanceof BridgingSqlAggFunction) {
+            return ((BridgingSqlAggFunction) 
sqlOperator).getResolvedFunction().isTemporary();
+        }
+        return false;
+    }
+
+    private static Optional<SqlOperator> lookupOptionalSqlOperator(
+            FunctionIdentifier identifier,
+            SqlSyntax syntax,
+            SerdeContext serdeContext,
+            boolean throwOnError) {
+        final List<SqlOperator> foundOperators = new ArrayList<>();
+        try {
+            serdeContext
+                    .getOperatorTable()
+                    .lookupOperatorOverloads(
+                            new SqlIdentifier(identifier.toList(), new 
SqlParserPos(0, 0)),
+                            null, // category
+                            syntax,
+                            foundOperators,
+                            SqlNameMatchers.liberal());
+            if (foundOperators.size() != 1) {
+                return Optional.empty();
+            }
+            return Optional.of(foundOperators.get(0));
+        } catch (Throwable t) {
+            if (throwOnError) {
+                throw new TableException(
+                        String.format("Error during lookup of function '%s'.", 
identifier), t);
             }
+            return Optional.empty();
+        }
+    }
+
+    private static TableException lookupDisabled(FunctionIdentifier 
identifier) {

Review comment:
       Also tests for those?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to