wuchong commented on code in PR #20298:
URL: https://github.com/apache/flink/pull/20298#discussion_r929707617


##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java:
##########
@@ -34,4 +110,250 @@ public void testConvertSessionHandle() {
         SessionHandle originSessionHandle = SessionHandle.create();
         assertEquals(toSessionHandle(toTSessionHandle(originSessionHandle)), 
originSessionHandle);
     }
+
+    @Test
+    public void testConvertSessionHandleAndOperationHandle() {
+        SessionHandle originSessionHandle = SessionHandle.create();
+        OperationHandle originOperationHandle = OperationHandle.create();
+        TOperationHandle tOperationHandle =
+                toTOperationHandle(
+                        originSessionHandle, originOperationHandle, 
OperationType.UNKNOWN, false);
+
+        assertEquals(toSessionHandle(tOperationHandle), originSessionHandle);
+        assertEquals(toOperationHandle(tOperationHandle), 
originOperationHandle);
+    }
+
+    @Test
+    public void testConvertOperationStatus() {
+        Map<OperationStatus, TOperationState> expectedMappings = new 
HashMap<>();
+        expectedMappings.put(INITIALIZED, TOperationState.INITIALIZED_STATE);
+        expectedMappings.put(PENDING, TOperationState.PENDING_STATE);
+        expectedMappings.put(RUNNING, TOperationState.RUNNING_STATE);
+        expectedMappings.put(FINISHED, TOperationState.FINISHED_STATE);
+        expectedMappings.put(CANCELED, TOperationState.CANCELED_STATE);
+        expectedMappings.put(CLOSED, TOperationState.CLOSED_STATE);
+        expectedMappings.put(ERROR, TOperationState.ERROR_STATE);
+        expectedMappings.put(TIMEOUT, TOperationState.TIMEDOUT_STATE);
+
+        for (OperationStatus status : expectedMappings.keySet()) {
+            assertEquals(expectedMappings.get(status), 
toTOperationState(status));
+        }
+    }
+
+    @Test
+    public void testToTTableSchema() {
+        for (DataTypeSpec spec : getDataTypeSpecs()) {
+            TableSchema actual =
+                    new TableSchema(
+                            toTTableSchema(
+                                    
DataTypeUtils.expandCompositeTypeToSchema(spec.flinkType)));
+            List<Integer> javaSqlTypes =
+                    Arrays.stream(actual.toTypeDescriptors())
+                            .map(desc -> desc.getType().toJavaSQLType())
+                            .collect(Collectors.toList());
+
+            assertEquals(Collections.singletonList(spec.sqlType), 
javaSqlTypes);
+        }
+    }
+
+    @Test
+    public void testResultSetToColumnBasedRowSet() throws Exception {
+        for (DataTypeSpec spec : getDataTypeSpecs()) {

Review Comment:
   Use JUnit5 `@ParameterizedTest` and `@ValueSource` to write parameterized 
test methods.
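
   A minimal sketch of what that could look like (illustrative only, not part of the PR): it assumes `getDataTypeSpecs()` is made `static`, and uses `@MethodSource` rather than `@ValueSource`, since `@ValueSource` only accepts literal values while the provider here returns `DataTypeSpec` objects. All other names come from the quoted test class.

   ```java
   // Sketch: inside ThriftObjectConversionsTest, assuming getDataTypeSpecs() is made static.
   // Requires the org.junit.jupiter.params imports and java.util.stream.Stream.
   static Stream<DataTypeSpec> dataTypeSpecs() {
       return getDataTypeSpecs().stream();
   }

   @ParameterizedTest
   @MethodSource("dataTypeSpecs")
   void testToTTableSchema(DataTypeSpec spec) {
       // Same assertions as the loop body above, applied to a single spec per invocation.
       TableSchema actual =
               new TableSchema(
                       toTTableSchema(DataTypeUtils.expandCompositeTypeToSchema(spec.flinkType)));
       List<Integer> javaSqlTypes =
               Arrays.stream(actual.toTypeDescriptors())
                       .map(desc -> desc.getType().toJavaSQLType())
                       .collect(Collectors.toList());

       assertEquals(Collections.singletonList(spec.sqlType), javaSqlTypes);
   }
   ```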



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java:
##########
@@ -34,4 +110,250 @@ public void testConvertSessionHandle() {
         SessionHandle originSessionHandle = SessionHandle.create();
         assertEquals(toSessionHandle(toTSessionHandle(originSessionHandle)), 
originSessionHandle);
     }
+
+    @Test
+    public void testConvertSessionHandleAndOperationHandle() {
+        SessionHandle originSessionHandle = SessionHandle.create();
+        OperationHandle originOperationHandle = OperationHandle.create();
+        TOperationHandle tOperationHandle =
+                toTOperationHandle(
+                        originSessionHandle, originOperationHandle, 
OperationType.UNKNOWN, false);
+
+        assertEquals(toSessionHandle(tOperationHandle), originSessionHandle);
+        assertEquals(toOperationHandle(tOperationHandle), 
originOperationHandle);
+    }
+
+    @Test
+    public void testConvertOperationStatus() {
+        Map<OperationStatus, TOperationState> expectedMappings = new 
HashMap<>();
+        expectedMappings.put(INITIALIZED, TOperationState.INITIALIZED_STATE);
+        expectedMappings.put(PENDING, TOperationState.PENDING_STATE);
+        expectedMappings.put(RUNNING, TOperationState.RUNNING_STATE);
+        expectedMappings.put(FINISHED, TOperationState.FINISHED_STATE);
+        expectedMappings.put(CANCELED, TOperationState.CANCELED_STATE);
+        expectedMappings.put(CLOSED, TOperationState.CLOSED_STATE);
+        expectedMappings.put(ERROR, TOperationState.ERROR_STATE);
+        expectedMappings.put(TIMEOUT, TOperationState.TIMEDOUT_STATE);
+
+        for (OperationStatus status : expectedMappings.keySet()) {
+            assertEquals(expectedMappings.get(status), 
toTOperationState(status));
+        }
+    }
+
+    @Test
+    public void testToTTableSchema() {
+        for (DataTypeSpec spec : getDataTypeSpecs()) {
+            TableSchema actual =
+                    new TableSchema(
+                            toTTableSchema(
+                                    
DataTypeUtils.expandCompositeTypeToSchema(spec.flinkType)));
+            List<Integer> javaSqlTypes =
+                    Arrays.stream(actual.toTypeDescriptors())
+                            .map(desc -> desc.getType().toJavaSQLType())
+                            .collect(Collectors.toList());
+
+            assertEquals(Collections.singletonList(spec.sqlType), 
javaSqlTypes);
+        }
+    }
+
+    @Test
+    public void testResultSetToColumnBasedRowSet() throws Exception {
+        for (DataTypeSpec spec : getDataTypeSpecs()) {
+            List<LogicalType> fieldTypes = 
spec.flinkType.getLogicalType().getChildren();
+            TRowSet tRowSet =
+                    toColumnBasedSet(
+                            fieldTypes,
+                            IntStream.range(0, fieldTypes.size())
+                                    .mapToObj(
+                                            pos ->
+                                                    RowData.createFieldGetter(
+                                                            
fieldTypes.get(pos), pos))
+                                    .collect(Collectors.toList()),
+                            new RowDataToStringConverterImpl(
+                                    spec.flinkType,
+                                    DateTimeUtils.UTC_ZONE.toZoneId(),
+                                    
Thread.currentThread().getContextClassLoader(),
+                                    false),
+                            Arrays.asList(spec.flinkValue, new 
GenericRowData(1)));
+            RowSet rowSet = RowSetFactory.create(tRowSet, 
HIVE_CLI_SERVICE_PROTOCOL_V10);
+            Iterator<Object[]> iterator = rowSet.iterator();
+            if (spec.flinkType.getChildren().equals(Collections.singletonList(DataTypes.BYTES()))) {
+                assertArrayEquals(
+                        (byte[]) spec.convertedColumnBasedValue, (byte[]) 
iterator.next()[0]);
+            } else {
+                assertEquals(spec.convertedColumnBasedValue, 
iterator.next()[0]);
+            }
+
+            assertNull(iterator.next()[0]);
+        }
+    }
+
+    @Test
+    public void testResultSetToRowBasedRowSet() throws Exception {
+        for (DataTypeSpec spec : getDataTypeSpecs()) {
+            List<LogicalType> fieldTypes = 
spec.flinkType.getLogicalType().getChildren();
+            TRowSet tRowSet =
+                    toRowBasedSet(
+                            fieldTypes,
+                            IntStream.range(0, fieldTypes.size())
+                                    .mapToObj(
+                                            pos ->
+                                                    RowData.createFieldGetter(
+                                                            
fieldTypes.get(pos), pos))
+                                    .collect(Collectors.toList()),
+                            new RowDataToStringConverterImpl(
+                                    spec.flinkType,
+                                    DateTimeUtils.UTC_ZONE.toZoneId(),
+                                    
Thread.currentThread().getContextClassLoader(),
+                                    false),
+                            Arrays.asList(spec.flinkValue, new 
GenericRowData(1)));
+            RowSet rowSet = RowSetFactory.create(tRowSet, 
HIVE_CLI_SERVICE_PROTOCOL_V3);
+            Iterator<Object[]> iter = rowSet.iterator();
+            assertEquals(spec.convertedRowBasedValue, iter.next()[0]);
+            assertNull(iter.next()[0]);
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Negative tests
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    public void testSerializeRowDataWithRowKind() {
+        for (RowKind kind :
+                Arrays.asList(RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER, 
RowKind.DELETE)) {
+            assertThatThrownBy(
+                            () ->
+                                    toTRowSet(
+                                            HIVE_CLI_SERVICE_PROTOCOL_V5,
+                                            ResolvedSchema.of(
+                                                    Column.physical("f0", 
DataTypes.INT())),
+                                            
SIMPLE_ROW_DATA_TO_STRING_CONVERTER,
+                                            Collections.singletonList(
+                                                    
GenericRowData.ofKind(kind, 1))))
+                    .satisfies(
+                            anyCauseMatches(
+                                    UnsupportedOperationException.class,
+                                    "HiveServer2 Endpoint only supports to 
serialize the INSERT-ONLY RowData."));
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private List<DataTypeSpec> getDataTypeSpecs() {
+        Map<Integer, StringData> map = new HashMap<>();
+        map.put(1, StringData.fromString("Hello"));
+        map.put(Integer.MAX_VALUE, StringData.fromString("World"));
+        MapData mapData = new GenericMapData(map);
+        return Arrays.asList(
+                DataTypeSpec.newSpec()
+                        .withType(BOOLEAN())
+                        .withValue(Boolean.TRUE)
+                        .expectSqlType(Types.BOOLEAN),
+                DataTypeSpec.newSpec()
+                        .withType(TINYINT())
+                        .withValue((byte) 3)
+                        // TINYINT is the alias of the BYTE in Hive.
+                        .expectSqlType(Types.BINARY),
+                DataTypeSpec.newSpec()
+                        .withType(SMALLINT())
+                        .withValue((short) 255)
+                        .expectSqlType(Types.SMALLINT),
+                
DataTypeSpec.newSpec().withType(INT()).withValue(1994).expectSqlType(Types.INTEGER),
+                DataTypeSpec.newSpec()
+                        .withType(BIGINT())
+                        .withValue(13214991L)
+                        .expectSqlType(Types.BIGINT),
+                DataTypeSpec.newSpec()
+                        .withType(FLOAT())
+                        .withValue(1024.0f)
+                        .expectSqlType(Types.FLOAT)
+                        .expectValue(1024.0),
+                DataTypeSpec.newSpec()
+                        .withType(DOUBLE())
+                        .withValue(2048.1024)
+                        .expectSqlType(Types.DOUBLE),
+                DataTypeSpec.newSpec()
+                        .withType(DECIMAL(9, 6))
+                        .withValue(DecimalData.fromBigDecimal(new 
BigDecimal("123.456789"), 9, 6))
+                        .expectSqlType(Types.DECIMAL)
+                        .expectValue("123.456789"),
+                DataTypeSpec.newSpec()
+                        .withType(STRING())
+                        .withValue(StringData.fromString("Hello World"))
+                        .expectSqlType(Types.VARCHAR)
+                        .expectValue("Hello World"),
+                DataTypeSpec.newSpec()
+                        .withType(BYTES())
+                        .withValue("Flink SQL 
Gateway".getBytes(StandardCharsets.UTF_8))
+                        .expectSqlType(Types.BINARY)
+                        .expectValue("Flink SQL 
Gateway".getBytes(StandardCharsets.UTF_8))
+                        .expectRowBasedValue("x'" + EncodingUtils.hex("Flink 
SQL Gateway") + "'"),
+                DataTypeSpec.newSpec()
+                        .withType(DATE())
+                        .withValue((int) 
LocalDate.parse("2022-02-22").toEpochDay())
+                        .expectSqlType(Types.DATE)
+                        .expectValue("2022-02-22"),
+                DataTypeSpec.newSpec()
+                        .withType(TIMESTAMP(4))
+                        .withValue(
+                                TimestampData.fromLocalDateTime(
+                                        
LocalDateTime.parse("2020-05-11T12:00:12.1234")))
+                        .expectSqlType(Types.TIMESTAMP)
+                        .expectValue("2020-05-11 12:00:12.1234"),
+                DataTypeSpec.newSpec()
+                        .withType(DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING()))
+                        .withValue(mapData)
+                        .expectSqlType(Types.JAVA_OBJECT)
+                        .expectValue("{2147483647=World, 1=Hello}"),
+                DataTypeSpec.newSpec()
+                        .withType(
+                                DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING())))
+                        .withValue(new GenericArrayData(new Object[] {mapData, 
mapData}))
+                        // Hive uses STRING type
+                        .expectSqlType(Types.VARCHAR)
+                        .expectValue("[{2147483647=World, 1=Hello}, 
{2147483647=World, 1=Hello}]"));
+    }
+
+    private static class DataTypeSpec {

Review Comment:
   Add `toString` to this class for a better error message when running a spec fails.
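
   For example, a minimal `toString()` along these lines (sketch only; the field names are taken from the quoted `DataTypeSpec`) would make the failing spec visible in the assertion message:

   ```java
   // Sketch: added to DataTypeSpec so a failed assertion identifies the offending spec.
   @Override
   public String toString() {
       return "DataTypeSpec{"
               + "flinkType=" + flinkType
               + ", sqlType=" + sqlType
               + ", flinkValue=" + flinkValue
               + ", convertedColumnBasedValue=" + convertedColumnBasedValue
               + ", convertedRowBasedValue=" + convertedRowBasedValue
               + '}';
   }
   ```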



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultStore.java:
##########
@@ -48,7 +48,7 @@ public class ResultStore {
     private final List<RowData> recordsBuffer = new ArrayList<>();
     private final int maxBufferSize;
 
-    private final Object resultLock = new Object();
+    private final Object resultLock = new ReentrantLock();

Review Comment:
   Why use `ReentrantLock`?
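
   For context, a `ReentrantLock` is normally paired with a `Condition` rather than being used as a plain monitor object; keeping `new Object()` with `synchronized`/`wait`/`notifyAll` is the simpler choice unless the extra features of `ReentrantLock` (tryLock, fairness, multiple conditions) are needed. A minimal sketch of the `Condition` style (illustrative only, not the PR's code, simplified field names):

   ```java
   // Sketch: the Condition-based style that ReentrantLock is usually paired with.
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.locks.Condition;
   import java.util.concurrent.locks.ReentrantLock;

   class LockStyleResultStore {
       private final ReentrantLock resultLock = new ReentrantLock();
       private final Condition recordsAvailable = resultLock.newCondition();
       private final List<Object> recordsBuffer = new ArrayList<>();

       void addRecord(Object row) {
           resultLock.lock();
           try {
               recordsBuffer.add(row);
               recordsAvailable.signalAll(); // Condition's counterpart of notifyAll()
           } finally {
               resultLock.unlock();
           }
       }

       Object takeRecord() throws InterruptedException {
           resultLock.lock();
           try {
               while (recordsBuffer.isEmpty()) {
                   recordsAvailable.await(); // releases the lock while waiting
               }
               return recordsBuffer.remove(0);
           } finally {
               resultLock.unlock();
           }
       }
   }
   ```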



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultStore.java:
##########
@@ -128,6 +140,8 @@ private void processRecord(RowData row) {
                 }
             }
             recordsBuffer.add(row);
+            // Notify the consumer to consume
+            resultLock.notify();

Review Comment:
   Should be `notifyAll()`?
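
   As a side note on the difference (illustrative sketch, not the PR's code): with several threads blocked on the same monitor, `notify()` wakes a single arbitrary waiter, so the conventional pattern is a guarded `while` loop on the consumer side plus `notifyAll()` on the producer side:

   ```java
   // Sketch: monitor-style producer/consumer showing why notifyAll() is the safer default.
   import java.util.ArrayList;
   import java.util.List;

   class MonitorStyleResultStore {
       private final Object resultLock = new Object();
       private final List<Object> recordsBuffer = new ArrayList<>();

       void addRecord(Object row) {
           synchronized (resultLock) {
               recordsBuffer.add(row);
               // notifyAll() wakes every waiter; each one re-checks its condition below.
               resultLock.notifyAll();
           }
       }

       Object takeRecord() throws InterruptedException {
           synchronized (resultLock) {
               while (recordsBuffer.isEmpty()) {
                   resultLock.wait();
               }
               return recordsBuffer.remove(0);
           }
       }
   }
   ```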



##########
flink-connectors/flink-connector-hive/src/test/resources/endpoint/hive_catalog.q:
##########
@@ -0,0 +1,121 @@
+# catalog_database.q - CREATE/DROP/SHOW/USE CATALOG/DATABASE
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# ==========================================================================
+# test hive catalog
+# ==========================================================================
+
+show current catalog;
+!output
++----------------------+
+| current catalog name |
++----------------------+
+|                 hive |
++----------------------+
+1 row in set
+!ok
+
+show databases;
+!output
++---------------+
+| database name |
++---------------+
+|       default |
++---------------+
+1 row in set
+!ok
+
+show tables;
+!output
+Empty set
+!ok
+
+create database additional_test_database;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+use additional_test_database;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+create table param_types_table (
+    dec DECIMAL(10, 10),
+    ch CHAR(5),
+    vch VARCHAR(15)
+);
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+show tables;
+!output
++-------------------+
+|        table name |
++-------------------+
+| param_types_table |
++-------------------+
+1 row in set
+!ok
+
+show current database;
+!output
++--------------------------+
+|    current database name |
++--------------------------+
+| additional_test_database |
++--------------------------+
+1 row in set
+!ok
+
+# ==========================================================================
+# test hive table with parameterized types
+# ==========================================================================
+
+describe hive.additional_test_database.param_types_table;
+!output
++------+-----------------+------+-----+--------+-----------+
+| name |            type | null | key | extras | watermark |

Review Comment:
   It seems the `describe` command is not compatible with HiveSQL?
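
   For comparison (illustrative only): a plain Hive `DESCRIBE` in Beeline reports the columns `col_name` / `data_type` / `comment`, whereas the expected output above uses Flink's `name` / `type` / `null` / `key` / `extras` / `watermark` layout, roughly:

   ```
   +-----------+-----------------+----------+
   | col_name  |    data_type    | comment  |
   +-----------+-----------------+----------+
   | dec       | decimal(10,10)  |          |
   | ch        | char(5)         |          |
   | vch       | varchar(15)     |          |
   +-----------+-----------------+----------+
   ```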



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java:
##########
@@ -34,4 +110,250 @@ public void testConvertSessionHandle() {
         SessionHandle originSessionHandle = SessionHandle.create();
         assertEquals(toSessionHandle(toTSessionHandle(originSessionHandle)), 
originSessionHandle);
     }
+
+    @Test
+    public void testConvertSessionHandleAndOperationHandle() {
+        SessionHandle originSessionHandle = SessionHandle.create();
+        OperationHandle originOperationHandle = OperationHandle.create();
+        TOperationHandle tOperationHandle =
+                toTOperationHandle(
+                        originSessionHandle, originOperationHandle, 
OperationType.UNKNOWN, false);
+
+        assertEquals(toSessionHandle(tOperationHandle), originSessionHandle);
+        assertEquals(toOperationHandle(tOperationHandle), 
originOperationHandle);
+    }
+
+    @Test
+    public void testConvertOperationStatus() {
+        Map<OperationStatus, TOperationState> expectedMappings = new 
HashMap<>();
+        expectedMappings.put(INITIALIZED, TOperationState.INITIALIZED_STATE);
+        expectedMappings.put(PENDING, TOperationState.PENDING_STATE);
+        expectedMappings.put(RUNNING, TOperationState.RUNNING_STATE);
+        expectedMappings.put(FINISHED, TOperationState.FINISHED_STATE);
+        expectedMappings.put(CANCELED, TOperationState.CANCELED_STATE);
+        expectedMappings.put(CLOSED, TOperationState.CLOSED_STATE);
+        expectedMappings.put(ERROR, TOperationState.ERROR_STATE);
+        expectedMappings.put(TIMEOUT, TOperationState.TIMEDOUT_STATE);
+
+        for (OperationStatus status : expectedMappings.keySet()) {
+            assertEquals(expectedMappings.get(status), 
toTOperationState(status));
+        }
+    }
+
+    @Test
+    public void testToTTableSchema() {
+        for (DataTypeSpec spec : getDataTypeSpecs()) {

Review Comment:
   Use JUnit5 `@ParameterizedTest` and `@ValueSource` to write parameterized 
test methods.



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java:
##########
@@ -34,4 +110,250 @@ public void testConvertSessionHandle() {
         SessionHandle originSessionHandle = SessionHandle.create();
         assertEquals(toSessionHandle(toTSessionHandle(originSessionHandle)), 
originSessionHandle);
     }
+
+    @Test
+    public void testConvertSessionHandleAndOperationHandle() {
+        SessionHandle originSessionHandle = SessionHandle.create();
+        OperationHandle originOperationHandle = OperationHandle.create();
+        TOperationHandle tOperationHandle =
+                toTOperationHandle(
+                        originSessionHandle, originOperationHandle, 
OperationType.UNKNOWN, false);
+
+        assertEquals(toSessionHandle(tOperationHandle), originSessionHandle);
+        assertEquals(toOperationHandle(tOperationHandle), 
originOperationHandle);
+    }
+
+    @Test
+    public void testConvertOperationStatus() {
+        Map<OperationStatus, TOperationState> expectedMappings = new 
HashMap<>();
+        expectedMappings.put(INITIALIZED, TOperationState.INITIALIZED_STATE);
+        expectedMappings.put(PENDING, TOperationState.PENDING_STATE);
+        expectedMappings.put(RUNNING, TOperationState.RUNNING_STATE);
+        expectedMappings.put(FINISHED, TOperationState.FINISHED_STATE);
+        expectedMappings.put(CANCELED, TOperationState.CANCELED_STATE);
+        expectedMappings.put(CLOSED, TOperationState.CLOSED_STATE);
+        expectedMappings.put(ERROR, TOperationState.ERROR_STATE);
+        expectedMappings.put(TIMEOUT, TOperationState.TIMEDOUT_STATE);
+
+        for (OperationStatus status : expectedMappings.keySet()) {
+            assertEquals(expectedMappings.get(status), 
toTOperationState(status));
+        }
+    }
+
+    @Test
+    public void testToTTableSchema() {
+        for (DataTypeSpec spec : getDataTypeSpecs()) {
+            TableSchema actual =
+                    new TableSchema(
+                            toTTableSchema(
+                                    
DataTypeUtils.expandCompositeTypeToSchema(spec.flinkType)));
+            List<Integer> javaSqlTypes =
+                    Arrays.stream(actual.toTypeDescriptors())
+                            .map(desc -> desc.getType().toJavaSQLType())
+                            .collect(Collectors.toList());
+
+            assertEquals(Collections.singletonList(spec.sqlType), 
javaSqlTypes);
+        }
+    }
+
+    @Test
+    public void testResultSetToColumnBasedRowSet() throws Exception {
+        for (DataTypeSpec spec : getDataTypeSpecs()) {
+            List<LogicalType> fieldTypes = 
spec.flinkType.getLogicalType().getChildren();
+            TRowSet tRowSet =
+                    toColumnBasedSet(
+                            fieldTypes,
+                            IntStream.range(0, fieldTypes.size())
+                                    .mapToObj(
+                                            pos ->
+                                                    RowData.createFieldGetter(
+                                                            
fieldTypes.get(pos), pos))
+                                    .collect(Collectors.toList()),
+                            new RowDataToStringConverterImpl(
+                                    spec.flinkType,
+                                    DateTimeUtils.UTC_ZONE.toZoneId(),
+                                    
Thread.currentThread().getContextClassLoader(),
+                                    false),
+                            Arrays.asList(spec.flinkValue, new 
GenericRowData(1)));
+            RowSet rowSet = RowSetFactory.create(tRowSet, 
HIVE_CLI_SERVICE_PROTOCOL_V10);
+            Iterator<Object[]> iterator = rowSet.iterator();
+            if (spec.flinkType.getChildren().equals(Collections.singletonList(DataTypes.BYTES()))) {
+                assertArrayEquals(
+                        (byte[]) spec.convertedColumnBasedValue, (byte[]) 
iterator.next()[0]);
+            } else {
+                assertEquals(spec.convertedColumnBasedValue, 
iterator.next()[0]);
+            }
+
+            assertNull(iterator.next()[0]);
+        }
+    }
+
+    @Test
+    public void testResultSetToRowBasedRowSet() throws Exception {
+        for (DataTypeSpec spec : getDataTypeSpecs()) {
+            List<LogicalType> fieldTypes = 
spec.flinkType.getLogicalType().getChildren();
+            TRowSet tRowSet =
+                    toRowBasedSet(
+                            fieldTypes,
+                            IntStream.range(0, fieldTypes.size())
+                                    .mapToObj(
+                                            pos ->
+                                                    RowData.createFieldGetter(
+                                                            
fieldTypes.get(pos), pos))
+                                    .collect(Collectors.toList()),
+                            new RowDataToStringConverterImpl(
+                                    spec.flinkType,
+                                    DateTimeUtils.UTC_ZONE.toZoneId(),
+                                    
Thread.currentThread().getContextClassLoader(),
+                                    false),
+                            Arrays.asList(spec.flinkValue, new 
GenericRowData(1)));
+            RowSet rowSet = RowSetFactory.create(tRowSet, 
HIVE_CLI_SERVICE_PROTOCOL_V3);
+            Iterator<Object[]> iter = rowSet.iterator();
+            assertEquals(spec.convertedRowBasedValue, iter.next()[0]);
+            assertNull(iter.next()[0]);
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Negative tests
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    public void testSerializeRowDataWithRowKind() {
+        for (RowKind kind :
+                Arrays.asList(RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER, 
RowKind.DELETE)) {
+            assertThatThrownBy(
+                            () ->
+                                    toTRowSet(
+                                            HIVE_CLI_SERVICE_PROTOCOL_V5,
+                                            ResolvedSchema.of(
+                                                    Column.physical("f0", 
DataTypes.INT())),
+                                            
SIMPLE_ROW_DATA_TO_STRING_CONVERTER,
+                                            Collections.singletonList(
+                                                    
GenericRowData.ofKind(kind, 1))))
+                    .satisfies(
+                            anyCauseMatches(
+                                    UnsupportedOperationException.class,
+                                    "HiveServer2 Endpoint only supports to 
serialize the INSERT-ONLY RowData."));
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private List<DataTypeSpec> getDataTypeSpecs() {
+        Map<Integer, StringData> map = new HashMap<>();
+        map.put(1, StringData.fromString("Hello"));
+        map.put(Integer.MAX_VALUE, StringData.fromString("World"));
+        MapData mapData = new GenericMapData(map);
+        return Arrays.asList(
+                DataTypeSpec.newSpec()
+                        .withType(BOOLEAN())
+                        .withValue(Boolean.TRUE)
+                        .expectSqlType(Types.BOOLEAN),
+                DataTypeSpec.newSpec()
+                        .withType(TINYINT())
+                        .withValue((byte) 3)
+                        // TINYINT is the alias of the BYTE in Hive.
+                        .expectSqlType(Types.BINARY),
+                DataTypeSpec.newSpec()
+                        .withType(SMALLINT())
+                        .withValue((short) 255)
+                        .expectSqlType(Types.SMALLINT),
+                
DataTypeSpec.newSpec().withType(INT()).withValue(1994).expectSqlType(Types.INTEGER),
+                DataTypeSpec.newSpec()
+                        .withType(BIGINT())
+                        .withValue(13214991L)
+                        .expectSqlType(Types.BIGINT),
+                DataTypeSpec.newSpec()
+                        .withType(FLOAT())
+                        .withValue(1024.0f)
+                        .expectSqlType(Types.FLOAT)
+                        .expectValue(1024.0),
+                DataTypeSpec.newSpec()
+                        .withType(DOUBLE())
+                        .withValue(2048.1024)
+                        .expectSqlType(Types.DOUBLE),
+                DataTypeSpec.newSpec()
+                        .withType(DECIMAL(9, 6))
+                        .withValue(DecimalData.fromBigDecimal(new 
BigDecimal("123.456789"), 9, 6))
+                        .expectSqlType(Types.DECIMAL)
+                        .expectValue("123.456789"),
+                DataTypeSpec.newSpec()
+                        .withType(STRING())
+                        .withValue(StringData.fromString("Hello World"))
+                        .expectSqlType(Types.VARCHAR)
+                        .expectValue("Hello World"),
+                DataTypeSpec.newSpec()
+                        .withType(BYTES())
+                        .withValue("Flink SQL 
Gateway".getBytes(StandardCharsets.UTF_8))
+                        .expectSqlType(Types.BINARY)
+                        .expectValue("Flink SQL 
Gateway".getBytes(StandardCharsets.UTF_8))
+                        .expectRowBasedValue("x'" + EncodingUtils.hex("Flink 
SQL Gateway") + "'"),
+                DataTypeSpec.newSpec()
+                        .withType(DATE())
+                        .withValue((int) 
LocalDate.parse("2022-02-22").toEpochDay())
+                        .expectSqlType(Types.DATE)
+                        .expectValue("2022-02-22"),
+                DataTypeSpec.newSpec()
+                        .withType(TIMESTAMP(4))
+                        .withValue(
+                                TimestampData.fromLocalDateTime(
+                                        
LocalDateTime.parse("2020-05-11T12:00:12.1234")))
+                        .expectSqlType(Types.TIMESTAMP)
+                        .expectValue("2020-05-11 12:00:12.1234"),
+                DataTypeSpec.newSpec()
+                        .withType(DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING()))
+                        .withValue(mapData)
+                        .expectSqlType(Types.JAVA_OBJECT)
+                        .expectValue("{2147483647=World, 1=Hello}"),
+                DataTypeSpec.newSpec()
+                        .withType(
+                                DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING())))
+                        .withValue(new GenericArrayData(new Object[] {mapData, 
mapData}))
+                        // Hive uses STRING type
+                        .expectSqlType(Types.VARCHAR)
+                        .expectValue("[{2147483647=World, 1=Hello}, 
{2147483647=World, 1=Hello}]"));
+    }
+
+    private static class DataTypeSpec {
+        DataType flinkType;
+        Integer sqlType;
+        RowData flinkValue;
+        Object convertedColumnBasedValue;
+        Object convertedRowBasedValue;
+
+        public static DataTypeSpec newSpec() {
+            DataTypeSpec spec = new DataTypeSpec();
+            spec.flinkValue = new GenericRowData(1);
+            return spec;
+        }
+
+        public DataTypeSpec withType(DataType flinkType) {
+            this.flinkType = DataTypes.ROW(flinkType);
+            return this;
+        }
+
+        public DataTypeSpec expectSqlType(int sqlType) {
+            this.sqlType = sqlType;
+            return this;
+        }
+
+        public DataTypeSpec withValue(Object flinkValue) {
+            this.flinkValue = GenericRowData.of(flinkValue);
+            this.convertedColumnBasedValue = flinkValue;
+            this.convertedRowBasedValue = flinkValue;
+            return this;
+        }
+
+        public DataTypeSpec expectValue(Object convertedValue) {
+            this.convertedColumnBasedValue = convertedValue;
+            this.convertedRowBasedValue = convertedValue;
+            return this;
+        }
+
+        public DataTypeSpec expectRowBasedValue(Object convertedRowBasedValue) 
{
+            this.convertedRowBasedValue = convertedRowBasedValue;
+            return this;
+        }

Review Comment:
   The semantics of the three methods are confusing to me. The invocation order is critical; otherwise the result is unexpected. How about separating them into three methods:
   
   ```java
   withValue -> forValue() // withType -> forType as well. 
   expectValue(Object rowAndColumnBasedValue)
   expectValue(Object rowBasedValue, Object columnBasedValue)
   ```
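
   One possible shape of that API (sketch only; the method names follow the suggestion above, everything else is illustrative):

   ```java
   // Sketch: DataTypeSpec reworked so the invocation order of the builder methods no longer matters.
   private static class DataTypeSpec {
       DataType flinkType;
       RowData flinkValue = new GenericRowData(1);
       Integer sqlType;
       Object convertedRowBasedValue;
       Object convertedColumnBasedValue;
       private boolean expectationSet = false;

       static DataTypeSpec forType(DataType flinkType) {
           DataTypeSpec spec = new DataTypeSpec();
           spec.flinkType = DataTypes.ROW(flinkType);
           return spec;
       }

       DataTypeSpec forValue(Object flinkValue) {
           this.flinkValue = GenericRowData.of(flinkValue);
           if (!expectationSet) {
               // By default, the serialized value is expected to equal the original value.
               this.convertedRowBasedValue = flinkValue;
               this.convertedColumnBasedValue = flinkValue;
           }
           return this;
       }

       DataTypeSpec expectSqlType(int sqlType) {
           this.sqlType = sqlType;
           return this;
       }

       DataTypeSpec expectValue(Object rowAndColumnBasedValue) {
           return expectValue(rowAndColumnBasedValue, rowAndColumnBasedValue);
       }

       DataTypeSpec expectValue(Object rowBasedValue, Object columnBasedValue) {
           this.expectationSet = true;
           this.convertedRowBasedValue = rowBasedValue;
           this.convertedColumnBasedValue = columnBasedValue;
           return this;
       }
   }
   ```

   The BYTES spec, for example, would then pass its hex string and byte array to the two-argument `expectValue` instead of chaining `expectValue` and `expectRowBasedValue`.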
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
