Airblader commented on a change in pull request #16142:
URL: https://github.com/apache/flink/pull/16142#discussion_r656189770
##########
File path: docs/content/docs/connectors/table/kafka.md
##########
@@ -193,10 +193,10 @@ Connector Options
</tr>
<tr>
<td><h5>topic</h5></td>
- <td>required for sink</td>
+ <td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
- <td>Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by semicolon like <code>'topic-1;topic-2'</code>. Note, only one of "topic-pattern" and "topic" can be specified for sources. When the table is used as sink, the topic name is the topic to write data to. Note topic list is not supported for sinks.</td>
+ <td>Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by semicolon like <code>'topic-1;topic-2'</code>. Note, only one of "topic-pattern" and "topic" can be specified for sources. When the table is used as sink, the topic name is the topic to write data. If the "topic" option isn't specified for sink table, "topic" metadata column must be specified. If the "topic" option is specified for sink table, "topic" metadata column must be readable. Note topic list is not supported for sinks.</td>
Review comment:
Please also update the documentation of `KafkaOptions#TOPIC` since the
`ConfigOption` should be the source of truth for documentation.
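For reference, that would be the `withDescription` text on the option itself. The wording below is only a sketch and should mirror whatever the Markdown docs end up saying:

```java
// Sketch only: the description text must be kept in sync with kafka.md.
public static final ConfigOption<List<String>> TOPIC =
        ConfigOptions.key("topic")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        "Topic name(s) to read data from when the table is used as source, "
                                + "or the topic to write data to when the table is used as sink. "
                                + "If no 'topic' is set for a sink, a writable 'topic' metadata "
                                + "column must be declared instead.");
```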
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
##########
@@ -216,7 +216,7 @@
protected final Properties producerConfig;
/** The name of the default topic this producer is writing data to. */
- protected final String defaultTopicId;
+ protected String defaultTopicId;
Review comment:
Why can't this be `final` anymore? It's still entirely initialised
within the constructor, isn't it?
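For context, a blank `final` field only needs a single assignment per constructor path, so any conditional logic can run before that one assignment. Minimal sketch (hypothetical class, not the actual `FlinkKafkaProducer` constructor):

```java
class Producer {
    // stays final: assigned exactly once in every constructor path
    protected final String defaultTopicId;

    Producer(String defaultTopic) {
        // compute first, then perform the single assignment
        this.defaultTopicId = defaultTopic != null ? defaultTopic : "<none>";
    }
}
```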
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaSerializationSchema.java
##########
@@ -166,6 +167,15 @@ public void setPartitions(int[] partitions) {
@Override
public String getTargetTopic(RowData element) {
+ // no explicit topic defined in table options, use the writable metadata column topic
+ topic =
Review comment:
What happens / should happen if the topic returned by the metadata differs
between elements? We're now essentially reading the metadata of whichever
element comes first and then ignoring it for every later element, right? That
isn't the behavior I would expect given the documentation.
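What I'd expect is a per-element resolution, roughly like this sketch (`staticTopic` and `readTopicMetadata` are placeholder names for however the option value and the metadata value are accessed):

```java
@Override
public String getTargetTopic(RowData element) {
    if (staticTopic != null) {
        // topic fixed via the 'topic' table option
        return staticTopic;
    }
    // no option set: resolve from this element's metadata, every time
    String metadataTopic = readTopicMetadata(element);
    if (metadataTopic == null) {
        throw new IllegalStateException(
                "No 'topic' option set and the row carries no 'topic' metadata.");
    }
    return metadataTopic;
}
```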
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaSerializationSchema.java
##########
@@ -141,7 +142,7 @@ public void open(SerializationSchema.InitializationContext context) throws Excep
}
return new ProducerRecord<>(
- topic,
+ getTargetTopic(consumedRow),
Review comment:
Wouldn't this also be necessary in the shortcut path at the beginning of
this method?
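I.e. the early-return branch for rows without metadata presumably still uses the cached `topic` field; sketching from memory (names approximated):

```java
// Sketch of the shortcut path at the top of the method:
if (keySerialization == null && !hasMetadata) {
    final byte[] valueSerialized = valueSerialization.serialize(consumedRow);
    return new ProducerRecord<>(
            getTargetTopic(consumedRow), // was: topic
            extractPartition(consumedRow, null, valueSerialized),
            null,
            valueSerialized);
}
```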
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
##########
@@ -300,6 +306,240 @@ public void testKafkaTableWithMultipleTopics() throws Exception {
topics.forEach(super::deleteTestTopic);
}
+ @Test
+ public void testKafkaSinkWithMetadataIncludeTopicOption() {
+ if (isLegacyConnector) {
+ return;
+ }
+ // we always use a different topic name for each parameterized topic,
+ // in order to make sure the topic can be created.
+ final String topic = "metadata_topic_" + format;
+ createTestTopic(topic, 1, 1);
+
+ // ---------- Produce an event time stream into Kafka -------------------
+ String groupId = getStandardProps().getProperty("group.id");
+ String bootstraps = getBootstrapServers();
+
+ final String createTable =
+ String.format(
+ "CREATE TABLE kafka (\n"
+ + " `physical_1` STRING,\n"
+ + " `physical_2` INT,\n"
+ // metadata fields are out of order on purpose
+ // offset is ignored because it might not be deterministic
+ " `timestamp-type` STRING METADATA VIRTUAL,\n"
+ + " `timestamp` TIMESTAMP(3) METADATA,\n"
+ + " `leader-epoch` INT METADATA VIRTUAL,\n"
+ + " `headers` MAP<STRING, BYTES> METADATA,\n"
+ + " `partition` INT METADATA VIRTUAL,\n"
+ + " `topic` STRING METADATA,\n"
+ + " `physical_3` BOOLEAN\n"
+ + ") WITH (\n"
+ + " 'connector' = 'kafka',\n"
+ + " 'topic' = '%s',\n"
+ + " 'properties.bootstrap.servers' = '%s',\n"
+ + " 'properties.group.id' = '%s',\n"
+ + " 'scan.startup.mode' =
'earliest-offset',\n"
+ + " %s\n"
+ + ")",
+ topic, bootstraps, groupId, formatOptions());
+
+ tEnv.executeSql(createTable);
+
+ String initialValues =
+ String.format(
+ "INSERT INTO kafka\n"
+ + "VALUES\n"
+ + " ('data 1', 1, TIMESTAMP '2020-03-08
13:12:11.123', MAP['k1', X'C0FFEE', 'k2', X'BABE'], '%s', TRUE),\n"
+ + " ('data 2', 2, TIMESTAMP '2020-03-09
13:12:11.123', CAST(NULL AS MAP<STRING, BYTES>), '%s', FALSE),\n"
+ + " ('data 3', 3, TIMESTAMP '2020-03-10
13:12:11.123', MAP['k1', X'10', 'k2', X'20'], '%s', TRUE)",
+ topic, topic, topic);
+ try {
+ tEnv.executeSql(initialValues).await();
+ fail(
+ "Unable to create the Kafka sink table with table option 'topic' and metadata column 'topic'.");
+ } catch (Exception e) {
+ assertTrue(e instanceof ValidationException);
+ assertEquals(
+ String.format(
+ "Invalid metadata key '%s' in column 'topic' of
table 'default_catalog.default_database.kafka'. "
+ + "The %s class '%s' supports the
following metadata keys for writing:\n%s",
+ TOPIC.key,
+ DynamicTableSink.class.getSimpleName(),
+ KafkaDynamicSink.class.getName(),
+ String.join("\n", Arrays.asList(HEADERS.key,
TIMESTAMP.key))),
+ e.getMessage());
+ }
+
+ // ------------- cleanup -------------------
+
+ deleteTestTopic(topic);
Review comment:
We should make sure this is always called once `createTestTopic` has run.
Right now there are a few things in between that could throw an exception and
prevent the cleanup from ever executing.
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -361,18 +360,25 @@ public static void validateSourceTopic(ReadableConfig tableOptions) {
}
public static void validateSinkTopic(ReadableConfig tableOptions) {
- String errorMessageTemp =
- "Flink Kafka sink currently only supports single topic, but
got %s: %s.";
- if (!isSingleTopic(tableOptions)) {
- if (tableOptions.getOptional(TOPIC_PATTERN).isPresent()) {
+ if (tableOptions.getOptional(TOPIC).isPresent()) {
+ if (tableOptions.get(TOPIC).isEmpty()) {
throw new ValidationException(
String.format(
- errorMessageTemp,
- "'topic-pattern'",
- tableOptions.get(TOPIC_PATTERN)));
- } else {
- throw new ValidationException(
- String.format(errorMessageTemp, "'topic'",
tableOptions.get(TOPIC)));
+ "Flink Kafka sink doesn't support empty topic,
but got %s: %s.",
+ "'topic'", tableOptions.get(TOPIC)));
Review comment:
nit: it isn't really necessary to call `tableOptions.get(TOPIC)` here again,
since this branch is reached exactly when the list is empty, so there is
nothing useful to print.
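I.e. the message can be static (sketch):

```java
if (tableOptions.get(TOPIC).isEmpty()) {
    // the list is known to be empty here, so there is nothing to print
    throw new ValidationException(
            "Flink Kafka sink doesn't support an empty 'topic' option.");
}
```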
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
##########
@@ -187,6 +188,9 @@ public DynamicTableSource createDynamicTableSource(Context context) {
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
+ // validate the table option 'topic' and 'topic-pattern'
+ validateSinkTopic(Configuration.fromMap(context.getCatalogTable().getOptions()));
Review comment:
Use `helper.getOptions()` (already stored in a variable `tableOptions` below)
instead of building another `Configuration` here? Also, is there a reason to
have this at the very top rather than e.g. where `validateTableSinkOptions` is
called?
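Sketch of what I mean, assuming the sink method sets up `helper` and `tableOptions` the same way the source method does:

```java
final FactoryUtil.TableFactoryHelper helper =
        FactoryUtil.createTableFactoryHelper(this, context);
final ReadableConfig tableOptions = helper.getOptions();
// ...
// validate next to the other sink-side validations
validateSinkTopic(tableOptions);
validateTableSinkOptions(tableOptions);
```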
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaSerializationSchema.java
##########
@@ -166,6 +167,15 @@ public void setPartitions(int[] partitions) {
@Override
public String getTargetTopic(RowData element) {
+ // no explicit topic defined in table options, use the writable metadata column topic
+ topic =
+ StringUtils.isNullOrWhitespaceOnly(topic)
Review comment:
I think it would be better if we ensure that whitespace-only is mapped
to null before it is assigned to `topic` rather than checking it here.
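E.g. normalize once where the field is set (sketch):

```java
// map whitespace-only to null up front so later checks are a plain null check
this.topic = StringUtils.isNullOrWhitespaceOnly(topic) ? null : topic;
```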
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]