klam-shop commented on code in PR #109:
URL:
https://github.com/apache/flink-connector-kafka/pull/109#discussion_r1700749005
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java:
##########
@@ -636,21 +618,25 @@ public static DynamicTableFactory.Context
autoCompleteSchemaRegistrySubject(
private static Map<String, String> autoCompleteSchemaRegistrySubject(
Map<String, String> options) {
Configuration configuration = Configuration.fromMap(options);
- // the subject autoComplete should only be used in sink, check the
topic first
- validateSinkTopic(configuration);
- final Optional<String> valueFormat =
configuration.getOptional(VALUE_FORMAT);
- final Optional<String> keyFormat =
configuration.getOptional(KEY_FORMAT);
- final Optional<String> format = configuration.getOptional(FORMAT);
- final String topic = configuration.get(TOPIC).get(0);
-
- if (format.isPresent() &&
SCHEMA_REGISTRY_FORMATS.contains(format.get())) {
- autoCompleteSubject(configuration, format.get(), topic + "-value");
- } else if (valueFormat.isPresent() &&
SCHEMA_REGISTRY_FORMATS.contains(valueFormat.get())) {
- autoCompleteSubject(configuration, "value." + valueFormat.get(),
topic + "-value");
- }
+ // the subject autoComplete should only be used in sink with a single
topic, check the topic
Review Comment:
I think to make the subject autoComplete work with the
SCHEMA_REGISTRY_FORMATS, we need to have a follow-up issue to have the formats
handle multiple subjects dynamically.
Writing to a static subject for many topics would not match with the
Confluent Schema registry default
[TopicNameStrategy](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#subject-name-strategy)
which derives its subject from the topic name, eg. `topic-value` and
`topic-key` subjects for topic `topic`. I decided to disable the logic here for
when there are multiple topics being produced to, and leave the extension for
future work.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]