This is an automated email from the ASF dual-hosted git repository.
srdo pushed a commit to branch 1.x-branch
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/1.x-branch by this push:
new fda3229 Issue fix - incorrect kafka spout lag error message for newer
kafka spout version but null group id config
new 9037453 Merge branch 'STORM-3348' of
https://github.com/OjhaVivek/storm into asfgit-1.x-branch
fda3229 is described below
commit fda32297d7835d375138e1845ca88423579c0cd8
Author: Vivek Ojha <[email protected]>
AuthorDate: Thu Feb 28 14:31:25 2019 +0530
Issue fix - incorrect kafka spout lag error message for newer kafka spout
version but null group id config
---
.../jvm/org/apache/storm/blobstore/LocalFsBlobStore.java | 2 +-
.../src/jvm/org/apache/storm/utils/TopologySpoutLag.java | 13 +++++++++++--
2 files changed, 12 insertions(+), 3 deletions(-)
diff --git
a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
index be4c3ad..dabfc93 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
@@ -43,7 +43,7 @@ import java.io.InputStream;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;;
+import java.util.Set;
import static org.apache.storm.blobstore.BlobStoreAclHandler.ADMIN;
import static org.apache.storm.blobstore.BlobStoreAclHandler.READ;
diff --git a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
index 058776f..349a029 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
@@ -39,6 +39,8 @@ import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Strings;
+
public class TopologySpoutLag {
private static final String SPOUT_ID = "spoutId";
private static final String SPOUT_TYPE = "spoutType";
@@ -152,7 +154,7 @@ public class TopologySpoutLag {
Map topologyConf) throws
IOException {
ComponentCommon componentCommon = spoutSpec.get_common();
String json = componentCommon.get_json_conf();
- if (json != null && !json.isEmpty()) {
+ if (!Strings.isNullOrEmpty(json)) {
Map<String, Object> jsonMap = null;
try {
jsonMap = (Map<String, Object>)
JSONValue.parseWithException(json);
@@ -177,7 +179,7 @@ public class TopologySpoutLag {
String json = componentCommon.get_json_conf();
Map<String, Object> result = null;
String errorMsg = "Offset lags for kafka not supported for older
versions. Please update kafka spout to latest version.";
- if (json != null && !json.isEmpty()) {
+ if (!Strings.isNullOrEmpty(json)) {
List<String> commands = new ArrayList<>();
String stormHomeDir = System.getenv("STORM_BASE_DIR");
if (stormHomeDir != null && !stormHomeDir.endsWith("/")) {
@@ -216,6 +218,13 @@ public class TopologySpoutLag {
extraPropertiesFile.delete();
}
}
+ } else if(!old) {
+ errorMsg = new StringBuilder(TOPICS_CONFIG).append(", ")
+ .append(GROUPID_CONFIG)
+ .append(" and ")
+ .append(BOOTSTRAP_CONFIG)
+ .append(" are mandatory and should not be null for newer
versions of kafka spout.")
+ .toString();
}
}