twalthr commented on a change in pull request #16142:
URL: https://github.com/apache/flink/pull/16142#discussion_r657870086
##########
File path: docs/content/docs/connectors/table/kafka.md
##########
@@ -193,10 +193,10 @@ Connector Options
</tr>
<tr>
<td><h5>topic</h5></td>
- <td>required for sink</td>
+ <td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
- <td>Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by semicolon like <code>'topic-1;topic-2'</code>. Note, only one of "topic-pattern" and "topic" can be specified for sources. When the table is used as sink, the topic name is the topic to write data to. Note topic list is not supported for sinks.</td>
+ <td>Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by semicolon like <code>'topic-1;topic-2'</code>. Note, only one of "topic-pattern" and "topic" can be specified for sources. When the table is used as sink, the topic name is the topic to write data. If the "topic" option isn't specified for sink table, "topic" metadata column must be specified. If the "topic" option is specified for sink table, "topic" metadata column must be readable. Note topic list is not supported for sinks.</td>
</tr>
<tr>
Review comment:
I wasn't aware that we already support lists in the topic option.
Wouldn't it be useful to use this option to limit the set of topics
available in the dynamic metadata column? I think this is a very common
use case, because data platform teams usually only allow a set of topics
to be writable.
btw `"topic" metadata column must be readable` -> `"topic" metadata column can only be readable`
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
##########
@@ -145,7 +145,7 @@ public KafkaDynamicSink(
// Mutable attributes
this.metadataKeys = Collections.emptyList();
// Kafka-specific attributes
- this.topic = checkNotNull(topic, "Topic must not be null.");
+ this.topic = topic;
Review comment:
mark the nullability of the parameter using `@Nullable`
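A minimal sketch of what the suggestion amounts to. The class and annotation here are stand-ins (the real code would use `javax.annotation.Nullable` and the actual `KafkaDynamicSink` constructor); the point is that once `checkNotNull` is removed, the parameter and field should advertise that null is a legal value:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

class KafkaDynamicSinkSketch {

    // Stand-in for javax.annotation.Nullable so the sketch compiles on its own.
    @Retention(RetentionPolicy.CLASS)
    @Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER})
    @interface Nullable {}

    private final @Nullable String topic;

    // Annotating the parameter documents that callers may pass null
    // when the target topic comes from the 'topic' metadata column.
    KafkaDynamicSinkSketch(@Nullable String topic) {
        this.topic = topic; // no checkNotNull: null is a legal value now
    }

    @Nullable String getTopic() {
        return topic;
    }
}
```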
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
##########
@@ -216,7 +216,7 @@
protected final Properties producerConfig;
/** The name of the default topic this producer is writing data to. */
- protected final String defaultTopicId;
+ protected String defaultTopicId;
Review comment:
In any case we should mark the parameter as `@Nullable` and update the
JavaDocs.
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaSerializationSchema.java
##########
@@ -166,7 +167,16 @@ public void setPartitions(int[] partitions) {
@Override
public String getTargetTopic(RowData element) {
- return topic;
+ if (!StringUtils.isNullOrWhitespaceOnly(topic)) {
Review comment:
this is a very expensive operation that is executed for every record on
the hot path. I don't think we need this whitespace logic at all; we can
rely on the user to specify the option correctly
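One way to read this suggestion, as a simplified stand-in for the serialization schema (Flink's `RowData` and metadata lookup are reduced to plain strings here): validate the option once at construction time, so the per-record path is only a reference comparison:

```java
class TopicSelectorSketch {

    // Fixed topic from the table options; null when the topic comes from metadata.
    private final String topic;

    TopicSelectorSketch(String topic) {
        // Validate once, at construction time, instead of per record.
        if (topic != null && topic.trim().isEmpty()) {
            throw new IllegalArgumentException("Option 'topic' must not be blank.");
        }
        this.topic = topic;
    }

    // Hot path: one null check per record, no whitespace scan.
    String getTargetTopic(String metadataTopic) {
        return topic != null ? topic : metadataTopic;
    }
}
```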
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaSerializationSchema.java
##########
@@ -166,7 +167,16 @@ public void setPartitions(int[] partitions) {
@Override
public String getTargetTopic(RowData element) {
- return topic;
+ if (!StringUtils.isNullOrWhitespaceOnly(topic)) {
+ return topic;
+ }
+ // no explicit topic defined in table options, use the writable metadata column topic
+ final String topicMetadata = readMetadata(element, KafkaDynamicSink.WritableMetadata.TOPIC);
+ if (StringUtils.isNullOrWhitespaceOnly(topicMetadata)) {
+ throw new NullPointerException(
Review comment:
use a different kind of exception, maybe `RuntimeException`
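A sketch of the error path under this suggestion (the class and method names are hypothetical; the real code reads the value via `readMetadata`): a descriptive `RuntimeException` signals a user/configuration error better than a `NullPointerException` does:

```java
class TopicMetadataSketch {

    static String requireTopic(String topicFromMetadata) {
        if (topicFromMetadata == null) {
            // A RuntimeException with a descriptive message, instead of NPE.
            throw new RuntimeException(
                    "No target topic: set the 'topic' option or write a "
                            + "non-null value into the 'topic' metadata column.");
        }
        return topicFromMetadata;
    }
}
```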
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
##########
@@ -224,7 +224,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
keyProjection,
valueProjection,
keyPrefix,
- tableOptions.get(TOPIC).get(0),
+ tableOptions.getOptional(TOPIC).isPresent() ? tableOptions.get(TOPIC).get(0) : null,
Review comment:
this could cause an index out of bounds exception if the list is empty?
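A possible fix, sketched with plain `Optional`/`List` stand-ins for the table-options lookup: filter out the empty list before taking the first element, so `get(0)` is never reached on an empty list:

```java
import java.util.List;
import java.util.Optional;

class TopicOptionSketch {

    // Returns the first configured topic, or null when the option is absent
    // OR present but empty; the ternary in the diff would still call get(0)
    // on an empty list and throw IndexOutOfBoundsException.
    static String firstTopicOrNull(Optional<List<String>> topics) {
        return topics.filter(list -> !list.isEmpty())
                .map(list -> list.get(0))
                .orElse(null);
    }
}
```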
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaSerializationSchema.java
##########
@@ -41,7 +42,7 @@
private final @Nullable FlinkKafkaPartitioner<RowData> partitioner;
- private final String topic;
+ private String topic;
Review comment:
nit: why is it not final anymore? I cannot see another mutation in your
changes.
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
##########
@@ -355,6 +356,21 @@ public int hashCode() {
// --------------------------------------------------------------------------------------------
enum WritableMetadata {
+ TOPIC(
+ "topic",
+ DataTypes.STRING().nullable(),
Review comment:
shouldn't it be `notNull()` if we otherwise throw an exception at runtime?
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -144,7 +144,7 @@ private KafkaOptions() {}
.noDefaultValue()
.withDescription(
"Topic names from which the table is read. Either 'topic' or 'topic-pattern' must be set for source. "
- + "Option 'topic' is required for sink.");
+ + "Option 'topic' is optional for sink through specifying the 'topic' metadata column.");
Review comment:
This still seems to be different from the docs. Please generate the docs
from the options using `ConfigOptionDocGenerator`.
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
##########
@@ -216,7 +216,7 @@
protected final Properties producerConfig;
/** The name of the default topic this producer is writing data to. */
- protected final String defaultTopicId;
+ protected String defaultTopicId;
Review comment:
@AHeise you are more familiar with the Kafka producer. Is this a valid
change?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]