slinkydeveloper commented on a change in pull request #18479:
URL: https://github.com/apache/flink/pull/18479#discussion_r791883246
##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
##########
@@ -21,8 +21,7 @@
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.planner.plan.logical.LogicalWindow;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.utils.ReflectionsUtil;
Review comment:
Given you removed this, can you check if you can remove the reflections
dependency once and for all?
##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeTypeIdResolver.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DatabindContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase;
+
+/**
+ * Helper class to implement the Jackson subtype serialisation/de-serialisation. Instead of using
+ * the class name use the {@link ExecNodeMetadata#name()} and {@link ExecNodeMetadata#version()} to
+ * perform a lookup in a static map residing in {@link ExecNodeMetadataUtil}.
+ */
+public class ExecNodeTypeIdResolver extends TypeIdResolverBase {
+
+ private JavaType superType;
+
+ @Override
+ public void init(JavaType baseType) {
+ superType = baseType;
+ }
+
+ @Override
+ public Id getMechanism() {
+ return Id.NAME;
+ }
+
+ @Override
+ public String idFromValue(Object obj) {
+ return idFromValueAndType(obj, obj.getClass());
+ }
+
+ @Override
+ public String idFromValueAndType(Object obj, Class<?> subType) {
+ return ((ExecNodeBase<?>) obj).getContextFromAnnotation().toString();
+ }
+
+ @Override
+ public JavaType typeFromId(DatabindContext context, String id) {
+ ExecNodeContext execNodeContext = new ExecNodeContext(id);
+ return context.constructSpecializedType(
+ superType,
+ ExecNodeMetadataUtil.retrieveExecNode(
+ execNodeContext.getName(), execNodeContext.getVersion()));
+ }
Review comment:
I wonder if you can cache the result of `constructSpecializedType` for each
exec node in `init`. Perhaps you can access the type factory from the super
class? Or perhaps there is already a cache in the
`DatabindContext`/`TypeFactory` for constructed `JavaType`s?
The reason I ask is that, if I'm not mistaken, constructing a `JavaType` is
where the reflection magic happens, so it's quite expensive: it reads the
class, all its fields, methods, and annotations, and even does some
`Unsafe`-related stuff. So if we can trivially avoid repeating it, it's
definitely better.
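For what it's worth, a stdlib-only sketch of the memoization idea I have in mind (hypothetical names; `TypeStub` stands in for Jackson's `JavaType`, and the real resolver would call `context.constructSpecializedType(...)` inside the mapping function):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: memoize the expensive per-id type construction so it
// runs at most once per serialized id. TypeStub stands in for JavaType.
class CachingResolverSketch {
    static final class TypeStub {
        final String id;
        TypeStub(String id) { this.id = id; }
    }

    private final Map<String, TypeStub> cache = new ConcurrentHashMap<>();
    final AtomicInteger constructions = new AtomicInteger();

    TypeStub typeFromId(String id) {
        // computeIfAbsent runs the construction once per distinct id and
        // returns the cached instance on every later lookup
        return cache.computeIfAbsent(
                id,
                key -> {
                    constructions.incrementAndGet(); // stands in for the reflective work
                    return new TypeStub(key);
                });
    }
}
```

Whether this is worth it depends on whether `TypeFactory` already caches internally, of course.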
##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
+
+/**
+ * Helper Pojo that holds the necessary identifier fields that are used for JSON plan serialisation
+ * and de-serialisation.
+ */
+public class ExecNodeContext {
+ /** The unique identifier for each ExecNode in the JSON plan. */
Review comment:
Can you have these javadocs on the getters instead?
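Something like this (a hypothetical sketch; the field and getter names are assumed from the hunk above, not copied from the PR):

```java
// Hypothetical sketch of moving the field javadoc onto the getters instead.
class ExecNodeContextSketch {
    private final String name;
    private final Integer version;

    ExecNodeContextSketch(String name, Integer version) {
        this.name = name;
        this.version = version;
    }

    /** The unique name identifying this ExecNode in the JSON plan. */
    String getName() {
        return name;
    }

    /** The version of this ExecNode in the JSON plan. */
    Integer getVersion() {
        return version;
    }
}
```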
##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils;
+
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecChangelogNormalize;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCorrelate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeduplicate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDropUpdateBefore;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExpand;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGroupWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecIncrementalGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecIntervalJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLimit;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMatch;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMiniBatchAssigner;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCalc;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonOverAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecRank;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSortLimit;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalSort;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecValues;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowDeduplicate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowRank;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowTableFunction;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/** Utility class for ExecNodeMetadata related functionality. */
+public final class ExecNodeMetadataUtil {
+
+ private ExecNodeMetadataUtil() {
+ // no instantiation
+ }
+
+ private static final Map<ExecNodeNameVersion, Class<? extends ExecNode<?>>> lookupMap =
+ new HashMap<>();
+
+ private static final List<Class<? extends ExecNode<?>>> execNodes = new ArrayList<>();
+
+ static {
+ execNodes.add(StreamExecCalc.class);
+ execNodes.add(StreamExecChangelogNormalize.class);
+ execNodes.add(StreamExecCorrelate.class);
+ execNodes.add(StreamExecDeduplicate.class);
+ execNodes.add(StreamExecDropUpdateBefore.class);
+ execNodes.add(StreamExecExchange.class);
+ execNodes.add(StreamExecExpand.class);
+ execNodes.add(StreamExecGlobalGroupAggregate.class);
+ execNodes.add(StreamExecGlobalWindowAggregate.class);
+ execNodes.add(StreamExecGroupAggregate.class);
+ execNodes.add(StreamExecGroupWindowAggregate.class);
+ execNodes.add(StreamExecIncrementalGroupAggregate.class);
+ execNodes.add(StreamExecIntervalJoin.class);
+ execNodes.add(StreamExecJoin.class);
+ execNodes.add(StreamExecLimit.class);
+ execNodes.add(StreamExecLocalGroupAggregate.class);
+ execNodes.add(StreamExecLocalWindowAggregate.class);
+ execNodes.add(StreamExecLookupJoin.class);
+ execNodes.add(StreamExecMatch.class);
+ execNodes.add(StreamExecMiniBatchAssigner.class);
+ execNodes.add(StreamExecOverAggregate.class);
+ execNodes.add(StreamExecPythonCalc.class);
+ execNodes.add(StreamExecPythonCorrelate.class);
+ execNodes.add(StreamExecPythonGroupAggregate.class);
+ execNodes.add(StreamExecPythonGroupWindowAggregate.class);
+ execNodes.add(StreamExecPythonOverAggregate.class);
+ execNodes.add(StreamExecRank.class);
+ execNodes.add(StreamExecSink.class);
+ execNodes.add(StreamExecSortLimit.class);
+ execNodes.add(StreamExecTableSourceScan.class);
+ execNodes.add(StreamExecTemporalJoin.class);
+ execNodes.add(StreamExecTemporalSort.class);
+ execNodes.add(StreamExecUnion.class);
+ execNodes.add(StreamExecValues.class);
+ execNodes.add(StreamExecWatermarkAssigner.class);
+ execNodes.add(StreamExecWindowAggregate.class);
+ execNodes.add(StreamExecWindowDeduplicate.class);
+ execNodes.add(StreamExecWindowJoin.class);
+ execNodes.add(StreamExecWindowRank.class);
+ execNodes.add(StreamExecWindowTableFunction.class);
+ }
+
+ static {
+ for (Class<? extends ExecNode<?>> execNode : execNodes) {
+ ExecNodeMetadata metadata = execNode.getAnnotation(ExecNodeMetadata.class);
+ if (metadata == null) {
+ throw new IllegalStateException(
+ "ExecNode: "
+ + execNode.getSimpleName()
+ + " is missing "
+ + ExecNodeMetadata.class.getSimpleName()
+ + " annotation");
Review comment:
Add another sentence here:
```
This is a bug, please contact developers.
```
##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeMetadata.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation to be used for {@link ExecNode}s to keep necessary metadata when
+ * serialising/deserialising them in a plan.
+ *
+ * <p>Each {@link ExecNode} needs to be annotated and provide the necessary metadata info so that it
+ * can be correctly serialised and later on instantiated from a string (JSON) plan.
+ *
+ * <p>It's possible for one {@link ExecNode} class to use multiple annotations to denote ability to
+ * upgrade to more versions.
+ */
+@Documented
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@PublicEvolving
+public @interface ExecNodeMetadata {
+ // main information
+
+ /**
+ * Unique name of the {@link ExecNode} for serialization/deserialization and uid() generation.
+ * Together with version, uniquely identifies the {@link ExecNode} class.
+ */
+ String name();
+
+ /**
+ * A positive integer denoting the evolving version of an {@link ExecNode}, used for
+ * serialization/deserialization and uid() generation. Together with {@link #name()}, uniquely
+ * identifies the {@link ExecNode} class.
+ */
+ @JsonProperty("version")
+ int version();
+
+ // maintenance information for internal/community/test usage
+
+ /**
+ * Hard coded list of {@link ExecutionConfigOptions} keys in the Flink version when the
Review comment:
Replace `list` with `set`
##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeTypeIdResolver.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DatabindContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase;
+
+/**
+ * Helper class to implement the Jackson subtype serialisation/de-serialisation. Instead of using
+ * the class name use the {@link ExecNodeMetadata#name()} and {@link ExecNodeMetadata#version()} to
+ * perform a lookup in a static map residing in {@link ExecNodeMetadataUtil}.
+ */
+public class ExecNodeTypeIdResolver extends TypeIdResolverBase {
+
+ private JavaType superType;
+
+ @Override
+ public void init(JavaType baseType) {
+ superType = baseType;
+ }
+
+ @Override
+ public Id getMechanism() {
+ return Id.NAME;
+ }
+
+ @Override
+ public String idFromValue(Object obj) {
+ return idFromValueAndType(obj, obj.getClass());
+ }
+
+ @Override
+ public String idFromValueAndType(Object obj, Class<?> subType) {
+ return ((ExecNodeBase<?>) obj).getContextFromAnnotation().toString();
+ }
+
+ @Override
+ public JavaType typeFromId(DatabindContext context, String id) {
+ ExecNodeContext execNodeContext = new ExecNodeContext(id);
+ return context.constructSpecializedType(
+ superType,
+ ExecNodeMetadataUtil.retrieveExecNode(
+ execNodeContext.getName(), execNodeContext.getVersion()));
+ }
Review comment:
OK, as I was expecting: there is already an LRU cache within
`TypeFactory` that takes care of caching `JavaType`s. So you can ignore my
comment if you want.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]