This is an automated email from the ASF dual-hosted git repository.

srdo pushed a commit to branch 1.x-branch
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/1.x-branch by this push:
     new fda3229  Issue fix - incorrect kafka spout lag error message for newer 
kafka spout version but null group id config
     new 9037453  Merge branch 'STORM-3348' of 
https://github.com/OjhaVivek/storm into asfgit-1.x-branch
fda3229 is described below

commit fda32297d7835d375138e1845ca88423579c0cd8
Author: Vivek Ojha <[email protected]>
AuthorDate: Thu Feb 28 14:31:25 2019 +0530

    Issue fix - incorrect kafka spout lag error message for newer kafka spout 
version but null group id config
---
 .../jvm/org/apache/storm/blobstore/LocalFsBlobStore.java    |  2 +-
 .../src/jvm/org/apache/storm/utils/TopologySpoutLag.java    | 13 +++++++++++--
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git 
a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java 
b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
index be4c3ad..dabfc93 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
@@ -43,7 +43,7 @@ import java.io.InputStream;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;;
+import java.util.Set;
 
 import static org.apache.storm.blobstore.BlobStoreAclHandler.ADMIN;
 import static org.apache.storm.blobstore.BlobStoreAclHandler.READ;
diff --git a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java 
b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
index 058776f..349a029 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
@@ -39,6 +39,8 @@ import org.json.simple.parser.ParseException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Strings;
+
 public class TopologySpoutLag {
     private static final String SPOUT_ID = "spoutId";
     private static final String SPOUT_TYPE = "spoutType";
@@ -152,7 +154,7 @@ public class TopologySpoutLag {
                                                   Map topologyConf) throws 
IOException {
         ComponentCommon componentCommon = spoutSpec.get_common();
         String json = componentCommon.get_json_conf();
-        if (json != null && !json.isEmpty()) {
+        if (!Strings.isNullOrEmpty(json)) {
             Map<String, Object> jsonMap = null;
             try {
                 jsonMap = (Map<String, Object>) 
JSONValue.parseWithException(json);
@@ -177,7 +179,7 @@ public class TopologySpoutLag {
         String json = componentCommon.get_json_conf();
         Map<String, Object> result = null;
         String errorMsg = "Offset lags for kafka not supported for older 
versions. Please update kafka spout to latest version.";
-        if (json != null && !json.isEmpty()) {
+        if (!Strings.isNullOrEmpty(json)) {
             List<String> commands = new ArrayList<>();
             String stormHomeDir = System.getenv("STORM_BASE_DIR");
             if (stormHomeDir != null && !stormHomeDir.endsWith("/")) {
@@ -216,6 +218,13 @@ public class TopologySpoutLag {
                         extraPropertiesFile.delete();
                     }
                 }
+            } else if(!old) {
+              errorMsg = new StringBuilder(TOPICS_CONFIG).append(", ")
+                  .append(GROUPID_CONFIG)
+                  .append(" and ")
+                  .append(BOOTSTRAP_CONFIG)
+                  .append(" are mandatory and should not be null for newer 
versions of kafka spout.")
+                  .toString();
             }
         }
 

Reply via email to