wuchong commented on a change in pull request #13910:
URL: https://github.com/apache/flink/pull/13910#discussion_r517991240
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -179,13 +183,37 @@ public ScanRuntimeProvider
getScanRuntimeProvider(ScanContext context) {
@Override
public Map<String, DataType> listReadableMetadata() {
final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+
+ // add value format metadata with prefix
Review comment:
Add a comment to mention why we must put the format metadata before
connector metadata? I think the order can't be switched.
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
##########
@@ -131,7 +133,8 @@ public void testKafkaDebeziumChangelogSource() throws
Exception {
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
TableResult tableResult = tEnv.executeSql(
- "INSERT INTO sink SELECT name, SUM(weight) FROM
debezium_source GROUP BY name");
+ "INSERT INTO sink SELECT FIRST_VALUE(origin), name,
SUM(weight) "
+ + "FROM debezium_source GROUP BY name");
Review comment:
Could you make this case more complex? For example, declaring 2 format
metadata and 2 connector metadata, but only select 1 format metadata and 1
connector metadata. This helps to verify format and connector metadata can work
together and the metadata pruning also works.
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
##########
@@ -109,6 +109,7 @@ public void testKafkaDebeziumChangelogSource() throws
Exception {
String bootstraps =
standardProps.getProperty("bootstrap.servers");
String sourceDDL = String.format(
"CREATE TABLE debezium_source (" +
+ " origin STRING METADATA FROM 'value.source.table'," +
// test some metadata
Review comment:
Add `VIRTUAL` to the metadata (even it is not tested)?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]