chl-wxp commented on code in PR #10194:
URL: https://github.com/apache/seatunnel/pull/10194#discussion_r2619639811
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java:
##########
@@ -1623,6 +1623,177 @@ public void
testKafkaProtobufForTransformToAssert(TestContainer container)
}
}
+ @TestTemplate
+ public void testProtobufCaseSensitiveFieldNames(TestContainer container)
+ throws IOException, InterruptedException, URISyntaxException {
+ Container.ExecResult execResult =
+
container.executeJob("/protobuf/fake_to_kafka_protobuf_case_sensitive.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+
+ String path =
getTestConfigFile("/protobuf/fake_to_kafka_protobuf_case_sensitive.conf");
+ Config config = ConfigFactory.parseFile(new File(path));
+ Config sinkConfig = config.getConfigList("sink").get(0);
+
+ Map<String, String> schemaProperties = new HashMap<>();
+ schemaProperties.put(
+ "protobuf_message_name",
sinkConfig.getString("protobuf_message_name"));
+ schemaProperties.put("protobuf_schema",
sinkConfig.getString("protobuf_schema"));
+
+ SeaTunnelRowType nestedType =
+ new SeaTunnelRowType(
+ new String[] {"NestedField", "AnotherField"},
+ new SeaTunnelDataType<?>[] {BasicType.STRING_TYPE,
BasicType.INT_TYPE});
+
+ SeaTunnelRowType seaTunnelRowType =
+ new SeaTunnelRowType(
Review Comment:
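Reduce redundancy: the same nested/top-level row type is built in both new tests; extract it into a shared helper, e.g. `private SeaTunnelRowType getSeaTunnelRowType()`.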
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java:
##########
@@ -1623,6 +1623,177 @@ public void
testKafkaProtobufForTransformToAssert(TestContainer container)
}
}
+ @TestTemplate
+ public void testProtobufCaseSensitiveFieldNames(TestContainer container)
+ throws IOException, InterruptedException, URISyntaxException {
+ Container.ExecResult execResult =
+
container.executeJob("/protobuf/fake_to_kafka_protobuf_case_sensitive.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+
+ String path =
getTestConfigFile("/protobuf/fake_to_kafka_protobuf_case_sensitive.conf");
+ Config config = ConfigFactory.parseFile(new File(path));
+ Config sinkConfig = config.getConfigList("sink").get(0);
+
+ Map<String, String> schemaProperties = new HashMap<>();
+ schemaProperties.put(
+ "protobuf_message_name",
sinkConfig.getString("protobuf_message_name"));
+ schemaProperties.put("protobuf_schema",
sinkConfig.getString("protobuf_schema"));
+
+ SeaTunnelRowType nestedType =
+ new SeaTunnelRowType(
+ new String[] {"NestedField", "AnotherField"},
+ new SeaTunnelDataType<?>[] {BasicType.STRING_TYPE,
BasicType.INT_TYPE});
+
+ SeaTunnelRowType seaTunnelRowType =
+ new SeaTunnelRowType(
+ new String[] {
+ "MyIntField",
+ "CamelCaseString",
+ "snake_case_field",
+ "NestedObject",
+ "MyMapField"
+ },
+ new SeaTunnelDataType<?>[] {
+ BasicType.INT_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ nestedType,
+ new MapType<>(BasicType.STRING_TYPE,
BasicType.INT_TYPE)
+ });
+
+ TableSchema schema =
+ TableSchema.builder()
+ .columns(
+ Arrays.asList(
+ IntStream.range(0,
seaTunnelRowType.getTotalFields())
+ .mapToObj(
+ i ->
+
PhysicalColumn.of(
+
seaTunnelRowType
+
.getFieldName(i),
+
seaTunnelRowType
+
.getFieldType(i),
+ 0,
+ true,
+ null,
+ null))
+
.toArray(PhysicalColumn[]::new)))
+ .build();
+
+ CatalogTable catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of("", "", "", "test"),
+ schema,
+ schemaProperties,
+ Collections.emptyList(),
+ "It is converted from RowType and only has column
information.");
+
+ ProtobufDeserializationSchema deserializationSchema =
+ new ProtobufDeserializationSchema(catalogTable);
+
+ List<SeaTunnelRow> kafkaSTRow =
+ getKafkaSTRow(
+ "test_protobuf_case_sensitive_topic",
+ value -> {
+ try {
+ return
deserializationSchema.deserialize(value);
+ } catch (IOException e) {
+ throw new RuntimeException("Error
deserializing Kafka message", e);
+ }
+ });
+
+ Assertions.assertEquals(16, kafkaSTRow.size());
+
+ kafkaSTRow.forEach(
+ row -> {
+ Assertions.assertAll(
+ "Verify case-sensitive field values",
+ () -> Assertions.assertNotNull(row.getField(0)),
// MyIntField
+ () -> Assertions.assertNotNull(row.getField(1)),
// CamelCaseString
+ () -> Assertions.assertNotNull(row.getField(2)),
// snake_case_field
+ () -> {
+ SeaTunnelRow nestedRow = (SeaTunnelRow)
row.getField(3);
+ if (nestedRow != null) {
+
Assertions.assertNotNull(nestedRow.getField(0)); // NestedField
+
Assertions.assertNotNull(nestedRow.getField(1)); // AnotherField
+ }
+ },
+ () -> {
+ @SuppressWarnings("unchecked")
+ Map<String, Integer> mapField =
+ (Map<String, Integer>) row.getField(4);
+ if (mapField != null) {
+ Assertions.assertNotNull(mapField);
+ }
+ });
+ });
+ }
+
+ @TestTemplate
+ public void testProtobufCaseSensitiveToAssert(TestContainer container)
+ throws IOException, InterruptedException, URISyntaxException {
+
+ String confFile =
"/protobuf/kafka_protobuf_case_sensitive_to_assert.conf";
+ String path = getTestConfigFile(confFile);
+ Config config = ConfigFactory.parseFile(new File(path));
+ Config sourceConfig = config.getConfigList("source").get(0);
+ ReadonlyConfig readonlyConfig =
ReadonlyConfig.fromConfig(sourceConfig);
+
+ SeaTunnelRowType nestedType =
+ new SeaTunnelRowType(
+ new String[] {"NestedField", "AnotherField"},
+ new SeaTunnelDataType<?>[] {BasicType.STRING_TYPE,
BasicType.INT_TYPE});
+
+ SeaTunnelRowType seaTunnelRowType =
+ new SeaTunnelRowType(
+ new String[] {
+ "MyIntField",
+ "CamelCaseString",
+ "snake_case_field",
+ "NestedObject",
+ "MyMapField"
+ },
+ new SeaTunnelDataType<?>[] {
+ BasicType.INT_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ nestedType,
+ new MapType<>(BasicType.STRING_TYPE,
BasicType.INT_TYPE)
+ });
+
+ DefaultSeaTunnelRowSerializer serializer =
+ getDefaultSeaTunnelRowSerializer(
+ "test_protobuf_case_sensitive_topic",
seaTunnelRowType, readonlyConfig);
+
+ SeaTunnelRow nestedRow = new SeaTunnelRow(2);
+ nestedRow.setField(0, "nested_value");
+ nestedRow.setField(1, 999);
+
+ Map<String, Integer> mapData = new HashMap<>();
+ mapData.put("key1", 100);
+ mapData.put("key2", 200);
+
+ for (int i = 0; i < 16; i++) {
+ SeaTunnelRow row = new SeaTunnelRow(5);
+ row.setField(0, i);
+ row.setField(1, "test_string_" + i);
+ row.setField(2, "snake_value_" + i);
+ row.setField(3, nestedRow);
+ row.setField(4, mapData);
+
+ ProducerRecord<byte[], byte[]> producerRecord =
serializer.serializeRow(row);
+ try {
+ producer.send(producerRecord).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException("Error sending Kafka message", e);
+ }
+ }
+ producer.flush();
+
+ Container.ExecResult execResult = container.executeJob(confFile);
Review Comment:
Can this test use the
SourceFlowTestUtils.runBatchWithCheckpointDisabled(...) method to run the
source directly, instead of submitting a job to the test container?
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java:
##########
@@ -1623,6 +1623,177 @@ public void
testKafkaProtobufForTransformToAssert(TestContainer container)
}
}
+ @TestTemplate
+ public void testProtobufCaseSensitiveFieldNames(TestContainer container)
+ throws IOException, InterruptedException, URISyntaxException {
+ Container.ExecResult execResult =
+
container.executeJob("/protobuf/fake_to_kafka_protobuf_case_sensitive.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+
+ String path =
getTestConfigFile("/protobuf/fake_to_kafka_protobuf_case_sensitive.conf");
+ Config config = ConfigFactory.parseFile(new File(path));
+ Config sinkConfig = config.getConfigList("sink").get(0);
+
+ Map<String, String> schemaProperties = new HashMap<>();
+ schemaProperties.put(
+ "protobuf_message_name",
sinkConfig.getString("protobuf_message_name"));
+ schemaProperties.put("protobuf_schema",
sinkConfig.getString("protobuf_schema"));
+
+ SeaTunnelRowType nestedType =
+ new SeaTunnelRowType(
+ new String[] {"NestedField", "AnotherField"},
+ new SeaTunnelDataType<?>[] {BasicType.STRING_TYPE,
BasicType.INT_TYPE});
+
+ SeaTunnelRowType seaTunnelRowType =
+ new SeaTunnelRowType(
+ new String[] {
+ "MyIntField",
+ "CamelCaseString",
+ "snake_case_field",
+ "NestedObject",
+ "MyMapField"
+ },
+ new SeaTunnelDataType<?>[] {
+ BasicType.INT_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ nestedType,
+ new MapType<>(BasicType.STRING_TYPE,
BasicType.INT_TYPE)
+ });
+
+ TableSchema schema =
+ TableSchema.builder()
+ .columns(
+ Arrays.asList(
+ IntStream.range(0,
seaTunnelRowType.getTotalFields())
+ .mapToObj(
+ i ->
+
PhysicalColumn.of(
+
seaTunnelRowType
+
.getFieldName(i),
+
seaTunnelRowType
+
.getFieldType(i),
+ 0,
+ true,
+ null,
+ null))
+
.toArray(PhysicalColumn[]::new)))
+ .build();
+
+ CatalogTable catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of("", "", "", "test"),
+ schema,
+ schemaProperties,
+ Collections.emptyList(),
+ "It is converted from RowType and only has column
information.");
+
+ ProtobufDeserializationSchema deserializationSchema =
+ new ProtobufDeserializationSchema(catalogTable);
+
+ List<SeaTunnelRow> kafkaSTRow =
+ getKafkaSTRow(
+ "test_protobuf_case_sensitive_topic",
+ value -> {
+ try {
+ return
deserializationSchema.deserialize(value);
+ } catch (IOException e) {
+ throw new RuntimeException("Error
deserializing Kafka message", e);
+ }
+ });
+
+ Assertions.assertEquals(16, kafkaSTRow.size());
+
+ kafkaSTRow.forEach(
+ row -> {
+ Assertions.assertAll(
+ "Verify case-sensitive field values",
+ () -> Assertions.assertNotNull(row.getField(0)),
// MyIntField
+ () -> Assertions.assertNotNull(row.getField(1)),
// CamelCaseString
+ () -> Assertions.assertNotNull(row.getField(2)),
// snake_case_field
+ () -> {
+ SeaTunnelRow nestedRow = (SeaTunnelRow)
row.getField(3);
+ if (nestedRow != null) {
+
Assertions.assertNotNull(nestedRow.getField(0)); // NestedField
+
Assertions.assertNotNull(nestedRow.getField(1)); // AnotherField
+ }
+ },
+ () -> {
+ @SuppressWarnings("unchecked")
+ Map<String, Integer> mapField =
+ (Map<String, Integer>) row.getField(4);
+ if (mapField != null) {
+ Assertions.assertNotNull(mapField);
+ }
+ });
+ });
+ }
+
+ @TestTemplate
+ public void testProtobufCaseSensitiveToAssert(TestContainer container)
+ throws IOException, InterruptedException, URISyntaxException {
+
+ String confFile =
"/protobuf/kafka_protobuf_case_sensitive_to_assert.conf";
+ String path = getTestConfigFile(confFile);
+ Config config = ConfigFactory.parseFile(new File(path));
+ Config sourceConfig = config.getConfigList("source").get(0);
+ ReadonlyConfig readonlyConfig =
ReadonlyConfig.fromConfig(sourceConfig);
+
+ SeaTunnelRowType nestedType =
+ new SeaTunnelRowType(
+ new String[] {"NestedField", "AnotherField"},
+ new SeaTunnelDataType<?>[] {BasicType.STRING_TYPE,
BasicType.INT_TYPE});
+
+ SeaTunnelRowType seaTunnelRowType =
Review Comment:
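Reduce redundancy: the same nested/top-level row type is built in both new tests; extract it into a shared helper, e.g. `private SeaTunnelRowType getSeaTunnelRowType()`.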
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java:
##########
@@ -1623,6 +1623,177 @@ public void
testKafkaProtobufForTransformToAssert(TestContainer container)
}
}
+ @TestTemplate
+ public void testProtobufCaseSensitiveFieldNames(TestContainer container)
+ throws IOException, InterruptedException, URISyntaxException {
+ Container.ExecResult execResult =
+
container.executeJob("/protobuf/fake_to_kafka_protobuf_case_sensitive.conf");
Review Comment:
Can this test use the SinkFlowTestUtils.runBatchWithCheckpointDisabled(...)
method to run the sink directly, instead of submitting a job to the test container?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]