twalthr commented on a change in pull request #18479:
URL: https://github.com/apache/flink/pull/18479#discussion_r797407395



##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
##########
@@ -46,54 +46,44 @@
 @JsonIgnoreProperties(ignoreUnknown = true)
 public abstract class ExecNodeBase<T> implements ExecNode<T> {
 
-    /** The unique identifier for each ExecNode in the json plan. */
-    @JsonIgnore private final int id;
+    private final String description;
 
-    @JsonIgnore private final String description;
+    private final LogicalType outputType;
 
-    @JsonIgnore private final LogicalType outputType;
+    private final List<InputProperty> inputProperties;
 
-    @JsonIgnore private final List<InputProperty> inputProperties;
+    private List<ExecEdge> inputEdges;
 
-    @JsonIgnore private List<ExecEdge> inputEdges;
+    private transient Transformation<T> transformation;
 
-    @JsonIgnore private transient Transformation<T> transformation;
+    /** Holds the context information (id, name, version) as deserialized from 
a JSON plan. */
+    @JsonProperty(value = FIELD_NAME_TYPE, access = 
JsonProperty.Access.WRITE_ONLY)
+    private final ExecNodeContext context;
 
-    /** This is used to assign a unique ID to every ExecNode. */
-    private static Integer idCounter = 0;
-
-    /** Generate an unique ID for ExecNode. */
-    public static int getNewNodeId() {
-        idCounter++;
-        return idCounter;
-    }
-
-    /** Reset the id counter to 0. */
-    @VisibleForTesting
-    public static void resetIdCounter() {
-        idCounter = 0;
+    /**
+     * Retrieves the default context from the {@link ExecNodeMetadata} 
annotation to be serialized
+     * into the JSON plan.
+     */
+    @JsonProperty(value = FIELD_NAME_TYPE, access = 
JsonProperty.Access.READ_ONLY, index = 1)
+    public ExecNodeContext getContextFromAnnotation() {

Review comment:
       can be `final` and `protected`

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeMetadata.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Repeatable;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation to be used for {@link ExecNode}s to keep necessary metadata when
+ * serialising/deserializing them in a plan. It's used for internal 
bookkeeping across Flink

Review comment:
       `serializing`

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.MultipleExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecChangelogNormalize;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCorrelate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDataStreamScan;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeduplicate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDropUpdateBefore;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExpand;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalGroupAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalWindowAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGroupAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGroupTableAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGroupWindowAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecIncrementalGroupAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecIntervalJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLegacySink;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLegacyTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLimit;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalGroupAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWindowAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMatch;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMiniBatchAssigner;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMultipleInput;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCalc;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupTableAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonOverAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecRank;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSort;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSortLimit;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalSort;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecValues;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowDeduplicate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowJoin;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowRank;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowTableFunction;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/** Utility class for {@link ExecNodeMetadata} related functionality. */
+@Internal
+public final class ExecNodeMetadataUtil {
+
+    private ExecNodeMetadataUtil() {
+        // no instantiation
+    }
+
+    private static final Set<Class<? extends ExecNode<?>>> EXEC_NODES =
+            new HashSet<Class<? extends ExecNode<?>>>() {
+                {
+                    add(StreamExecCalc.class);
+                    add(StreamExecChangelogNormalize.class);
+                    add(StreamExecCorrelate.class);
+                    add(StreamExecDeduplicate.class);
+                    add(StreamExecDropUpdateBefore.class);
+                    add(StreamExecExchange.class);
+                    add(StreamExecExpand.class);
+                    add(StreamExecGlobalGroupAggregate.class);
+                    add(StreamExecGlobalWindowAggregate.class);
+                    add(StreamExecGroupAggregate.class);
+                    add(StreamExecGroupWindowAggregate.class);
+                    add(StreamExecIncrementalGroupAggregate.class);
+                    add(StreamExecIntervalJoin.class);
+                    add(StreamExecJoin.class);
+                    add(StreamExecLimit.class);
+                    add(StreamExecLocalGroupAggregate.class);
+                    add(StreamExecLocalWindowAggregate.class);
+                    add(StreamExecLookupJoin.class);
+                    add(StreamExecMatch.class);
+                    add(StreamExecMiniBatchAssigner.class);
+                    add(StreamExecOverAggregate.class);
+                    add(StreamExecPythonCalc.class);
+                    add(StreamExecPythonCorrelate.class);
+                    add(StreamExecPythonGroupAggregate.class);
+                    add(StreamExecPythonGroupWindowAggregate.class);
+                    add(StreamExecPythonOverAggregate.class);
+                    add(StreamExecRank.class);
+                    add(StreamExecSink.class);
+                    add(StreamExecSortLimit.class);
+                    add(StreamExecTableSourceScan.class);
+                    add(StreamExecTemporalJoin.class);
+                    add(StreamExecTemporalSort.class);
+                    add(StreamExecUnion.class);
+                    add(StreamExecValues.class);
+                    add(StreamExecWatermarkAssigner.class);
+                    add(StreamExecWindowAggregate.class);
+                    add(StreamExecWindowDeduplicate.class);
+                    add(StreamExecWindowJoin.class);
+                    add(StreamExecWindowRank.class);
+                    add(StreamExecWindowTableFunction.class);
+                }
+            };
+
+    private static final Map<ExecNodeNameVersion, Class<? extends 
ExecNode<?>>> LOOKUP_MAP =
+            new HashMap<>();
+
+    static {
+        for (Class<? extends ExecNode<?>> execNodeClass : EXEC_NODES) {
+            addToLookupMap(execNodeClass);
+        }
+    }
+
+    @SuppressWarnings("rawtypes")
+    static final Set<Class<? extends ExecNode>> UNSUPPORTED_JSON_SERDE_CLASSES 
=
+            new HashSet<Class<? extends ExecNode>>() {
+                {
+                    add(StreamExecDataStreamScan.class);
+                    add(StreamExecLegacyTableSourceScan.class);
+                    add(StreamExecLegacySink.class);
+                    add(StreamExecGroupTableAggregate.class);
+                    add(StreamExecPythonGroupTableAggregate.class);
+                    add(StreamExecSort.class);
+                    add(StreamExecMultipleInput.class);
+                }
+            };
+
+    public static Set<Class<? extends ExecNode<?>>> execNodes() {
+        return EXEC_NODES;
+    }
+
+    public static Class<? extends ExecNode<?>> retrieveExecNode(String name, 
int version) {
+        return LOOKUP_MAP.get(new ExecNodeNameVersion(name, version));
+    }
+
+    public static <T extends ExecNode<?>> boolean isUnsupported(Class<T> 
execNode) {
+        return !StreamExecNode.class.isAssignableFrom(execNode)
+                || UNSUPPORTED_JSON_SERDE_CLASSES.contains(execNode);
+    }
+
+    @VisibleForTesting
+    static void addTestNode(Class<? extends ExecNode<?>> execNodeClass) {
+        addToLookupMap(execNodeClass);
+    }
+
+    @SuppressWarnings("rawtypes")
+    private static List<ExecNodeMetadata> extractMetadataFromAnnotation(
+            Class<? extends ExecNode> execNodeClass) {
+        List<ExecNodeMetadata> metadata = new ArrayList<>();
+        ExecNodeMetadata annotation = 
execNodeClass.getDeclaredAnnotation(ExecNodeMetadata.class);
+        if (annotation != null) {
+            metadata.add(annotation);
+        }
+
+        MultipleExecNodeMetadata annotations =
+                
execNodeClass.getDeclaredAnnotation(MultipleExecNodeMetadata.class);
+        if (annotations != null) {
+            if (metadata.isEmpty()) {
+                for (ExecNodeMetadata annot : annotations.value()) {
+                    if (annot != null) {
+                        metadata.add(annot);
+                    }
+                }
+            } else {
+                throw new IllegalStateException(
+                        String.format(
+                                "ExecNode: %s is annotated both with %s and 
%s. Please use only "
+                                        + "%s or multiple %s",
+                                execNodeClass.getCanonicalName(),
+                                ExecNodeMetadata.class,
+                                MultipleExecNodeMetadata.class,
+                                MultipleExecNodeMetadata.class,
+                                ExecNodeMetadata.class));
+            }
+        }
+        return metadata;
+    }
+
+    private static void addToLookupMap(Class<? extends ExecNode<?>> 
execNodeClass) {
+        if (!JsonSerdeUtil.hasJsonCreatorAnnotation(execNodeClass)) {
+            throw new IllegalStateException(
+                    String.format(
+                            "ExecNode: %s does not implement @JsonCreator 
annotation on "
+                                    + "constructor.",
+                            execNodeClass.getCanonicalName()));
+        }
+
+        List<ExecNodeMetadata> metadata = 
extractMetadataFromAnnotation(execNodeClass);
+        if (metadata.isEmpty()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "ExecNode: %s is missing %s annotation.",
+                            execNodeClass.getCanonicalName(),
+                            ExecNodeMetadata.class.getSimpleName()));
+        }
+
+        for (ExecNodeMetadata meta : metadata) {
+            doAddToMap(new ExecNodeNameVersion(meta.name(), meta.version()), 
execNodeClass);
+        }
+    }
+
+    private static void doAddToMap(
+            ExecNodeNameVersion key, Class<? extends ExecNode<?>> 
execNodeClass) {
+        if (LOOKUP_MAP.containsKey(key)) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Found duplicate ExecNode: %s. This is a bug, 
please contact developers.",

Review comment:
       `This is a bug. Please file an issue.`

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.MultipleExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecChangelogNormalize;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCorrelate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDataStreamScan;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeduplicate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDropUpdateBefore;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExpand;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalGroupAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalWindowAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGroupAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGroupTableAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGroupWindowAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecIncrementalGroupAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecIntervalJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLegacySink;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLegacyTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLimit;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalGroupAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWindowAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMatch;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMiniBatchAssigner;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMultipleInput;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCalc;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupTableAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonOverAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecRank;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSort;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSortLimit;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalSort;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecValues;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowDeduplicate;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowJoin;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowRank;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowTableFunction;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/** Utility class for {@link ExecNodeMetadata} related functionality. */
+@Internal
+public final class ExecNodeMetadataUtil {
+
+    private ExecNodeMetadataUtil() {
+        // no instantiation
+    }
+
+    private static final Set<Class<? extends ExecNode<?>>> EXEC_NODES =
+            new HashSet<Class<? extends ExecNode<?>>>() {
+                {
+                    add(StreamExecCalc.class);
+                    add(StreamExecChangelogNormalize.class);
+                    add(StreamExecCorrelate.class);
+                    add(StreamExecDeduplicate.class);
+                    add(StreamExecDropUpdateBefore.class);
+                    add(StreamExecExchange.class);
+                    add(StreamExecExpand.class);
+                    add(StreamExecGlobalGroupAggregate.class);
+                    add(StreamExecGlobalWindowAggregate.class);
+                    add(StreamExecGroupAggregate.class);
+                    add(StreamExecGroupWindowAggregate.class);
+                    add(StreamExecIncrementalGroupAggregate.class);
+                    add(StreamExecIntervalJoin.class);
+                    add(StreamExecJoin.class);
+                    add(StreamExecLimit.class);
+                    add(StreamExecLocalGroupAggregate.class);
+                    add(StreamExecLocalWindowAggregate.class);
+                    add(StreamExecLookupJoin.class);
+                    add(StreamExecMatch.class);
+                    add(StreamExecMiniBatchAssigner.class);
+                    add(StreamExecOverAggregate.class);
+                    add(StreamExecPythonCalc.class);
+                    add(StreamExecPythonCorrelate.class);
+                    add(StreamExecPythonGroupAggregate.class);
+                    add(StreamExecPythonGroupWindowAggregate.class);
+                    add(StreamExecPythonOverAggregate.class);
+                    add(StreamExecRank.class);
+                    add(StreamExecSink.class);
+                    add(StreamExecSortLimit.class);
+                    add(StreamExecTableSourceScan.class);
+                    add(StreamExecTemporalJoin.class);
+                    add(StreamExecTemporalSort.class);
+                    add(StreamExecUnion.class);
+                    add(StreamExecValues.class);
+                    add(StreamExecWatermarkAssigner.class);
+                    add(StreamExecWindowAggregate.class);
+                    add(StreamExecWindowDeduplicate.class);
+                    add(StreamExecWindowJoin.class);
+                    add(StreamExecWindowRank.class);
+                    add(StreamExecWindowTableFunction.class);
+                }
+            };
+
+    private static final Map<ExecNodeNameVersion, Class<? extends 
ExecNode<?>>> LOOKUP_MAP =
+            new HashMap<>();
+
+    static {
+        for (Class<? extends ExecNode<?>> execNodeClass : EXEC_NODES) {
+            addToLookupMap(execNodeClass);
+        }
+    }
+
+    @SuppressWarnings("rawtypes")
+    static final Set<Class<? extends ExecNode>> UNSUPPORTED_JSON_SERDE_CLASSES 
=
+            new HashSet<Class<? extends ExecNode>>() {
+                {
+                    add(StreamExecDataStreamScan.class);
+                    add(StreamExecLegacyTableSourceScan.class);
+                    add(StreamExecLegacySink.class);
+                    add(StreamExecGroupTableAggregate.class);
+                    add(StreamExecPythonGroupTableAggregate.class);
+                    add(StreamExecSort.class);
+                    add(StreamExecMultipleInput.class);
+                }
+            };
+
+    public static Set<Class<? extends ExecNode<?>>> execNodes() {
+        return EXEC_NODES;
+    }
+
+    public static Class<? extends ExecNode<?>> retrieveExecNode(String name, 
int version) {
+        return LOOKUP_MAP.get(new ExecNodeNameVersion(name, version));
+    }
+
+    public static <T extends ExecNode<?>> boolean isUnsupported(Class<T> 
execNode) {
+        return !StreamExecNode.class.isAssignableFrom(execNode)
+                || UNSUPPORTED_JSON_SERDE_CLASSES.contains(execNode);
+    }
+
+    @VisibleForTesting
+    static void addTestNode(Class<? extends ExecNode<?>> execNodeClass) {
+        addToLookupMap(execNodeClass);
+    }
+
+    @SuppressWarnings("rawtypes")
+    private static List<ExecNodeMetadata> extractMetadataFromAnnotation(
+            Class<? extends ExecNode> execNodeClass) {
+        List<ExecNodeMetadata> metadata = new ArrayList<>();
+        ExecNodeMetadata annotation = 
execNodeClass.getDeclaredAnnotation(ExecNodeMetadata.class);
+        if (annotation != null) {
+            metadata.add(annotation);
+        }
+
+        MultipleExecNodeMetadata annotations =
+                
execNodeClass.getDeclaredAnnotation(MultipleExecNodeMetadata.class);
+        if (annotations != null) {
+            if (metadata.isEmpty()) {
+                for (ExecNodeMetadata annot : annotations.value()) {
+                    if (annot != null) {
+                        metadata.add(annot);
+                    }
+                }
+            } else {
+                throw new IllegalStateException(
+                        String.format(
+                                "ExecNode: %s is annotated both with %s and 
%s. Please use only "
+                                        + "%s or multiple %s",
+                                execNodeClass.getCanonicalName(),
+                                ExecNodeMetadata.class,
+                                MultipleExecNodeMetadata.class,
+                                MultipleExecNodeMetadata.class,
+                                ExecNodeMetadata.class));
+            }
+        }
+        return metadata;
+    }
+
+    private static void addToLookupMap(Class<? extends ExecNode<?>> 
execNodeClass) {
+        if (!JsonSerdeUtil.hasJsonCreatorAnnotation(execNodeClass)) {
+            throw new IllegalStateException(
+                    String.format(
+                            "ExecNode: %s does not implement @JsonCreator 
annotation on "
+                                    + "constructor.",
+                            execNodeClass.getCanonicalName()));
+        }
+
+        List<ExecNodeMetadata> metadata = 
extractMetadataFromAnnotation(execNodeClass);
+        if (metadata.isEmpty()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "ExecNode: %s is missing %s annotation.",
+                            execNodeClass.getCanonicalName(),
+                            ExecNodeMetadata.class.getSimpleName()));
+        }
+
+        for (ExecNodeMetadata meta : metadata) {
+            doAddToMap(new ExecNodeNameVersion(meta.name(), meta.version()), 
execNodeClass);
+        }
+    }
+
+    private static void doAddToMap(
+            ExecNodeNameVersion key, Class<? extends ExecNode<?>> 
execNodeClass) {
+        if (LOOKUP_MAP.containsKey(key)) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Found duplicate ExecNode: %s. This is a bug, 
please contact developers.",
+                            key));
+        }
+        LOOKUP_MAP.put(key, execNodeClass);
+    }
+
+    /**
+     * Returns the {@link ExecNodeMetadata} annotation of the class with the 
highest (most recent)
+     * {@link ExecNodeMetadata#version()}.
+     */
+    @SuppressWarnings("rawtypes")

Review comment:
       There are quite a few suppressions. As mentioned in a comment before, I 
think we can get rid of a couple of them by correct declaration.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DatabindContext;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Helper class that holds the necessary identifier fields that are used for 
JSON plan serialization
+ * and deserialization. It is instantiated using {@link 
ExecNodeContext#newContext(Class)} when
+ * creating a new instance of an {@link ExecNode}, so that it contains the 
info from the {@link
+ * ExecNodeMetadata} annotation of the class with the latest {@link 
ExecNodeMetadata#version()}. It
+ * can also be instantiated with {@link 
ExecNodeContext#ExecNodeContext(String)} automatically when
+ * the {@link ExecNode} is deserialized from a JSON Plan, and in this case the 
{@link
+ * ExecNodeContext} contains the version that is read from the JSON Plan and 
not the latest one.
+ */
+@Internal
+public final class ExecNodeContext {
+
+    /** This is used to assign a unique ID to every ExecNode. */
+    private static Integer idCounter = 0;

Review comment:
       `AtomicInteger`

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DatabindContext;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Helper class that holds the necessary identifier fields that are used for 
JSON plan serialization
+ * and deserialization. It is instantiated using {@link 
ExecNodeContext#newContext(Class)} when
+ * creating a new instance of an {@link ExecNode}, so that it contains the 
info from the {@link
+ * ExecNodeMetadata} annotation of the class with the latest {@link 
ExecNodeMetadata#version()}. It
+ * can also be instantiated with {@link 
ExecNodeContext#ExecNodeContext(String)} automatically when
+ * the {@link ExecNode} is deserialized from a JSON Plan, and in this case the 
{@link
+ * ExecNodeContext} contains the version that is read from the JSON Plan and 
not the latest one.
+ */
+@Internal
+public final class ExecNodeContext {
+
+    /** This is used to assign a unique ID to every ExecNode. */
+    private static Integer idCounter = 0;
+
+    /** Generate a unique ID for ExecNode. */
+    public static int newNodeId() {
+        idCounter++;
+        return idCounter;
+    }
+
+    /** Reset the id counter to 0. */
+    @VisibleForTesting
+    public static void resetIdCounter() {
+        idCounter = 0;
+    }
+
+    private Integer id;

Review comment:
       Mark this field `@Nullable` and `final`, and briefly explain why it is nullable.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DatabindContext;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Helper class that holds the necessary identifier fields that are used for 
JSON plan serialization
+ * and deserialization. It is instantiated using {@link 
ExecNodeContext#newContext(Class)} when
+ * creating a new instance of an {@link ExecNode}, so that it contains the 
info from the {@link
+ * ExecNodeMetadata} annotation of the class with the latest {@link 
ExecNodeMetadata#version()}. It
+ * can also be instantiated with {@link 
ExecNodeContext#ExecNodeContext(String)} automatically when
+ * the {@link ExecNode} is deserialized from a JSON Plan, and in this case the 
{@link
+ * ExecNodeContext} contains the version that is read from the JSON Plan and 
not the latest one.
+ */
+@Internal
+public final class ExecNodeContext {
+
+    /** This is used to assign a unique ID to every ExecNode. */
+    private static Integer idCounter = 0;
+
+    /** Generate a unique ID for ExecNode. */
+    public static int newNodeId() {
+        idCounter++;
+        return idCounter;
+    }
+
+    /** Reset the id counter to 0. */
+    @VisibleForTesting
+    public static void resetIdCounter() {
+        idCounter = 0;
+    }
+
+    private Integer id;
+    private final String name;
+    private final Integer version;
+
+    private ExecNodeContext() {
+        this(null, null);
+    }
+
+    private ExecNodeContext(String name, Integer version) {
+        this.name = name;
+        this.version = version;
+    }
+
+    private ExecNodeContext(int id, String name, Integer version) {
+        this(name, version);
+        this.id = id;
+    }
+
+    @JsonCreator
+    public ExecNodeContext(String value) {
+        String[] split = value.split("_");

Review comment:
       Explain the serialized string format (the `_`-separated fields) in the class JavaDoc.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeMetadata.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Repeatable;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation to be used for {@link ExecNode}s to keep necessary metadata when
+ * serialising/deserializing them in a plan. It's used for internal 
bookkeeping across Flink
+ * versions, and to provide necessary information to the testing 
infrastructure.
+ *
+ * <p>Each {@link ExecNode} needs to be annotated and provide the necessary 
metadata info so that it
+ * can be correctly serialized and later on instantiated from a string (JSON) 
plan.
+ *
+ * <p>It's possible for one {@link ExecNode} class to use multiple annotations 
to denote ability to
+ * upgrade to more versions. An {@link ExecNode} class can be annotated 
directly with multiple
+ * {@link ExecNodeMetadata} annotations, or with a single {@link 
MultipleExecNodeMetadata}
+ * annotation where the {@link MultipleExecNodeMetadata#value()} is an array 
of {@link
+ * ExecNodeMetadata} annotations.
+ */
+@Documented
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Repeatable(value = MultipleExecNodeMetadata.class)
+@Experimental

Review comment:
       Use `@Internal` — this annotation is only used for our internal bookkeeping, so we 
don't need to provide API stability here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to