luoyuxia commented on a change in pull request #16417:
URL: https://github.com/apache/flink/pull/16417#discussion_r665844774



##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java
##########
@@ -41,17 +42,25 @@
     private final Pattern topicPattern;
 
     public KafkaTopicsDescriptor(
-            @Nullable List<String> fixedTopics, @Nullable Pattern 
topicPattern) {
+        @Nullable List<String> fixedTopics, @Nullable Pattern topicPattern) {
         checkArgument(
-                (fixedTopics != null && topicPattern == null)
-                        || (fixedTopics == null && topicPattern != null),
-                "Exactly one of either fixedTopics or topicPattern must be 
specified.");
+            (fixedTopics != null && topicPattern == null)
+                || (fixedTopics == null && topicPattern != null),
+            "Exactly one of either fixedTopics or topicPattern must be 
specified.");
 
         if (fixedTopics != null) {
             checkArgument(
-                    !fixedTopics.isEmpty(),
-                    "If subscribing to a fixed topics list, the supplied list 
cannot be empty.");
+                !fixedTopics.isEmpty(),
+                "If subscribing to a fixed topics list, the supplied list 
cannot be empty.");
+            fixedTopics.forEach(topic -> {
+                checkNotNull(topic, "An null topic exists in the subscribed 
topics list.");
+                checkArgument(!"".equals(topic), "An empty topic exists in the 
subscribed topics list.");
+            });
         }
+        if (topicPattern != null) {

Review comment:
       I think it's fine if topicPatten is empty.
   So we don't need to check whether topicPattern is empty or not.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
##########
@@ -647,8 +646,8 @@ private FlinkKafkaProducer(
                 new FlinkKafkaProducer.TransactionStateSerializer(),
                 new FlinkKafkaProducer.ContextStateSerializer());
 
+        checkArgument(!"".equals(defaultTopic),"defaultTopic is empty");

Review comment:
       maybe we can use
   '
   checkArgument(
                   
!org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly(defaultTopic), 
"defaultTopic cannot be null or empty string");
   '
   to check

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java
##########
@@ -85,7 +94,7 @@ public boolean isMatchingTopic(String topic) {
     @Override
     public String toString() {
         return (fixedTopics == null)
-                ? "Topic Regex Pattern (" + topicPattern.pattern() + ")"
-                : "Fixed Topics (" + fixedTopics + ")";
+            ? "Topic Regex Pattern (" + topicPattern.pattern() + ")"

Review comment:
       The code format is not right.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java
##########
@@ -41,17 +42,25 @@
     private final Pattern topicPattern;
 
     public KafkaTopicsDescriptor(
-            @Nullable List<String> fixedTopics, @Nullable Pattern 
topicPattern) {
+        @Nullable List<String> fixedTopics, @Nullable Pattern topicPattern) {
         checkArgument(
-                (fixedTopics != null && topicPattern == null)
-                        || (fixedTopics == null && topicPattern != null),
-                "Exactly one of either fixedTopics or topicPattern must be 
specified.");
+            (fixedTopics != null && topicPattern == null)
+                || (fixedTopics == null && topicPattern != null),
+            "Exactly one of either fixedTopics or topicPattern must be 
specified.");
 
         if (fixedTopics != null) {
             checkArgument(
-                    !fixedTopics.isEmpty(),
-                    "If subscribing to a fixed topics list, the supplied list 
cannot be empty.");
+                !fixedTopics.isEmpty(),
+                "If subscribing to a fixed topics list, the supplied list 
cannot be empty.");
+            fixedTopics.forEach(topic -> {
+                checkNotNull(topic, "An null topic exists in the subscribed 
topics list.");

Review comment:
       Here, we can use 
   '
   checkArgument(
                   
!org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly(topic), "topic in the 
subscribed topics list cannot be null or empty string.");
   '
   to check

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java
##########
@@ -41,17 +42,25 @@
     private final Pattern topicPattern;
 
     public KafkaTopicsDescriptor(
-            @Nullable List<String> fixedTopics, @Nullable Pattern 
topicPattern) {
+        @Nullable List<String> fixedTopics, @Nullable Pattern topicPattern) {

Review comment:
       The code format is not right, have you follow the guide 
   
https://ci.apache.org/projects/flink/flink-docs-master/docs/flinkdev/ide_setup/#code-formatting
 ?
   We need a consistent code format.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to