fsk119 commented on code in PR #20617:
URL: https://github.com/apache/flink/pull/20617#discussion_r953388060


##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointExtension.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.function.Supplier;
+
+import static org.apache.flink.table.gateway.rest.util.RestConfigUtils.getBaseConfig;
+import static org.apache.flink.table.gateway.rest.util.RestConfigUtils.getFlinkConfig;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A simple {@link Extension} to be used by tests that require a {@link SqlGatewayRestEndpoint}. */

Review Comment:
   nit:
   ```
   /** A simple {@link Extension} that manages the lifecycle of the {@link SqlGatewayRestEndpoint}. */
   ```
   



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointExtension.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.function.Supplier;
+
+import static org.apache.flink.table.gateway.rest.util.RestConfigUtils.getBaseConfig;
+import static org.apache.flink.table.gateway.rest.util.RestConfigUtils.getFlinkConfig;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A simple {@link Extension} to be used by tests that require a {@link SqlGatewayRestEndpoint}. */
+public class SqlGatewayRestEndpointExtension implements BeforeAllCallback, AfterAllCallback {
+
+    // to supply service needed when starting the rest end point

Review Comment:
   Remove? Or use /** */ Javadoc to describe the field.
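   For instance, the field comment could become a short Javadoc along these lines (a sketch of this suggestion, not the final code):
   ```java
   /** Supplies the {@link SqlGatewayService} that backs the REST endpoint under test. */
   private final Supplier<SqlGatewayService> serviceSupplier;
   ```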



##########
flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/results/serde/LogicalTypeJsonSerDeTest.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.results.serde;
+
+import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
+import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.runtime.typeutils.ExternalSerializer;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.types.Row;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
+
+/** Tests for {@link LogicalType} serialization and deserialization. */
+@Execution(CONCURRENT)
+public class LogicalTypeJsonSerDeTest {
+
+    private final ObjectMapper mapper = getObjectMapper();
+
+    // final constants for testing unsupported case
+    private final LogicalType unsupportedType =
+            new DayTimeIntervalType(DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR);
+    private final String serializerExceptionMessageFormat =
+            "Unable to serialize logical type '%s'. Please check the documentation for supported types.";
+    private final String unsupportedTypeString = "INTERVAL_DAY_TIME";
+    private final String json =
+            String.format(
+                    "{\"%s\": \"%s\", \"%s\": %s}",
+                    "type", unsupportedTypeString, "nullable", "true");
+    private final String deserializerExceptionMessageFormat =
+            "Unable to deserialize a logical type of type root '%s'. Please check the documentation for supported types.";

Review Comment:
   Local variables are enough. Move these into the test case.
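   A minimal sketch of this suggestion, with the constants declared inside the test that uses them (names taken from the hunk above, assertions elided):
   ```java
   @Test
   public void testSerDeWithUnsupportedType() {
       // only this test needs these values, so keep them local
       LogicalType unsupportedType =
               new DayTimeIntervalType(DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR);
       String unsupportedTypeString = "INTERVAL_DAY_TIME";
       // ... assertions unchanged from the original test body
   }
   ```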



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointExtension.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.function.Supplier;
+
+import static org.apache.flink.table.gateway.rest.util.RestConfigUtils.getBaseConfig;
+import static org.apache.flink.table.gateway.rest.util.RestConfigUtils.getFlinkConfig;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A simple {@link Extension} to be used by tests that require a {@link SqlGatewayRestEndpoint}. */
+public class SqlGatewayRestEndpointExtension implements BeforeAllCallback, AfterAllCallback {
+
+    // to supply service needed when starting the rest end point
+    private final Supplier<SqlGatewayService> serviceSupplier;
+
+    private SqlGatewayRestEndpoint sqlGatewayRestEndpoint;
+    private String targetAddress;
+    private int targetPort;
+
+    public String getTargetAddress() {
+        return targetAddress;
+    }
+
+    public int getTargetPort() {
+        return targetPort;
+    }
+
+    public SqlGatewayRestEndpointExtension(Supplier<SqlGatewayService> serviceSupplier) {
+        this.serviceSupplier = serviceSupplier;
+    }
+
+    @Override
+    public void beforeAll(ExtensionContext context) {
+        String address = InetAddress.getLoopbackAddress().getHostAddress();
+        Configuration config = getBaseConfig(getFlinkConfig(address, address, "0"));
+
+        try {
+            sqlGatewayRestEndpoint = new SqlGatewayRestEndpoint(config, serviceSupplier.get());
+            sqlGatewayRestEndpoint.start();
+        } catch (Exception e) {
+            throw new SqlGatewayException(
+                    "Unexpected error occurred when trying to start the rest endpoint of sql gateway.",
+                    e);
+        }
+
+        InetSocketAddress serverAddress = checkNotNull(sqlGatewayRestEndpoint.getServerAddress());
+        targetAddress = serverAddress.getHostName();
+        targetPort = serverAddress.getPort();

Review Comment:
   Can we use NetUtils to get a random port? If another CI job runs on the same machine, it may fail to bind the port.
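   A rough sketch of this idea inside `beforeAll` (the exact NetUtils API shape is an assumption and may differ in this Flink version):
   ```java
   // Assumption: NetUtils.getAvailablePort() reserves a free port and exposes it
   // via getPort(); pass that port instead of "0" when building the config.
   NetUtils.Port port = NetUtils.getAvailablePort();
   Configuration config =
           getBaseConfig(getFlinkConfig(address, address, String.valueOf(port.getPort())));
   ```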



##########
flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/results/serde/LogicalTypeJsonSerDeTest.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.results.serde;
+
+import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
+import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.runtime.typeutils.ExternalSerializer;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.types.Row;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
+
+/** Tests for {@link LogicalType} serialization and deserialization. */
+@Execution(CONCURRENT)
+public class LogicalTypeJsonSerDeTest {
+
+    private final ObjectMapper mapper = getObjectMapper();
+
+    // final constants for testing unsupported case

Review Comment:
   Remove these comments... I think the variable names are self-explanatory.



##########
flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/serde/LogicalTypeJsonDeserializer.java:
##########
@@ -98,6 +100,10 @@ public LogicalType deserialize(JsonParser jsonParser, DeserializationContext ctx
     private LogicalType deserializeInternal(JsonNode logicalTypeNode) {
         LogicalTypeRoot typeRoot =
                 LogicalTypeRoot.valueOf(logicalTypeNode.get(FIELD_NAME_TYPE_NAME).asText());
+        // handle the special case of NullType

Review Comment:
   Can we describe the root reason here? For example, that the NullType is always nullable.
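   Suggested wording only (the surrounding deserializer code is not shown in this hunk):
   ```java
   // Special case: NullType is always nullable, so it is handled separately here.
   ```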



##########
flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/results/serde/LogicalTypeJsonSerDeTest.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.results.serde;
+
+import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
+import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.runtime.typeutils.ExternalSerializer;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.types.Row;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
+
+/** Tests for {@link LogicalType} serialization and deserialization. */
+@Execution(CONCURRENT)
+public class LogicalTypeJsonSerDeTest {
+
+    private final ObjectMapper mapper = getObjectMapper();
+
+    // final constants for testing unsupported case
+    private final LogicalType unsupportedType =
+            new DayTimeIntervalType(DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR);
+    private final String serializerExceptionMessageFormat =
+            "Unable to serialize logical type '%s'. Please check the documentation for supported types.";
+    private final String unsupportedTypeString = "INTERVAL_DAY_TIME";
+    private final String json =
+            String.format(
+                    "{\"%s\": \"%s\", \"%s\": %s}",
+                    "type", unsupportedTypeString, "nullable", "true");
+    private final String deserializerExceptionMessageFormat =
+            "Unable to deserialize a logical type of type root '%s'. Please check the documentation for supported types.";
+
+    @ParameterizedTest
+    @MethodSource("generateTestData")
+    public void testLogicalTypeJsonSerDe(LogicalType logicalType) throws IOException {
+        String json = mapper.writeValueAsString(logicalType);
+        LogicalType actualType = mapper.readValue(json, LogicalType.class);
+
+        assertThat(actualType).isEqualTo(logicalType);
+    }
+
+    @Test
+    public void testSerDeWithUnsupportedType() {
+        // test to serialize unsupported LogicalType
+        assertThatThrownBy(() -> mapper.writeValueAsString(unsupportedType))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                UnsupportedOperationException.class,
+                                String.format(
+                                        serializerExceptionMessageFormat,
+                                        unsupportedType.asSummaryString())));
+
+        // test to deserialize unsupported JSON string
+        assertThatThrownBy(() -> mapper.readValue(json, LogicalType.class))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                UnsupportedOperationException.class,
+                                String.format(
+                                        deserializerExceptionMessageFormat,
+                                        unsupportedTypeString)));
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Test data
+    // --------------------------------------------------------------------------------------------
+
+    private static List<LogicalType> generateTestData() {
+        List<LogicalType> types =
+                Arrays.asList(
+                        new BooleanType(),
+                        new TinyIntType(),
+                        new SmallIntType(),
+                        new IntType(),
+                        new BigIntType(),
+                        new FloatType(),
+                        new DoubleType(),
+                        new DateType(),
+                        CharType.ofEmptyLiteral(),
+                        new CharType(),
+                        new CharType(5),
+                        VarCharType.ofEmptyLiteral(),
+                        new VarCharType(),
+                        new VarCharType(5),
+                        BinaryType.ofEmptyLiteral(),
+                        new BinaryType(),
+                        new BinaryType(100),
+                        VarBinaryType.ofEmptyLiteral(),
+                        new VarBinaryType(),
+                        new VarBinaryType(100),
+                        new DecimalType(10),
+                        new DecimalType(15, 5),
+                        new TimeType(),
+                        new TimeType(3),
+                        new TimestampType(),
+                        new TimestampType(3),
+                        new TimestampType(false, 3),
+                        new ZonedTimestampType(),
+                        new ZonedTimestampType(3),
+                        new LocalZonedTimestampType(),
+                        new LocalZonedTimestampType(3),
+                        new LocalZonedTimestampType(false, 3),
+                        // LocalZonedTimestampType#eaquals doesn't compare TimestampKind
+                        new LocalZonedTimestampType(false, TimestampKind.PROCTIME, 3),
+                        new MapType(new BigIntType(), new IntType(false)),
+                        new MapType(CharType.ofEmptyLiteral(), CharType.ofEmptyLiteral()),
+                        new MapType(VarCharType.ofEmptyLiteral(), VarCharType.ofEmptyLiteral()),
+                        new MapType(BinaryType.ofEmptyLiteral(), BinaryType.ofEmptyLiteral()),
+                        new MapType(VarBinaryType.ofEmptyLiteral(), VarBinaryType.ofEmptyLiteral()),
+                        new MapType(new TimestampType(false, 3), new LocalZonedTimestampType()),
+                        new ArrayType(new IntType(false)),
+                        new ArrayType(new TimestampType()),
+                        new ArrayType(new LocalZonedTimestampType(false, 3)),
+                        new ArrayType(CharType.ofEmptyLiteral()),
+                        new ArrayType(VarCharType.ofEmptyLiteral()),
+                        new ArrayType(BinaryType.ofEmptyLiteral()),
+                        new ArrayType(VarBinaryType.ofEmptyLiteral()),
+                        new MultisetType(new IntType(false)),
+                        new MultisetType(new TimestampType()),
+                        new MultisetType(new TimestampType(true, 3)),
+                        new MultisetType(CharType.ofEmptyLiteral()),
+                        new MultisetType(VarCharType.ofEmptyLiteral()),
+                        new MultisetType(BinaryType.ofEmptyLiteral()),
+                        new MultisetType(VarBinaryType.ofEmptyLiteral()),
+                        RowType.of(new BigIntType(), new IntType(false), new VarCharType(200)),
+                        RowType.of(
+                                new LogicalType[] {
+                                    new BigIntType(), new IntType(false), new VarCharType(200)
+                                },
+                                new String[] {"f1", "f2", "f3"}),
+                        RowType.of(
+                                new TimestampType(false, 3), new LocalZonedTimestampType(false, 3)),
+                        RowType.of(
+                                CharType.ofEmptyLiteral(),
+                                VarCharType.ofEmptyLiteral(),
+                                BinaryType.ofEmptyLiteral(),
+                                VarBinaryType.ofEmptyLiteral()),
+                        // Row with descriptions
+                        new RowType(
+                                Arrays.asList(
+                                        new RowType.RowField("ID", new BigIntType(), "ID desc"),
+                                        new RowType.RowField(
+                                                "Name", new VarCharType(20), "Name desc"))),
+                        // custom RawType
+                        new RawType<>(LocalDateTime.class, LocalDateTimeSerializer.INSTANCE),
+                        // external RawType
+                        new RawType<>(
+                                Row.class,
+                                ExternalSerializer.of(
+                                        DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()))));
+
+        List<LogicalType> testTypes =
+                Stream.concat(
+                                types.stream().map(type -> type.copy(true)),
+                                types.stream().map(type -> type.copy(false)))
+                        .collect(Collectors.toList());
+
+        // ignore nullable for NullType
+        testTypes.add(new NullType());
+
+        return testTypes;
+    }
+
+    /**
+     * LogicalType isn't annotated with Jackson annotations, so it's necessary to register the
+     * customer serializer and deserializer when testing LogicalType Serde alone.
+     */

Review Comment:
   Remove this comment. It's Jackson's API; we don't need to explain how to use Jackson here.



##########
flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/results/serde/LogicalTypeJsonSerDeTest.java:
##########
@@ -77,6 +82,36 @@ public void testLogicalTypeJsonSerDe(LogicalType logicalType) throws IOException
         assertThat(actualType).isEqualTo(logicalType);
     }
 
+    @Test
+    public void testSerDeWithUnsupportedType() {
+        ObjectMapper mapper = JsonSerdeUtil.getObjectMapper();
+
+        // test to serialize unsupported LogicalType
+        LogicalType unsupportedType =
+                new 
DayTimeIntervalType(DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR);
+        assertThatThrownBy(() -> mapper.writeValueAsString(unsupportedType))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                UnsupportedOperationException.class,
+                                String.format(
+                                        "Unable to serialize logical type '%s'. Please check the documentation for supported types.",
+                                        unsupportedType.asSummaryString())));
+
+        // test to deserialize unsupported JSON string
+        String unsupportedTypeString = "INTERVAL_DAY_TIME";
+        String json =
+                String.format(
+                        "{\"%s\": \"%s\", \"%s\": %s}",
+                        "type", unsupportedTypeString, "nullable", "true");
+        assertThatThrownBy(() -> mapper.readValue(json, LogicalType.class))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                UnsupportedOperationException.class,
+                                String.format(
+                                        "Unable to deserialize a logical type of type root '%s'. Please check the documentation for supported types.",
+                                        unsupportedTypeString)));

Review Comment:
   It's better to split this into two test cases.
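   A sketch of the split (method names are only suggestions; the assertions are the ones from the hunk above):
   ```java
   @Test
   public void testSerializeUnsupportedType() {
       ObjectMapper mapper = JsonSerdeUtil.getObjectMapper();
       LogicalType unsupportedType =
               new DayTimeIntervalType(DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR);
       assertThatThrownBy(() -> mapper.writeValueAsString(unsupportedType))
               .satisfies(
                       FlinkAssertions.anyCauseMatches(
                               UnsupportedOperationException.class,
                               String.format(
                                       "Unable to serialize logical type '%s'. Please check the documentation for supported types.",
                                       unsupportedType.asSummaryString())));
   }

   @Test
   public void testDeserializeUnsupportedType() {
       ObjectMapper mapper = JsonSerdeUtil.getObjectMapper();
       String unsupportedTypeString = "INTERVAL_DAY_TIME";
       String json =
               String.format(
                       "{\"%s\": \"%s\", \"%s\": %s}",
                       "type", unsupportedTypeString, "nullable", "true");
       assertThatThrownBy(() -> mapper.readValue(json, LogicalType.class))
               .satisfies(
                       FlinkAssertions.anyCauseMatches(
                               UnsupportedOperationException.class,
                               String.format(
                                       "Unable to deserialize a logical type of type root '%s'. Please check the documentation for supported types.",
                                       unsupportedTypeString)));
   }
   ```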



##########
flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/results/serde/LogicalTypeJsonSerDeTest.java:
##########
@@ -116,6 +151,8 @@ private static List<LogicalType> generateTestData() {
                         new LocalZonedTimestampType(),
                         new LocalZonedTimestampType(3),
                         new LocalZonedTimestampType(false, 3),
+                        // LocalZonedTimestampType#eaquals doesn't compare TimestampKind

Review Comment:
   Remove this ...



##########
flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/results/serde/LogicalTypeJsonSerDeTest.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.results.serde;
+
+import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
+import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.runtime.typeutils.ExternalSerializer;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.types.Row;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
+
+/** Tests for {@link LogicalType} serialization and deserialization. */
+@Execution(CONCURRENT)
+public class LogicalTypeJsonSerDeTest {
+
+    private final ObjectMapper mapper = getObjectMapper();
+
+    // final constants for testing unsupported case
+    private final LogicalType unsupportedType =
+            new DayTimeIntervalType(DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR);
+    private final String serializerExceptionMessageFormat =
+            "Unable to serialize logical type '%s'. Please check the documentation for supported types.";
+    private final String unsupportedTypeString = "INTERVAL_DAY_TIME";
+    private final String json =
+            String.format(
+                    "{\"%s\": \"%s\", \"%s\": %s}",
+                    "type", unsupportedTypeString, "nullable", "true");
+    private final String deserializerExceptionMessageFormat =
+            "Unable to deserialize a logical type of type root '%s'. Please check the documentation for supported types.";
+
+    @ParameterizedTest
+    @MethodSource("generateTestData")
+    public void testLogicalTypeJsonSerDe(LogicalType logicalType) throws IOException {
+        String json = mapper.writeValueAsString(logicalType);
+        LogicalType actualType = mapper.readValue(json, LogicalType.class);
+
+        assertThat(actualType).isEqualTo(logicalType);
+    }
+
+    @Test
+    public void testSerDeWithUnsupportedType() {
+        // test to serialize unsupported LogicalType
+        assertThatThrownBy(() -> mapper.writeValueAsString(unsupportedType))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                UnsupportedOperationException.class,
+                                String.format(
+                                        serializerExceptionMessageFormat,
+                                        unsupportedType.asSummaryString())));
+
+        // test to deserialize unsupported JSON string
+        assertThatThrownBy(() -> mapper.readValue(json, LogicalType.class))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                UnsupportedOperationException.class,
+                                String.format(
+                                        deserializerExceptionMessageFormat,
+                                        unsupportedTypeString)));
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Test data
+    // --------------------------------------------------------------------------------------------
+
+    private static List<LogicalType> generateTestData() {
+        List<LogicalType> types =
+                Arrays.asList(
+                        new BooleanType(),
+                        new TinyIntType(),
+                        new SmallIntType(),
+                        new IntType(),
+                        new BigIntType(),
+                        new FloatType(),
+                        new DoubleType(),
+                        new DateType(),
+                        CharType.ofEmptyLiteral(),
+                        new CharType(),
+                        new CharType(5),
+                        VarCharType.ofEmptyLiteral(),
+                        new VarCharType(),
+                        new VarCharType(5),
+                        BinaryType.ofEmptyLiteral(),
+                        new BinaryType(),
+                        new BinaryType(100),
+                        VarBinaryType.ofEmptyLiteral(),
+                        new VarBinaryType(),
+                        new VarBinaryType(100),
+                        new DecimalType(10),
+                        new DecimalType(15, 5),
+                        new TimeType(),
+                        new TimeType(3),
+                        new TimestampType(),
+                        new TimestampType(3),
+                        new TimestampType(false, 3),
+                        new ZonedTimestampType(),
+                        new ZonedTimestampType(3),
+                        new LocalZonedTimestampType(),
+                        new LocalZonedTimestampType(3),
+                        new LocalZonedTimestampType(false, 3),
+                        // LocalZonedTimestampType#eaquals doesn't compare TimestampKind
+                        new LocalZonedTimestampType(false, TimestampKind.PROCTIME, 3),
+                        new MapType(new BigIntType(), new IntType(false)),
+                        new MapType(CharType.ofEmptyLiteral(), CharType.ofEmptyLiteral()),
+                        new MapType(VarCharType.ofEmptyLiteral(), VarCharType.ofEmptyLiteral()),
+                        new MapType(BinaryType.ofEmptyLiteral(), BinaryType.ofEmptyLiteral()),
+                        new MapType(VarBinaryType.ofEmptyLiteral(), VarBinaryType.ofEmptyLiteral()),
+                        new MapType(new TimestampType(false, 3), new LocalZonedTimestampType()),
+                        new ArrayType(new IntType(false)),
+                        new ArrayType(new TimestampType()),
+                        new ArrayType(new LocalZonedTimestampType(false, 3)),
+                        new ArrayType(CharType.ofEmptyLiteral()),
+                        new ArrayType(VarCharType.ofEmptyLiteral()),
+                        new ArrayType(BinaryType.ofEmptyLiteral()),
+                        new ArrayType(VarBinaryType.ofEmptyLiteral()),
+                        new MultisetType(new IntType(false)),
+                        new MultisetType(new TimestampType()),
+                        new MultisetType(new TimestampType(true, 3)),
+                        new MultisetType(CharType.ofEmptyLiteral()),
+                        new MultisetType(VarCharType.ofEmptyLiteral()),
+                        new MultisetType(BinaryType.ofEmptyLiteral()),
+                        new MultisetType(VarBinaryType.ofEmptyLiteral()),
+                        RowType.of(new BigIntType(), new IntType(false), new VarCharType(200)),
+                        RowType.of(
+                                new LogicalType[] {
+                                    new BigIntType(), new IntType(false), new VarCharType(200)
+                                },
+                                new String[] {"f1", "f2", "f3"}),
+                        RowType.of(
+                                new TimestampType(false, 3), new LocalZonedTimestampType(false, 3)),
+                        RowType.of(
+                                CharType.ofEmptyLiteral(),
+                                VarCharType.ofEmptyLiteral(),
+                                BinaryType.ofEmptyLiteral(),
+                                VarBinaryType.ofEmptyLiteral()),
+                        // Row with descriptions
+                        new RowType(
+                                Arrays.asList(
+                                        new RowType.RowField("ID", new BigIntType(), "ID desc"),
+                                        new RowType.RowField(
+                                                "Name", new VarCharType(20), "Name desc"))),
+                        // custom RawType
+                        new RawType<>(LocalDateTime.class, LocalDateTimeSerializer.INSTANCE),
+                        // external RawType
+                        new RawType<>(
+                                Row.class,
+                                ExternalSerializer.of(
+                                        DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()))));
+
+        List<LogicalType> testTypes =
+                Stream.concat(
+                                types.stream().map(type -> type.copy(true)),
+                                types.stream().map(type -> type.copy(false)))
+                        .collect(Collectors.toList());
+
+        // ignore nullable for NullType
+        testTypes.add(new NullType());
+
+        return testTypes;
+    }
+
+    /**
+     * LogicalType isn't annotated with Jackson annotations, so it's necessary to register the
+     * customer serializer and deserializer when testing LogicalType Serde alone.
+     */
+    private ObjectMapper getObjectMapper() {

Review Comment:
   Rename to `buildObjectMapper`.
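   A sketch of what such a helper might look like, based on the imports in this file (the serializer class name here is an assumption; the deserializer is the LogicalTypeJsonDeserializer touched elsewhere in this PR):
   ```java
   private ObjectMapper buildObjectMapper() {
       ObjectMapper mapper = new ObjectMapper();
       // register the custom (de)serializers so LogicalType can be handled on its own
       SimpleModule module = new SimpleModule();
       module.addSerializer(LogicalType.class, new LogicalTypeJsonSerializer());
       module.addDeserializer(LogicalType.class, new LogicalTypeJsonDeserializer());
       mapper.registerModule(module);
       return mapper;
   }
   ```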



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
