fsk119 commented on code in PR #20298:
URL: https://github.com/apache/flink/pull/20298#discussion_r931717886


##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java:
##########
@@ -34,4 +107,249 @@ public void testConvertSessionHandle() {
         SessionHandle originSessionHandle = SessionHandle.create();
         assertEquals(toSessionHandle(toTSessionHandle(originSessionHandle)), 
originSessionHandle);
     }
+
+    @Test
+    public void testConvertSessionHandleAndOperationHandle() {
+        SessionHandle originSessionHandle = SessionHandle.create();
+        OperationHandle originOperationHandle = OperationHandle.create();
+        TOperationHandle tOperationHandle =
+                toTOperationHandle(
+                        originSessionHandle, originOperationHandle, 
OperationType.UNKNOWN, false);
+
+        assertEquals(toSessionHandle(tOperationHandle), originSessionHandle);
+        assertEquals(toOperationHandle(tOperationHandle), 
originOperationHandle);
+    }
+
+    @Test
+    public void testConvertOperationStatus() {
+        Map<OperationStatus, TOperationState> expectedMappings = new 
HashMap<>();
+        expectedMappings.put(INITIALIZED, TOperationState.INITIALIZED_STATE);
+        expectedMappings.put(PENDING, TOperationState.PENDING_STATE);
+        expectedMappings.put(RUNNING, TOperationState.RUNNING_STATE);
+        expectedMappings.put(FINISHED, TOperationState.FINISHED_STATE);
+        expectedMappings.put(CANCELED, TOperationState.CANCELED_STATE);
+        expectedMappings.put(CLOSED, TOperationState.CLOSED_STATE);
+        expectedMappings.put(ERROR, TOperationState.ERROR_STATE);
+        expectedMappings.put(TIMEOUT, TOperationState.TIMEDOUT_STATE);
+
+        for (OperationStatus status : expectedMappings.keySet()) {
+            assertEquals(expectedMappings.get(status), 
toTOperationState(status));
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("getDataTypeSpecs")
+    public void testToTTableSchema(DataTypeSpec spec) {
+        TableSchema actual =
+                new TableSchema(
+                        
toTTableSchema(DataTypeUtils.expandCompositeTypeToSchema(spec.flinkType)));
+        List<Integer> javaSqlTypes =
+                Arrays.stream(actual.toTypeDescriptors())
+                        .map(desc -> desc.getType().toJavaSQLType())
+                        .collect(Collectors.toList());
+
+        assertEquals(Collections.singletonList(spec.sqlType), javaSqlTypes);
+    }
+
+    @ParameterizedTest
+    @MethodSource("getDataTypeSpecs")
+    public void testResultSetToColumnBasedRowSet(DataTypeSpec spec) throws 
Exception {
+        List<LogicalType> fieldTypes = 
spec.flinkType.getLogicalType().getChildren();
+        TRowSet tRowSet =
+                toColumnBasedSet(
+                        fieldTypes,
+                        IntStream.range(0, fieldTypes.size())
+                                .mapToObj(
+                                        pos -> 
RowData.createFieldGetter(fieldTypes.get(pos), pos))
+                                .collect(Collectors.toList()),
+                        Arrays.asList(spec.flinkValue, new GenericRowData(1)));
+        RowSet rowSet = RowSetFactory.create(tRowSet, 
HIVE_CLI_SERVICE_PROTOCOL_V10);
+        Iterator<Object[]> iterator = rowSet.iterator();
+        if 
(spec.flinkType.getChildren().equals(Collections.singletonList(BYTES()))) {
+            assertArrayEquals((byte[]) spec.convertedColumnBasedValue, 
(byte[]) iterator.next()[0]);
+        } else {
+            assertEquals(spec.convertedColumnBasedValue, iterator.next()[0]);
+        }
+
+        assertEquals(spec.convertedNullValue, iterator.next()[0]);
+    }
+
+    @ParameterizedTest
+    @MethodSource("getDataTypeSpecs")
+    public void testResultSetToRowBasedRowSet(DataTypeSpec spec) throws 
Exception {
+        List<LogicalType> fieldTypes = 
spec.flinkType.getLogicalType().getChildren();
+        TRowSet tRowSet =
+                toRowBasedSet(
+                        fieldTypes,
+                        IntStream.range(0, fieldTypes.size())
+                                .mapToObj(
+                                        pos -> 
RowData.createFieldGetter(fieldTypes.get(pos), pos))
+                                .collect(Collectors.toList()),
+                        Arrays.asList(spec.flinkValue, new GenericRowData(1)));
+        RowSet rowSet = RowSetFactory.create(tRowSet, 
HIVE_CLI_SERVICE_PROTOCOL_V3);
+        Iterator<Object[]> iter = rowSet.iterator();
+        assertEquals(spec.convertedRowBasedValue, iter.next()[0]);
+        assertEquals(spec.convertedNullValue, iter.next()[0]);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Negative tests
+    // 
--------------------------------------------------------------------------------------------
+
+    @Test
+    public void testSerializeRowDataWithRowKind() {
+        for (RowKind kind :
+                Arrays.asList(RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER, 
RowKind.DELETE)) {
+            assertThatThrownBy(
+                            () ->
+                                    toTRowSet(
+                                            HIVE_CLI_SERVICE_PROTOCOL_V5,
+                                            ResolvedSchema.of(
+                                                    Column.physical("f0", 
DataTypes.INT())),
+                                            Collections.singletonList(
+                                                    
GenericRowData.ofKind(kind, 1))))
+                    .satisfies(
+                            anyCauseMatches(
+                                    UnsupportedOperationException.class,
+                                    "HiveServer2 Endpoint only supports to 
serialize the INSERT-ONLY RowData."));
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    private static List<DataTypeSpec> getDataTypeSpecs() {
+        Map<StringData, StringData> map = new HashMap<>();
+        map.put(StringData.fromString("World"), 
StringData.fromString("Hello"));
+        map.put(StringData.fromString("Hello"), 
StringData.fromString("World"));
+        MapData mapData = new GenericMapData(map);
+        return Arrays.asList(
+                DataTypeSpec.newSpec()
+                        .forType(BOOLEAN())
+                        .forValue(Boolean.TRUE)
+                        .expectSqlType(Types.BOOLEAN),
+                DataTypeSpec.newSpec()
+                        .forType(TINYINT())
+                        .forValue((byte) 3)
+                        // TINYINT is the alias of the BYTE in Hive.
+                        .expectSqlType(Types.BINARY),
+                DataTypeSpec.newSpec()
+                        .forType(SMALLINT())
+                        .forValue((short) 255)
+                        .expectSqlType(Types.SMALLINT),
+                
DataTypeSpec.newSpec().forType(INT()).forValue(1994).expectSqlType(Types.INTEGER),
+                DataTypeSpec.newSpec()
+                        .forType(BIGINT())
+                        .forValue(13214991L)
+                        .expectSqlType(Types.BIGINT),
+                DataTypeSpec.newSpec()
+                        .forType(FLOAT())
+                        .forValue(1024.0f)
+                        .expectSqlType(Types.FLOAT)
+                        .expectValue(1024.0),
+                DataTypeSpec.newSpec()
+                        .forType(DOUBLE())
+                        .forValue(2048.1024)
+                        .expectSqlType(Types.DOUBLE),
+                DataTypeSpec.newSpec()
+                        .forType(DECIMAL(9, 6))
+                        .forValue(DecimalData.fromBigDecimal(new 
BigDecimal("123.456789"), 9, 6))
+                        .expectSqlType(Types.DECIMAL)
+                        .expectValue("123.456789"),
+                DataTypeSpec.newSpec()
+                        .forType(STRING())
+                        .forValue(StringData.fromString("Hello World"))
+                        .expectSqlType(Types.VARCHAR)
+                        .expectValue("Hello World"),
+                DataTypeSpec.newSpec()
+                        .forType(BYTES())
+                        .forValue("Flink SQL 
Gateway".getBytes(StandardCharsets.UTF_8))
+                        .expectSqlType(Types.BINARY)
+                        .expectValue(
+                                new String("Flink SQL 
Gateway".getBytes(StandardCharsets.UTF_8)),
+                                "Flink SQL 
Gateway".getBytes(StandardCharsets.UTF_8)),
+                DataTypeSpec.newSpec()
+                        .forType(DATE())
+                        .forValue((int) 
LocalDate.parse("2022-02-22").toEpochDay())
+                        .expectSqlType(Types.DATE)
+                        .expectValue("2022-02-22"),
+                DataTypeSpec.newSpec()
+                        .forType(TIMESTAMP(4))
+                        .forValue(
+                                TimestampData.fromLocalDateTime(
+                                        
LocalDateTime.parse("2020-05-11T12:00:12.1234")))
+                        .expectSqlType(Types.TIMESTAMP)
+                        .expectValue("2020-05-11 12:00:12.1234"),
+                DataTypeSpec.newSpec()
+                        .forType(DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING()))
+                        .forValue(mapData)
+                        .expectSqlType(Types.JAVA_OBJECT)
+                        
.expectValue("{\"Hello\":\"World\",\"World\":\"Hello\"}")
+                        .expectNullValue("null"),
+                DataTypeSpec.newSpec()
+                        .forType(
+                                DataTypes.ARRAY(
+                                        DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING())))
+                        .forValue(new GenericArrayData(new Object[] {mapData, 
mapData}))
+                        // Hive uses STRING type
+                        .expectSqlType(Types.VARCHAR)
+                        .expectValue(
+                                
"[{\"Hello\":\"World\",\"World\":\"Hello\"},{\"Hello\":\"World\",\"World\":\"Hello\"}]")
+                        .expectNullValue("null"));

Review Comment:
   Every DataType test contains two parts:
   
   1. Serialization can get the expected value for the specific value.
   2. Serialization can serialize the null value.
   
   Hive serializes the null value for the complex type to the string "null"  
rather than null[1].
   
   [1] 
https://github.com/apache/hive/blob/master/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java#L317



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java:
##########
@@ -34,4 +107,249 @@ public void testConvertSessionHandle() {
         SessionHandle originSessionHandle = SessionHandle.create();
         assertEquals(toSessionHandle(toTSessionHandle(originSessionHandle)), 
originSessionHandle);
     }
+
+    @Test
+    public void testConvertSessionHandleAndOperationHandle() {
+        SessionHandle originSessionHandle = SessionHandle.create();
+        OperationHandle originOperationHandle = OperationHandle.create();
+        TOperationHandle tOperationHandle =
+                toTOperationHandle(
+                        originSessionHandle, originOperationHandle, 
OperationType.UNKNOWN, false);
+
+        assertEquals(toSessionHandle(tOperationHandle), originSessionHandle);
+        assertEquals(toOperationHandle(tOperationHandle), 
originOperationHandle);
+    }
+
+    @Test
+    public void testConvertOperationStatus() {
+        Map<OperationStatus, TOperationState> expectedMappings = new 
HashMap<>();
+        expectedMappings.put(INITIALIZED, TOperationState.INITIALIZED_STATE);
+        expectedMappings.put(PENDING, TOperationState.PENDING_STATE);
+        expectedMappings.put(RUNNING, TOperationState.RUNNING_STATE);
+        expectedMappings.put(FINISHED, TOperationState.FINISHED_STATE);
+        expectedMappings.put(CANCELED, TOperationState.CANCELED_STATE);
+        expectedMappings.put(CLOSED, TOperationState.CLOSED_STATE);
+        expectedMappings.put(ERROR, TOperationState.ERROR_STATE);
+        expectedMappings.put(TIMEOUT, TOperationState.TIMEDOUT_STATE);
+
+        for (OperationStatus status : expectedMappings.keySet()) {
+            assertEquals(expectedMappings.get(status), 
toTOperationState(status));
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("getDataTypeSpecs")
+    public void testToTTableSchema(DataTypeSpec spec) {
+        TableSchema actual =
+                new TableSchema(
+                        
toTTableSchema(DataTypeUtils.expandCompositeTypeToSchema(spec.flinkType)));
+        List<Integer> javaSqlTypes =
+                Arrays.stream(actual.toTypeDescriptors())
+                        .map(desc -> desc.getType().toJavaSQLType())
+                        .collect(Collectors.toList());
+
+        assertEquals(Collections.singletonList(spec.sqlType), javaSqlTypes);
+    }
+
+    @ParameterizedTest
+    @MethodSource("getDataTypeSpecs")
+    public void testResultSetToColumnBasedRowSet(DataTypeSpec spec) throws 
Exception {
+        List<LogicalType> fieldTypes = 
spec.flinkType.getLogicalType().getChildren();
+        TRowSet tRowSet =
+                toColumnBasedSet(
+                        fieldTypes,
+                        IntStream.range(0, fieldTypes.size())
+                                .mapToObj(
+                                        pos -> 
RowData.createFieldGetter(fieldTypes.get(pos), pos))
+                                .collect(Collectors.toList()),
+                        Arrays.asList(spec.flinkValue, new GenericRowData(1)));
+        RowSet rowSet = RowSetFactory.create(tRowSet, 
HIVE_CLI_SERVICE_PROTOCOL_V10);
+        Iterator<Object[]> iterator = rowSet.iterator();
+        if 
(spec.flinkType.getChildren().equals(Collections.singletonList(BYTES()))) {
+            assertArrayEquals((byte[]) spec.convertedColumnBasedValue, 
(byte[]) iterator.next()[0]);
+        } else {
+            assertEquals(spec.convertedColumnBasedValue, iterator.next()[0]);
+        }
+
+        assertEquals(spec.convertedNullValue, iterator.next()[0]);
+    }
+
+    @ParameterizedTest
+    @MethodSource("getDataTypeSpecs")
+    public void testResultSetToRowBasedRowSet(DataTypeSpec spec) throws 
Exception {
+        List<LogicalType> fieldTypes = 
spec.flinkType.getLogicalType().getChildren();
+        TRowSet tRowSet =
+                toRowBasedSet(
+                        fieldTypes,
+                        IntStream.range(0, fieldTypes.size())
+                                .mapToObj(
+                                        pos -> 
RowData.createFieldGetter(fieldTypes.get(pos), pos))
+                                .collect(Collectors.toList()),
+                        Arrays.asList(spec.flinkValue, new GenericRowData(1)));
+        RowSet rowSet = RowSetFactory.create(tRowSet, 
HIVE_CLI_SERVICE_PROTOCOL_V3);
+        Iterator<Object[]> iter = rowSet.iterator();
+        assertEquals(spec.convertedRowBasedValue, iter.next()[0]);
+        assertEquals(spec.convertedNullValue, iter.next()[0]);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Negative tests
+    // 
--------------------------------------------------------------------------------------------
+
+    @Test
+    public void testSerializeRowDataWithRowKind() {
+        for (RowKind kind :
+                Arrays.asList(RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER, 
RowKind.DELETE)) {
+            assertThatThrownBy(
+                            () ->
+                                    toTRowSet(
+                                            HIVE_CLI_SERVICE_PROTOCOL_V5,
+                                            ResolvedSchema.of(
+                                                    Column.physical("f0", 
DataTypes.INT())),
+                                            Collections.singletonList(
+                                                    
GenericRowData.ofKind(kind, 1))))
+                    .satisfies(
+                            anyCauseMatches(
+                                    UnsupportedOperationException.class,
+                                    "HiveServer2 Endpoint only supports to 
serialize the INSERT-ONLY RowData."));
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    private static List<DataTypeSpec> getDataTypeSpecs() {
+        Map<StringData, StringData> map = new HashMap<>();
+        map.put(StringData.fromString("World"), 
StringData.fromString("Hello"));
+        map.put(StringData.fromString("Hello"), 
StringData.fromString("World"));
+        MapData mapData = new GenericMapData(map);
+        return Arrays.asList(
+                DataTypeSpec.newSpec()
+                        .forType(BOOLEAN())
+                        .forValue(Boolean.TRUE)
+                        .expectSqlType(Types.BOOLEAN),
+                DataTypeSpec.newSpec()
+                        .forType(TINYINT())
+                        .forValue((byte) 3)
+                        // TINYINT is the alias of the BYTE in Hive.
+                        .expectSqlType(Types.BINARY),
+                DataTypeSpec.newSpec()
+                        .forType(SMALLINT())
+                        .forValue((short) 255)
+                        .expectSqlType(Types.SMALLINT),
+                
DataTypeSpec.newSpec().forType(INT()).forValue(1994).expectSqlType(Types.INTEGER),
+                DataTypeSpec.newSpec()
+                        .forType(BIGINT())
+                        .forValue(13214991L)
+                        .expectSqlType(Types.BIGINT),
+                DataTypeSpec.newSpec()
+                        .forType(FLOAT())
+                        .forValue(1024.0f)
+                        .expectSqlType(Types.FLOAT)
+                        .expectValue(1024.0),
+                DataTypeSpec.newSpec()
+                        .forType(DOUBLE())
+                        .forValue(2048.1024)
+                        .expectSqlType(Types.DOUBLE),
+                DataTypeSpec.newSpec()
+                        .forType(DECIMAL(9, 6))
+                        .forValue(DecimalData.fromBigDecimal(new 
BigDecimal("123.456789"), 9, 6))
+                        .expectSqlType(Types.DECIMAL)
+                        .expectValue("123.456789"),
+                DataTypeSpec.newSpec()
+                        .forType(STRING())
+                        .forValue(StringData.fromString("Hello World"))
+                        .expectSqlType(Types.VARCHAR)
+                        .expectValue("Hello World"),
+                DataTypeSpec.newSpec()
+                        .forType(BYTES())
+                        .forValue("Flink SQL 
Gateway".getBytes(StandardCharsets.UTF_8))
+                        .expectSqlType(Types.BINARY)
+                        .expectValue(
+                                new String("Flink SQL 
Gateway".getBytes(StandardCharsets.UTF_8)),
+                                "Flink SQL 
Gateway".getBytes(StandardCharsets.UTF_8)),
+                DataTypeSpec.newSpec()
+                        .forType(DATE())
+                        .forValue((int) 
LocalDate.parse("2022-02-22").toEpochDay())
+                        .expectSqlType(Types.DATE)
+                        .expectValue("2022-02-22"),
+                DataTypeSpec.newSpec()
+                        .forType(TIMESTAMP(4))
+                        .forValue(
+                                TimestampData.fromLocalDateTime(
+                                        
LocalDateTime.parse("2020-05-11T12:00:12.1234")))
+                        .expectSqlType(Types.TIMESTAMP)
+                        .expectValue("2020-05-11 12:00:12.1234"),
+                DataTypeSpec.newSpec()
+                        .forType(DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING()))

Review Comment:
   I open a ticket in the 
[FLINK-28723](https://issues.apache.org/jira/browse/FLINK-28723).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to