twalthr commented on a change in pull request #18858:
URL: https://github.com/apache/flink/pull/18858#discussion_r812640189



##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java
##########
@@ -18,58 +18,485 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import 
org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation;
+import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanRestore;
+import org.apache.flink.table.catalog.ContextResolvedFunction;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.module.Module;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeStrategies;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 
-import com.google.common.collect.BoundType;
-import com.google.common.collect.ImmutableRangeSet;
-import com.google.common.collect.Range;
-import com.google.common.collect.TreeRangeSet;
-import org.apache.calcite.avatica.util.ByteString;
-import org.apache.calcite.avatica.util.TimeUnit;
-import org.apache.calcite.avatica.util.TimeUnitRange;
-import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.StructKind;
 import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.SqlIntervalQualifier;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.fun.SqlTrimFunction;
-import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.util.DateString;
-import org.apache.calcite.util.Sarg;
-import org.apache.calcite.util.TimeString;
-import org.apache.calcite.util.TimestampString;
+import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Stream;
 
-import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.configuredSerdeContext;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.assertThatJsonContains;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.assertThatJsonDoesNotContain;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.testJsonRoundTrip;
 import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.toJson;
-import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.toObject;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.createObjectReader;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.createObjectWriter;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.getObjectMapper;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_CLASS;
+import static org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_CATALOG;
+import static 
org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_DATABASE;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for serialization/deserialization of {@link RexNode}. */
 public class RexNodeSerdeTest {
 
     private static final FlinkTypeFactory FACTORY = 
FlinkTypeFactory.INSTANCE();
+    private static final String FUNCTION_NAME = "MyFunc";
+    private static final FunctionIdentifier FUNCTION_SYS_ID = 
FunctionIdentifier.of(FUNCTION_NAME);
+    private static final FunctionIdentifier FUNCTION_CAT_ID =
+            FunctionIdentifier.of(
+                    ObjectIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE, 
FUNCTION_NAME));
+    private static final UnresolvedIdentifier UNRESOLVED_FUNCTION_CAT_ID =
+            UnresolvedIdentifier.of(FUNCTION_CAT_ID.toList());
+    private static final SerializableScalarFunction SER_UDF_IMPL = new 
SerializableScalarFunction();
+    private static final Class<SerializableScalarFunction> SER_UDF_CLASS =
+            SerializableScalarFunction.class;
+    private static final OtherSerializableScalarFunction SER_UDF_IMPL_OTHER =
+            new OtherSerializableScalarFunction();
+    private static final Class<OtherSerializableScalarFunction> 
SER_UDF_CLASS_OTHER =
+            OtherSerializableScalarFunction.class;
+    private static final NonSerializableScalarFunction NON_SER_UDF_IMPL =
+            new NonSerializableScalarFunction(true);
+    private static final NonSerializableFunctionDefinition 
NON_SER_FUNCTION_DEF_IMPL =
+            new NonSerializableFunctionDefinition();
+    private static final ContextResolvedFunction PERMANENT_FUNCTION =
+            ContextResolvedFunction.permanent(FUNCTION_CAT_ID, SER_UDF_IMPL);
 
     @ParameterizedTest
     @MethodSource("testRexNodeSerde")
     public void testRexNodeSerde(RexNode rexNode) throws IOException {
-        final SerdeContext serdeContext = configuredSerdeContext();
-        final String json = toJson(serdeContext, rexNode);
-        final RexNode actual = toObject(serdeContext, json, RexNode.class);
-
+        final RexNode actual = testJsonRoundTrip(rexNode, RexNode.class);
         assertThat(actual).isEqualTo(rexNode);
     }
 
+    @Test
+    public void testInlineFunction() throws IOException {
+        final SerdeContext serdeContext = contradictingSerdeContext();
+
+        // Serializable function
+        testJsonRoundTrip(
+                createFunctionCall(serdeContext, 
ContextResolvedFunction.anonymous(SER_UDF_IMPL)),
+                RexNode.class);
+
+        // Non-serializable function due to fields
+        assertThatThrownBy(
+                        () ->
+                                toJson(
+                                        serdeContext,
+                                        createFunctionCall(
+                                                serdeContext,
+                                                
ContextResolvedFunction.anonymous(
+                                                        NON_SER_UDF_IMPL))))
+                .satisfies(
+                        anyCauseMatches(
+                                TableException.class,
+                                "The function's implementation class must not 
be stateful"));
+    }
+
+    @Test
+    public void testSystemFunction() throws Throwable {
+        final SerdeContext serdeContext = contradictingSerdeContext();
+
+        final ThrowingCallable callable =
+                () ->
+                        testJsonRoundTrip(
+                                serdeContext,
+                                createFunctionCall(
+                                        serdeContext,
+                                        ContextResolvedFunction.permanent(
+                                                FUNCTION_SYS_ID, 
NON_SER_UDF_IMPL)),
+                                RexNode.class);
+
+        // Missing function
+        assertThatThrownBy(callable)
+                .satisfies(
+                        anyCauseMatches(
+                                TableException.class,
+                                "Could not lookup system function '" + 
FUNCTION_NAME + "'."));
+
+        // Module provided permanent function
+        serdeContext
+                .getFlinkContext()
+                .getModuleManager()
+                .loadModule("myModule", FunctionProvidingModule.INSTANCE);
+        callable.call();
+    }
+
+    @Test
+    public void testTemporarySystemFunction() throws Throwable {
+        final SerdeContext serdeContext = contradictingSerdeContext();
+
+        final ThrowingCallable callable =
+                () ->
+                        testJsonRoundTrip(
+                                serdeContext,
+                                createFunctionCall(
+                                        serdeContext,
+                                        ContextResolvedFunction.temporary(
+                                                FUNCTION_SYS_ID, 
NON_SER_UDF_IMPL)),
+                                RexNode.class);
+
+        // Missing function
+        assertThatThrownBy(callable)
+                .satisfies(
+                        anyCauseMatches(
+                                TableException.class,
+                                "Could not lookup system function '" + 
FUNCTION_NAME + "'."));
+
+        // Registered temporary function
+        registerTemporaryFunction(serdeContext);
+        callable.call();
+    }
+
+    @Test
+    public void testTemporaryCatalogFunction() throws Throwable {
+        final SerdeContext serdeContext = contradictingSerdeContext();
+
+        final ThrowingCallable callable =
+                () ->
+                        testJsonRoundTrip(
+                                serdeContext,
+                                createFunctionCall(
+                                        serdeContext,
+                                        ContextResolvedFunction.temporary(
+                                                FUNCTION_CAT_ID, 
NON_SER_FUNCTION_DEF_IMPL)),
+                                RexNode.class);
+
+        // Missing function
+        assertThatThrownBy(callable)
+                .satisfies(
+                        anyCauseMatches(
+                                TableException.class,
+                                "The persisted plan does not include all 
required "
+                                        + "catalog metadata for function '"
+                                        + FUNCTION_CAT_ID.asSummaryString()
+                                        + "'."));
+
+        // Registered temporary function
+        registerTemporaryFunction(serdeContext);
+        callable.call();
+    }
+
+    @Nested
+    @DisplayName("Test CatalogPlanCompilation == IDENTIFIER")
+    class TestCompileIdentifier {
+
+        private final CatalogPlanCompilation compilation = 
CatalogPlanCompilation.IDENTIFIER;
+
+        @Nested
+        @DisplayName("and CatalogPlanRestore == IDENTIFIER")
+        class TestRestoreIdentifier {
+
+            private final CatalogPlanRestore restore = 
CatalogPlanRestore.IDENTIFIER;

Review comment:
       the tests modify the ctx via `dropPermanentFunction` or 
`registerTemporaryFunction`, it is safer to create a new one per test




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to