This is an automated email from the ASF dual-hosted git repository.
srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new d1bd668 STORM-3348 Incorrect message when group id is not provided as
kafka spout config on storm ui
new b074136 Merge branch 'STORM-3348-master' of
https://github.com/OjhaVivek/storm into asfgit-master
d1bd668 is described below
commit d1bd66843b9c2089dcd8189c2bfbeab9c2990417
Author: Vivek Ojha <[email protected]>
AuthorDate: Fri Mar 1 19:02:00 2019 +0530
STORM-3348 Incorrect message when group id is not provided as kafka spout
config on storm ui
---
.../jvm/org/apache/storm/utils/TopologySpoutLag.java | 17 +++++++++++++----
1 file changed, 13 insertions(+), 4 deletions(-)
diff --git a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
index eb4e4b6..ceab7a5 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
@@ -36,6 +36,8 @@ import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Strings;
+
public class TopologySpoutLag {
// FIXME: This class can be moved to webapp once UI porting is done.
@@ -78,7 +80,7 @@ public class TopologySpoutLag {
commands.add("-b");
commands.add((String) jsonConf.get(BOOTSTRAP_CONFIG));
String securityProtocol = (String)
jsonConf.get(SECURITY_PROTOCOL_CONFIG);
- if (securityProtocol != null && !securityProtocol.isEmpty()) {
+ if (!Strings.isNullOrEmpty(securityProtocol)) {
commands.add("-s");
commands.add(securityProtocol);
}
@@ -113,7 +115,7 @@ public class TopologySpoutLag {
throws IOException {
ComponentCommon componentCommon = spoutSpec.get_common();
String json = componentCommon.get_json_conf();
- if (json != null && !json.isEmpty()) {
+ if (!Strings.isNullOrEmpty(json)) {
Map<String, Object> jsonMap = null;
try {
jsonMap = (Map<String, Object>)
JSONValue.parseWithException(json);
@@ -133,8 +135,15 @@ public class TopologySpoutLag {
ComponentCommon componentCommon = spoutSpec.get_common();
String json = componentCommon.get_json_conf();
Map<String, Object> result = null;
- String errorMsg = "Offset lags for kafka not supported for older
versions. Please update kafka spout to latest version.";
- if (json != null && !json.isEmpty()) {
+ String errorMsg = new StringBuilder("Make sure Kafka spout version is
latest and ")
+ .append(TOPICS_CONFIG)
+ .append(", ")
+ .append(GROUPID_CONFIG)
+ .append(" & ")
+ .append(BOOTSTRAP_CONFIG)
+ .append(" are not null for newer versions of Kafka spout.")
+ .toString();
+ if (!Strings.isNullOrEmpty(json)) {
List<String> commands = new ArrayList<>();
String stormHomeDir = System.getenv("STORM_BASE_DIR");
if (stormHomeDir != null && !stormHomeDir.endsWith("/")) {