leonardBang commented on a change in pull request #10468: [FLINK-14649][table 
sql / api] Flatten all the connector properties keys to make it easy to 
configure in DDL
URL: https://github.com/apache/flink/pull/10468#discussion_r355162022
 
 

 ##########
 File path: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
 ##########
 @@ -177,13 +157,112 @@ public void testTableSource() {
                
assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.sourceFunction.getClass()));
        }
 
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testTableSourceWithLegacyProperties() {
+               // prepare parameters for Kafka table source
+               final TableSchema schema = TableSchema.builder()
+                               .field(FRUIT_NAME, DataTypes.STRING())
+                               .field(COUNT, DataTypes.DECIMAL(10, 3))
+                               .field(EVENT_TIME, DataTypes.TIMESTAMP(3))
+                               .field(PROC_TIME, DataTypes.TIMESTAMP(3))
+                               .build();
+
+               final List<RowtimeAttributeDescriptor> 
rowtimeAttributeDescriptors = Collections.singletonList(
+                               new RowtimeAttributeDescriptor(EVENT_TIME, new 
ExistingField(TIME), new AscendingTimestamps()));
+
+               final Map<String, String> fieldMapping = new HashMap<>();
+               fieldMapping.put(FRUIT_NAME, NAME);
+               fieldMapping.put(NAME, NAME);
+               fieldMapping.put(COUNT, COUNT);
+               fieldMapping.put(TIME, TIME);
+
+               final Map<KafkaTopicPartition, Long> specificOffsets = new 
HashMap<>();
+               specificOffsets.put(new KafkaTopicPartition(TOPIC, 
PARTITION_0), OFFSET_0);
+               specificOffsets.put(new KafkaTopicPartition(TOPIC, 
PARTITION_1), OFFSET_1);
+
+               final TestDeserializationSchema deserializationSchema = new 
TestDeserializationSchema(
+                               TableSchema.builder()
+                                               .field(NAME, DataTypes.STRING())
+                                               .field(COUNT, 
DataTypes.DECIMAL(10, 3))
+                                               .field(TIME, 
DataTypes.TIMESTAMP(3))
+                                               .build()
+                                               .toRowType()
+               );
+
+               final KafkaTableSourceBase expected = 
getExpectedKafkaTableSource(
+                               schema,
+                               Optional.of(PROC_TIME),
+                               rowtimeAttributeDescriptors,
+                               fieldMapping,
+                               TOPIC,
+                               KAFKA_PROPERTIES,
+                               deserializationSchema,
+                               StartupMode.SPECIFIC_OFFSETS,
+                               specificOffsets);
+
+               TableSourceValidation.validateTableSource(expected);
+
+               // construct table source using descriptors and table source 
factory
+               final Map<String, String> legacyPropertiesMap = new HashMap<>();
+               legacyPropertiesMap.putAll(createKafkaSourceProperties());
+
+               // use legacy properties
+               legacyPropertiesMap.remove("connector.specific-offsets");
+               
legacyPropertiesMap.remove("connector.properties.zookeeper.connect");
+               
legacyPropertiesMap.remove("connector.properties.bootstrap.servers");
+               legacyPropertiesMap.remove("connector.properties.group.id");
+
+               
legacyPropertiesMap.put("connector.specific-offsets.0.partition", "0");
+               legacyPropertiesMap.put("connector.specific-offsets.0.offset", 
"100");
+               
legacyPropertiesMap.put("connector.specific-offsets.1.partition", "1");
+               legacyPropertiesMap.put("connector.specific-offsets.1.offset", 
"123");
+               legacyPropertiesMap.put("connector.properties.0.key", 
"zookeeper.connect");
+               legacyPropertiesMap.put("connector.properties.0.value", 
"dummy");
+               legacyPropertiesMap.put("connector.properties.1.key", 
"bootstrap.servers");
+               legacyPropertiesMap.put("connector.properties.1.value", 
"dummy");
+               legacyPropertiesMap.put("connector.properties.2.key", 
"group.id");
+               legacyPropertiesMap.put("connector.properties.2.value", 
"dummy");
+
+               final TableSource<?> actualSource = 
TableFactoryService.find(StreamTableSourceFactory.class, legacyPropertiesMap)
+                               .createStreamTableSource(legacyPropertiesMap);
+
+               assertEquals(expected, actualSource);
+
+               // test Kafka consumer
+               final KafkaTableSourceBase actualKafkaSource = 
(KafkaTableSourceBase) actualSource;
+               final StreamExecutionEnvironmentMock mock = new 
StreamExecutionEnvironmentMock();
+               actualKafkaSource.getDataStream(mock);
+               
assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.sourceFunction.getClass()));
+       }
+
+       protected Map<String, String> createKafkaSourceProperties() {
+               return new TestTableDescriptor(
+                               new Kafka()
+                                               .version(getKafkaVersion())
+                                               .topic(TOPIC)
+                                               .properties(KAFKA_PROPERTIES)
+                                               .sinkPartitionerRoundRobin() // 
test if accepted although not needed
 
 Review comment:
   It's the default from the Flink project code style; I'll try to use one tab.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to