wenlong88 commented on a change in pull request #14878:
URL: https://github.com/apache/flink/pull/14878#discussion_r570897982



##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkDeserializationContext.java
##########
@@ -22,13 +22,17 @@
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationConfig;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.InjectableValues;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DeserializerFactory;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /** Custom JSON {@link DeserializationContext} which wraps a {@link 
SerdeContext}. */
 public class FlinkDeserializationContext extends DefaultDeserializationContext 
{
     private static final long serialVersionUID = 1L;
     private final SerdeContext serdeCtx;
+    private ObjectMapper ownerMapper;

Review comment:
       this is a typo? objectMapper?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/EnumTypes.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.types.logical.TimestampKind;
+
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.rel.type.StructKind;
+import org.apache.calcite.sql.fun.SqlTrimFunction;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Registry of {@link Enum} classes that can be serialized to JSON.
+ *
+ * <p>Suppose you want to serialize the value {@link 
SqlTrimFunction.Flag#LEADING} to JSON. First,
+ * make sure that {@link SqlTrimFunction.Flag} is registered. The type will be 
serialized as
+ * "SYMBOL". The value will be serialized as the string "LEADING".
+ *
+ * <p>When we deserialize, we rely on the fact that the registered {@code 
enum} classes have
+ * distinct values. Therefore, knowing that {@code (type="SYMBOL", 
value="LEADING")} we can convert
+ * the string "LEADING" to the enum {@code Flag.LEADING}.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public abstract class EnumTypes {
+    private EnumTypes() {}
+
+    private static final Map<String, Enum<?>> ENUM_BY_NAME;

Review comment:
       why should we need this? as I known the default json serialization for 
enum is use the name of the value.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
##########
@@ -44,15 +44,17 @@ public static boolean hasJsonCreatorAnnotation(Class<?> 
clazz) {
 
     /** Create an {@link ObjectMapper} which DeserializationContext wraps a 
{@link SerdeContext}. */
     public static ObjectMapper createObjectMapper(SerdeContext serdeCtx) {
+        FlinkDeserializationContext ctx =
+                new FlinkDeserializationContext(
+                        new 
DefaultDeserializationContext.Impl(BeanDeserializerFactory.instance),
+                        serdeCtx);
         ObjectMapper mapper =
                 new ObjectMapper(
                         null, // JsonFactory
                         null, // DefaultSerializerProvider
-                        new FlinkDeserializationContext(
-                                new DefaultDeserializationContext.Impl(
-                                        BeanDeserializerFactory.instance),
-                                serdeCtx));
+                        ctx);
         mapper.configure(MapperFeature.USE_GETTERS_AS_SETTERS, false);
+        ctx.setObjectMapper(mapper);

Review comment:
       can we prevent the cyclic dependency here? 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.planner.plan.schema.GenericRelDataType;
+import org.apache.flink.table.planner.plan.schema.RawRelDataType;
+import org.apache.flink.table.planner.plan.schema.StructuredRelDataType;
+import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TypeInformationRawType;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.type.ArraySqlType;
+import org.apache.calcite.sql.type.MapSqlType;
+import org.apache.calcite.sql.type.MultisetSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.io.IOException;
+
+/**
+ * JSON serializer for {@link RelDataType}. refer to {@link 
RelDataTypeJsonDeserializer} for
+ * deserializer.
+ */
+public class RelDataTypeJsonSerializer extends StdSerializer<RelDataType> {
+    private static final long serialVersionUID = 1L;
+
+    public static final String FIELD_NAME_TYPE_NAME = "typeName";
+    public static final String FIELD_NAME_FILED_NAME = "fieldName";
+    public static final String FIELD_NAME_NULLABLE = "nullable";
+    public static final String FIELD_NAME_PRECISION = "precision";
+    public static final String FIELD_NAME_SCALE = "scale";
+    public static final String FIELD_NAME_FIELDS = "fields";
+    public static final String FIELD_NAME_STRUCT_KIND = "structKind";
+    public static final String FIELD_NAME_TIMESTAMP_KIND = "timestampKind";
+    public static final String FIELD_NAME_ELEMENT = "element";
+    public static final String FIELD_NAME_KEY = "key";
+    public static final String FIELD_NAME_VALUE = "value";
+    public static final String FIELD_NAME_TYPE_INFO = "typeInfo";
+    public static final String FIELD_NAME_RAW_TYPE = "rawType";
+    public static final String FIELD_NAME_STRUCTURED_TYPE = "structuredType";
+
+    public RelDataTypeJsonSerializer() {
+        super(RelDataType.class);
+    }
+
+    @Override
+    public void serialize(
+            RelDataType relDataType,
+            JsonGenerator jsonGenerator,
+            SerializerProvider serializerProvider)
+            throws IOException {
+        jsonGenerator.writeStartObject();
+        serialize(relDataType, jsonGenerator);
+        jsonGenerator.writeEndObject();
+    }
+
+    private void serialize(RelDataType relDataType, JsonGenerator gen) throws 
IOException {
+        if (relDataType instanceof TimeIndicatorRelDataType) {

Review comment:
       can we reuse the Serde of LogicalType so that we don't need to have two 
kinds of type in json? such as FlinkTypeFactory#toLogicalType, I think Logic 
Type may be eable to cover RelDataType we are interested in. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to