luoyuxia commented on a change in pull request #16417:
URL: https://github.com/apache/flink/pull/16417#discussion_r666018479
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java
##########
@@ -41,17 +42,25 @@
private final Pattern topicPattern;
public KafkaTopicsDescriptor(
- @Nullable List<String> fixedTopics, @Nullable Pattern topicPattern) {
+ @Nullable List<String> fixedTopics, @Nullable Pattern topicPattern) {
checkArgument(
- (fixedTopics != null && topicPattern == null)
- || (fixedTopics == null && topicPattern != null),
- "Exactly one of either fixedTopics or topicPattern must be specified.");
+ (fixedTopics != null && topicPattern == null)
+ || (fixedTopics == null && topicPattern != null),
+ "Exactly one of either fixedTopics or topicPattern must be specified.");
if (fixedTopics != null) {
checkArgument(
- !fixedTopics.isEmpty(),
- "If subscribing to a fixed topics list, the supplied list cannot be empty.");
+ !fixedTopics.isEmpty(),
+ "If subscribing to a fixed topics list, the supplied list cannot be empty.");
+ fixedTopics.forEach(topic -> {
+ checkNotNull(topic, "An null topic exists in the subscribed topics list.");
+ checkArgument(!"".equals(topic), "An empty topic exists in the subscribed topics list.");
+ });
}
+ if (topicPattern != null) {
Review comment:
There is no need to check whether topicPattern is empty; an empty
topicPattern is legal.
We can simply delete this statement:
if (topicPattern != null) {
...
}
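
For illustration, here is a rough sketch of what the validation could look like once that block is removed. It is not the actual KafkaTopicsDescriptor code: the class name TopicsValidationSketch and the method validate are hypothetical, and plain JDK checks stand in for Flink's checkArgument and checkNotNull so that the snippet compiles on its own. The fixed-topics checks mirror the ones in the diff above, and topicPattern gets no extra check because Pattern.compile("") is a valid pattern.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;

// Hypothetical stand-in for the constructor checks discussed above.
public class TopicsValidationSketch {

    public static void validate(List<String> fixedTopics, Pattern topicPattern) {
        // Exactly one of the two arguments may be non-null.
        if ((fixedTopics == null) == (topicPattern == null)) {
            throw new IllegalArgumentException(
                    "Exactly one of either fixedTopics or topicPattern must be specified.");
        }
        if (fixedTopics != null) {
            if (fixedTopics.isEmpty()) {
                throw new IllegalArgumentException(
                        "If subscribing to a fixed topics list, the supplied list cannot be empty.");
            }
            for (String topic : fixedTopics) {
                // Objects.requireNonNull is used here in place of Flink's checkNotNull.
                Objects.requireNonNull(
                        topic, "A null topic exists in the subscribed topics list.");
                if (topic.isEmpty()) {
                    throw new IllegalArgumentException(
                            "An empty topic exists in the subscribed topics list.");
                }
            }
        }
        // Intentionally no check on topicPattern: an empty pattern is legal.
    }

    public static void main(String[] args) {
        // An empty regular expression compiles and is accepted.
        validate(null, Pattern.compile(""));
        // A fixed list of non-empty topics is accepted.
        validate(Arrays.asList("orders", "payments"), null);
        try {
            // An empty topic name in the fixed list is rejected.
            validate(Arrays.asList("orders", ""), null);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

With this shape, an empty pattern passes validation, while a null or empty entry in the fixed topics list still fails fast.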