fsk119 commented on code in PR #20298:
URL: https://github.com/apache/flink/pull/20298#discussion_r931717886
##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java:
##########
@@ -34,4 +107,249 @@ public void testConvertSessionHandle() {
SessionHandle originSessionHandle = SessionHandle.create();
        assertEquals(toSessionHandle(toTSessionHandle(originSessionHandle)), originSessionHandle);
}
+
+ @Test
+ public void testConvertSessionHandleAndOperationHandle() {
+ SessionHandle originSessionHandle = SessionHandle.create();
+ OperationHandle originOperationHandle = OperationHandle.create();
+ TOperationHandle tOperationHandle =
+ toTOperationHandle(
+                        originSessionHandle, originOperationHandle, OperationType.UNKNOWN, false);
+
+ assertEquals(toSessionHandle(tOperationHandle), originSessionHandle);
+        assertEquals(toOperationHandle(tOperationHandle), originOperationHandle);
+ }
+
+ @Test
+ public void testConvertOperationStatus() {
+        Map<OperationStatus, TOperationState> expectedMappings = new HashMap<>();
+ expectedMappings.put(INITIALIZED, TOperationState.INITIALIZED_STATE);
+ expectedMappings.put(PENDING, TOperationState.PENDING_STATE);
+ expectedMappings.put(RUNNING, TOperationState.RUNNING_STATE);
+ expectedMappings.put(FINISHED, TOperationState.FINISHED_STATE);
+ expectedMappings.put(CANCELED, TOperationState.CANCELED_STATE);
+ expectedMappings.put(CLOSED, TOperationState.CLOSED_STATE);
+ expectedMappings.put(ERROR, TOperationState.ERROR_STATE);
+ expectedMappings.put(TIMEOUT, TOperationState.TIMEDOUT_STATE);
+
+ for (OperationStatus status : expectedMappings.keySet()) {
+            assertEquals(expectedMappings.get(status), toTOperationState(status));
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("getDataTypeSpecs")
+ public void testToTTableSchema(DataTypeSpec spec) {
+ TableSchema actual =
+ new TableSchema(
+                        toTTableSchema(DataTypeUtils.expandCompositeTypeToSchema(spec.flinkType)));
+ List<Integer> javaSqlTypes =
+ Arrays.stream(actual.toTypeDescriptors())
+ .map(desc -> desc.getType().toJavaSQLType())
+ .collect(Collectors.toList());
+
+ assertEquals(Collections.singletonList(spec.sqlType), javaSqlTypes);
+ }
+
+ @ParameterizedTest
+ @MethodSource("getDataTypeSpecs")
+    public void testResultSetToColumnBasedRowSet(DataTypeSpec spec) throws Exception {
+        List<LogicalType> fieldTypes = spec.flinkType.getLogicalType().getChildren();
+ TRowSet tRowSet =
+ toColumnBasedSet(
+ fieldTypes,
+ IntStream.range(0, fieldTypes.size())
+ .mapToObj(
+                                        pos -> RowData.createFieldGetter(fieldTypes.get(pos), pos))
+ .collect(Collectors.toList()),
+ Arrays.asList(spec.flinkValue, new GenericRowData(1)));
+        RowSet rowSet = RowSetFactory.create(tRowSet, HIVE_CLI_SERVICE_PROTOCOL_V10);
+ Iterator<Object[]> iterator = rowSet.iterator();
+        if (spec.flinkType.getChildren().equals(Collections.singletonList(BYTES()))) {
+            assertArrayEquals((byte[]) spec.convertedColumnBasedValue, (byte[]) iterator.next()[0]);
+ } else {
+ assertEquals(spec.convertedColumnBasedValue, iterator.next()[0]);
+ }
+
+ assertEquals(spec.convertedNullValue, iterator.next()[0]);
+ }
+
+ @ParameterizedTest
+ @MethodSource("getDataTypeSpecs")
+    public void testResultSetToRowBasedRowSet(DataTypeSpec spec) throws Exception {
+        List<LogicalType> fieldTypes = spec.flinkType.getLogicalType().getChildren();
+ TRowSet tRowSet =
+ toRowBasedSet(
+ fieldTypes,
+ IntStream.range(0, fieldTypes.size())
+ .mapToObj(
+                                        pos -> RowData.createFieldGetter(fieldTypes.get(pos), pos))
+ .collect(Collectors.toList()),
+ Arrays.asList(spec.flinkValue, new GenericRowData(1)));
+        RowSet rowSet = RowSetFactory.create(tRowSet, HIVE_CLI_SERVICE_PROTOCOL_V3);
+ Iterator<Object[]> iter = rowSet.iterator();
+ assertEquals(spec.convertedRowBasedValue, iter.next()[0]);
+ assertEquals(spec.convertedNullValue, iter.next()[0]);
+ }
+
+    // --------------------------------------------------------------------------------------------
+    // Negative tests
+    // --------------------------------------------------------------------------------------------
+
+ @Test
+ public void testSerializeRowDataWithRowKind() {
+ for (RowKind kind :
+                Arrays.asList(RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER, RowKind.DELETE)) {
+ assertThatThrownBy(
+ () ->
+ toTRowSet(
+ HIVE_CLI_SERVICE_PROTOCOL_V5,
+ ResolvedSchema.of(
+                                            Column.physical("f0", DataTypes.INT())),
+                                    Collections.singletonList(
+                                            GenericRowData.ofKind(kind, 1))))
+ .satisfies(
+ anyCauseMatches(
+ UnsupportedOperationException.class,
+                                    "HiveServer2 Endpoint only supports to serialize the INSERT-ONLY RowData."));
+ }
+ }
+
+    // --------------------------------------------------------------------------------------------
+
+ private static List<DataTypeSpec> getDataTypeSpecs() {
+ Map<StringData, StringData> map = new HashMap<>();
+        map.put(StringData.fromString("World"), StringData.fromString("Hello"));
+        map.put(StringData.fromString("Hello"), StringData.fromString("World"));
+ MapData mapData = new GenericMapData(map);
+ return Arrays.asList(
+ DataTypeSpec.newSpec()
+ .forType(BOOLEAN())
+ .forValue(Boolean.TRUE)
+ .expectSqlType(Types.BOOLEAN),
+ DataTypeSpec.newSpec()
+ .forType(TINYINT())
+ .forValue((byte) 3)
+                        // TINYINT is an alias of BYTE in Hive.
+ .expectSqlType(Types.BINARY),
+ DataTypeSpec.newSpec()
+ .forType(SMALLINT())
+ .forValue((short) 255)
+ .expectSqlType(Types.SMALLINT),
+                DataTypeSpec.newSpec().forType(INT()).forValue(1994).expectSqlType(Types.INTEGER),
+ DataTypeSpec.newSpec()
+ .forType(BIGINT())
+ .forValue(13214991L)
+ .expectSqlType(Types.BIGINT),
+ DataTypeSpec.newSpec()
+ .forType(FLOAT())
+ .forValue(1024.0f)
+ .expectSqlType(Types.FLOAT)
+ .expectValue(1024.0),
+ DataTypeSpec.newSpec()
+ .forType(DOUBLE())
+ .forValue(2048.1024)
+ .expectSqlType(Types.DOUBLE),
+ DataTypeSpec.newSpec()
+ .forType(DECIMAL(9, 6))
+                        .forValue(DecimalData.fromBigDecimal(new BigDecimal("123.456789"), 9, 6))
+ .expectSqlType(Types.DECIMAL)
+ .expectValue("123.456789"),
+ DataTypeSpec.newSpec()
+ .forType(STRING())
+ .forValue(StringData.fromString("Hello World"))
+ .expectSqlType(Types.VARCHAR)
+ .expectValue("Hello World"),
+ DataTypeSpec.newSpec()
+ .forType(BYTES())
+                        .forValue("Flink SQL Gateway".getBytes(StandardCharsets.UTF_8))
+ .expectSqlType(Types.BINARY)
+ .expectValue(
+                                new String("Flink SQL Gateway".getBytes(StandardCharsets.UTF_8)),
+                                "Flink SQL Gateway".getBytes(StandardCharsets.UTF_8)),
+ DataTypeSpec.newSpec()
+ .forType(DATE())
+                        .forValue((int) LocalDate.parse("2022-02-22").toEpochDay())
+ .expectSqlType(Types.DATE)
+ .expectValue("2022-02-22"),
+ DataTypeSpec.newSpec()
+ .forType(TIMESTAMP(4))
+ .forValue(
+ TimestampData.fromLocalDateTime(
+                                        LocalDateTime.parse("2020-05-11T12:00:12.1234")))
+ .expectSqlType(Types.TIMESTAMP)
+ .expectValue("2020-05-11 12:00:12.1234"),
+ DataTypeSpec.newSpec()
+                        .forType(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
+ .forValue(mapData)
+ .expectSqlType(Types.JAVA_OBJECT)
+                        .expectValue("{\"Hello\":\"World\",\"World\":\"Hello\"}")
+ .expectNullValue("null"),
+ DataTypeSpec.newSpec()
+ .forType(
+ DataTypes.ARRAY(
+                                DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())))
+                        .forValue(new GenericArrayData(new Object[] {mapData, mapData}))
+                        // Hive uses the STRING type for ARRAY values.
+ .expectSqlType(Types.VARCHAR)
+ .expectValue(
+                                "[{\"Hello\":\"World\",\"World\":\"Hello\"},{\"Hello\":\"World\",\"World\":\"Hello\"}]")
+ .expectNullValue("null"));
Review Comment:
Every DataType test contains two parts:
1. Serialization produces the expected value for a given input value.
2. Serialization can handle the null value.
Note that Hive serializes a null value of a complex type to the string "null" rather than to null [1].

[1] https://github.com/apache/hive/blob/master/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java#L317
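For reference, here is a minimal sketch of what the null-value half of these cases exercises, reusing the helpers already used in this test class (the single field of `new GenericRowData(1)` is left unset, i.e. null; the expected string mirrors `expectNullValue("null")` above):

```java
// Sketch: serialize a row whose only MAP<STRING, STRING> field is null,
// then read it back through the column-based Thrift representation.
List<LogicalType> fieldTypes =
        Collections.singletonList(
                DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()).getLogicalType());
TRowSet tRowSet =
        toColumnBasedSet(
                fieldTypes,
                Collections.singletonList(RowData.createFieldGetter(fieldTypes.get(0), 0)),
                Collections.singletonList(new GenericRowData(1))); // the field defaults to null
RowSet rowSet = RowSetFactory.create(tRowSet, HIVE_CLI_SERVICE_PROTOCOL_V10);
// Hive renders the null complex-type value as the literal string "null", not as null.
assertEquals("null", rowSet.iterator().next()[0]);
```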
##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java:
##########
@@ -34,4 +107,249 @@ public void testConvertSessionHandle() {
...
+                DataTypeSpec.newSpec()
+                        .forType(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
Review Comment:
I opened a ticket for this: [FLINK-28723](https://issues.apache.org/jira/browse/FLINK-28723).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.