This is an automated email from the ASF dual-hosted git repository.

srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new d1bd668  STORM-3348 Incorrect message when group id is not provided as kafka spout config on storm ui
     new b074136  Merge branch 'STORM-3348-master' of https://github.com/OjhaVivek/storm into asfgit-master
d1bd668 is described below

commit d1bd66843b9c2089dcd8189c2bfbeab9c2990417
Author: Vivek Ojha <[email protected]>
AuthorDate: Fri Mar 1 19:02:00 2019 +0530

    STORM-3348 Incorrect message when group id is not provided as kafka spout config on storm ui
---
 .../jvm/org/apache/storm/utils/TopologySpoutLag.java    | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
index eb4e4b6..ceab7a5 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
@@ -36,6 +36,8 @@ import org.json.simple.parser.ParseException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Strings;
+
 public class TopologySpoutLag {
     // FIXME: This class can be moved to webapp once UI porting is done.
 
@@ -78,7 +80,7 @@ public class TopologySpoutLag {
         commands.add("-b");
         commands.add((String) jsonConf.get(BOOTSTRAP_CONFIG));
         String securityProtocol = (String) 
jsonConf.get(SECURITY_PROTOCOL_CONFIG);
-        if (securityProtocol != null && !securityProtocol.isEmpty()) {
+        if (!Strings.isNullOrEmpty(securityProtocol)) {
             commands.add("-s");
             commands.add(securityProtocol);
         }
@@ -113,7 +115,7 @@ public class TopologySpoutLag {
         throws IOException {
         ComponentCommon componentCommon = spoutSpec.get_common();
         String json = componentCommon.get_json_conf();
-        if (json != null && !json.isEmpty()) {
+        if (!Strings.isNullOrEmpty(json)) {
             Map<String, Object> jsonMap = null;
             try {
                 jsonMap = (Map<String, Object>) 
JSONValue.parseWithException(json);
@@ -133,8 +135,15 @@ public class TopologySpoutLag {
         ComponentCommon componentCommon = spoutSpec.get_common();
         String json = componentCommon.get_json_conf();
         Map<String, Object> result = null;
-        String errorMsg = "Offset lags for kafka not supported for older 
versions. Please update kafka spout to latest version.";
-        if (json != null && !json.isEmpty()) {
+        String errorMsg = new StringBuilder("Make sure Kafka spout version is 
latest and ")
+            .append(TOPICS_CONFIG)
+            .append(", ")
+            .append(GROUPID_CONFIG)
+            .append(" & ")
+            .append(BOOTSTRAP_CONFIG)
+            .append(" are not null for newer versions of Kafka spout.")
+            .toString();
+        if (!Strings.isNullOrEmpty(json)) {
             List<String> commands = new ArrayList<>();
             String stormHomeDir = System.getenv("STORM_BASE_DIR");
             if (stormHomeDir != null && !stormHomeDir.endsWith("/")) {

Reply via email to