leonardBang commented on a change in pull request #10468: [FLINK-14649][table sql / api] Flatten all the connector properties keys to make it easy to configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355162022
##########
File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
##########
@@ -177,13 +157,112 @@ public void testTableSource() {
assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.sourceFunction.getClass()));
}
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testTableSourceWithLegacyProperties() {
+		// prepare parameters for Kafka table source
+		final TableSchema schema = TableSchema.builder()
+			.field(FRUIT_NAME, DataTypes.STRING())
+			.field(COUNT, DataTypes.DECIMAL(10, 3))
+			.field(EVENT_TIME, DataTypes.TIMESTAMP(3))
+			.field(PROC_TIME, DataTypes.TIMESTAMP(3))
+			.build();
+
+		final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = Collections.singletonList(
+			new RowtimeAttributeDescriptor(EVENT_TIME, new ExistingField(TIME), new AscendingTimestamps()));
+
+		final Map<String, String> fieldMapping = new HashMap<>();
+		fieldMapping.put(FRUIT_NAME, NAME);
+		fieldMapping.put(NAME, NAME);
+		fieldMapping.put(COUNT, COUNT);
+		fieldMapping.put(TIME, TIME);
+
+		final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+		specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
+		specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
+
+		final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema(
+			TableSchema.builder()
+				.field(NAME, DataTypes.STRING())
+				.field(COUNT, DataTypes.DECIMAL(10, 3))
+				.field(TIME, DataTypes.TIMESTAMP(3))
+				.build()
+				.toRowType()
+		);
+
+		final KafkaTableSourceBase expected = getExpectedKafkaTableSource(
+			schema,
+			Optional.of(PROC_TIME),
+			rowtimeAttributeDescriptors,
+			fieldMapping,
+			TOPIC,
+			KAFKA_PROPERTIES,
+			deserializationSchema,
+			StartupMode.SPECIFIC_OFFSETS,
+			specificOffsets);
+
+		TableSourceValidation.validateTableSource(expected);
+
+		// construct table source using descriptors and table source factory
+		final Map<String, String> legacyPropertiesMap = new HashMap<>();
+		legacyPropertiesMap.putAll(createKafkaSourceProperties());
+
+		// use legacy properties
+		legacyPropertiesMap.remove("connector.specific-offsets");
+		legacyPropertiesMap.remove("connector.properties.zookeeper.connect");
+		legacyPropertiesMap.remove("connector.properties.bootstrap.servers");
+		legacyPropertiesMap.remove("connector.properties.group.id");
+
+		legacyPropertiesMap.put("connector.specific-offsets.0.partition", "0");
+		legacyPropertiesMap.put("connector.specific-offsets.0.offset", "100");
+		legacyPropertiesMap.put("connector.specific-offsets.1.partition", "1");
+		legacyPropertiesMap.put("connector.specific-offsets.1.offset", "123");
+		legacyPropertiesMap.put("connector.properties.0.key", "zookeeper.connect");
+		legacyPropertiesMap.put("connector.properties.0.value", "dummy");
+		legacyPropertiesMap.put("connector.properties.1.key", "bootstrap.servers");
+		legacyPropertiesMap.put("connector.properties.1.value", "dummy");
+		legacyPropertiesMap.put("connector.properties.2.key", "group.id");
+		legacyPropertiesMap.put("connector.properties.2.value", "dummy");
+
+		final TableSource<?> actualSource = TableFactoryService.find(StreamTableSourceFactory.class, legacyPropertiesMap)
+			.createStreamTableSource(legacyPropertiesMap);
+
+		assertEquals(expected, actualSource);
+
+		// test Kafka consumer
+		final KafkaTableSourceBase actualKafkaSource = (KafkaTableSourceBase) actualSource;
+		final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock();
+		actualKafkaSource.getDataStream(mock);
+		assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.sourceFunction.getClass()));
+	}
+
+	protected Map<String, String> createKafkaSourceProperties() {
+		return new TestTableDescriptor(
+			new Kafka()
+				.version(getKafkaVersion())
+				.topic(TOPIC)
+				.properties(KAFKA_PROPERTIES)
+				.sinkPartitionerRoundRobin() // test if accepted although not needed
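
For context, a minimal sketch of the new flattened property style this change introduces, which is what `createKafkaSourceProperties()` produces before the test swaps in the legacy indexed keys. The key names are taken from the removals above; the `partition:X,offset:Y` encoding of `connector.specific-offsets` is an assumption about the new format, not copied from this diff:

	// Sketch only: the flattened keys the test removes above, re-expressed
	// directly; values mirror the legacy entries the test re-adds.
	final Map<String, String> flattenedPropertiesMap = new HashMap<>();
	flattenedPropertiesMap.put("connector.properties.zookeeper.connect", "dummy");
	flattenedPropertiesMap.put("connector.properties.bootstrap.servers", "dummy");
	flattenedPropertiesMap.put("connector.properties.group.id", "dummy");
	// Assumed encoding of the flattened specific-offsets key.
	flattenedPropertiesMap.put("connector.specific-offsets", "partition:0,offset:100;partition:1,offset:123");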
Review comment:
   It's the default from the Flink project code style; I'll try to use one tab.
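
As a concrete illustration of that suggestion (a sketch, not code from the PR), the builder chain above with one-tab continuation indentation would read:

	// One-tab continuation indentation, illustrative only.
	new Kafka()
		.version(getKafkaVersion())
		.topic(TOPIC)
		.properties(KAFKA_PROPERTIES)
		.sinkPartitionerRoundRobin(); // test if accepted although not needed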
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services