Repository: metron
Updated Branches:
  refs/heads/master 954b5ea13 -> 57d2764c3


METRON-1031 Management UI Cannot Start Topologies in Kerberized Environment (nickwallen) closes apache/metron#647


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/57d2764c
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/57d2764c
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/57d2764c

Branch: refs/heads/master
Commit: 57d2764c318c8ec7d37d8a3f11e4609e35078285
Parents: 954b5ea
Author: nickwallen <[email protected]>
Authored: Mon Jul 17 15:42:40 2017 -0400
Committer: nickallen <[email protected]>
Committed: Mon Jul 17 15:42:40 2017 -0400

----------------------------------------------------------------------
 .../METRON/CURRENT/package/templates/metron.j2  |  2 +
 .../src/main/config/rest_application.yml        |  3 ++
 .../apache/metron/rest/MetronRestConstants.java |  2 +
 .../rest/service/impl/StormCLIWrapper.java      | 55 ++++++++++++++++----
 .../src/main/resources/application-docker.yml   |  5 +-
 .../src/main/resources/application-vagrant.yml  |  2 +
 .../rest/service/impl/StormCLIWrapperTest.java  | 36 ++++++++++++-
 7 files changed, 91 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/57d2764c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2
index 7543a67..5d07fb2 100644
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2
@@ -37,3 +37,5 @@ SECURITY_ENABLED={{security_enabled|lower}}
 {% endif %}
 {% if metron_keytab_path is defined 
%}METRON_SERVICE_KEYTAB="{{metron_keytab_path}}"
 {% endif %}
+KAFKA_SECURITY_PROTOCOL="{{kafka_security_protocol}}"
+PARSER_TOPOLOGY_OPTIONS="/home/{{metron_user}}/.storm/storm.config"

http://git-wip-us.apache.org/repos/asf/metron/blob/57d2764c/metron-interface/metron-rest/src/main/config/rest_application.yml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/config/rest_application.yml 
b/metron-interface/metron-rest/src/main/config/rest_application.yml
index dbb0b43..2afbb8a 100644
--- a/metron-interface/metron-rest/src/main/config/rest_application.yml
+++ b/metron-interface/metron-rest/src/main/config/rest_application.yml
@@ -28,6 +28,8 @@ zookeeper:
 kafka:
   broker:
     url: ${BROKERLIST}
+  security:
+    protocol: ${KAFKA_SECURITY_PROTOCOL}
 
 hdfs:
   namenode:
@@ -43,6 +45,7 @@ storm:
     url: ${STORM_REST_URL}
   parser:
     script.path: ${METRON_HOME}/bin/start_parser_topology.sh
+    topology.options: ${PARSER_TOPOLOGY_OPTIONS}
   enrichment:
     script.path: ${METRON_HOME}/bin/start_enrichment_topology.sh
   indexing:

http://git-wip-us.apache.org/repos/asf/metron/blob/57d2764c/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
index 2a147e6..dbdcfc4 100644
--- 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
@@ -41,6 +41,8 @@ public class MetronRestConstants {
   public static final String PARSER_SCRIPT_PATH_SPRING_PROPERTY = 
"storm.parser.script.path";
   public static final String ENRICHMENT_SCRIPT_PATH_SPRING_PROPERTY = 
"storm.enrichment.script.path";
   public static final String INDEXING_SCRIPT_PATH_SPRING_PROPERTY = 
"storm.indexing.script.path";
+  public static final String PARSER_TOPOLOGY_OPTIONS_SPRING_PROPERTY = 
"storm.parser.topology.options";
+  public static final String KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY = 
"kafka.security.protocol";
 
   public static final String ZK_URL_SPRING_PROPERTY = "zookeeper.url";
   public static final String ZK_CLIENT_SESSION_TIMEOUT = 
"zookeeper.client.timeout.session";

http://git-wip-us.apache.org/repos/asf/metron/blob/57d2764c/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
index 1158721..463c925 100644
--- 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
@@ -17,14 +17,19 @@
  */
 package org.apache.metron.rest.service.impl;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.metron.rest.MetronRestConstants;
 import org.apache.metron.rest.RestException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.env.Environment;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +40,8 @@ import static 
org.apache.metron.rest.MetronRestConstants.INDEXING_TOPOLOGY_NAME;
 
 public class StormCLIWrapper {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private Environment environment;
 
   @Autowired
@@ -75,26 +82,52 @@ public class StormCLIWrapper {
   protected int runCommand(String[] command) throws RestException {
     ProcessBuilder pb = getProcessBuilder(command);
     pb.inheritIO();
-    Process process = null;
+    LOG.debug("Running command: cmd={}", String.join(" ", command));
+
+    Process process;
     try {
       process = pb.start();
       process.waitFor();
+
     } catch (Exception e) {
       throw new RestException(e);
     }
-    return process.exitValue();
+
+    int exitValue = process.exitValue();
+    LOG.debug("Command completed: cmd={}, exit={}", String.join(" ", command), 
exitValue);
+
+    return exitValue;
   }
 
   protected String[] getParserStartCommand(String name) {
-    String[] command = new String[7];
-    command[0] = 
environment.getProperty(MetronRestConstants.PARSER_SCRIPT_PATH_SPRING_PROPERTY);
-    command[1] = "-k";
-    command[2] = 
environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY);
-    command[3] = "-z";
-    command[4] = 
environment.getProperty(MetronRestConstants.ZK_URL_SPRING_PROPERTY);
-    command[5] = "-s";
-    command[6] = name;
-    return command;
+    List<String> command = new ArrayList<>();
+    command.add( 
environment.getProperty(MetronRestConstants.PARSER_SCRIPT_PATH_SPRING_PROPERTY));
+
+    // sensor type
+    command.add( "-s");
+    command.add( name);
+
+    // zookeeper
+    command.add( "-z");
+    command.add( 
environment.getProperty(MetronRestConstants.ZK_URL_SPRING_PROPERTY));
+
+    // kafka broker
+    command.add( "-k");
+    command.add( 
environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY));
+
+    // kafka security protocol
+    command.add( "-ksp");
+    command.add( 
environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY));
+
+    // extra topology options
+    boolean kerberosEnabled = 
environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, 
Boolean.class, false);
+    boolean topologyOptionsDefined = 
StringUtils.isNotBlank(environment.getProperty(MetronRestConstants.PARSER_TOPOLOGY_OPTIONS_SPRING_PROPERTY));
+    if (kerberosEnabled && topologyOptionsDefined) {
+        command.add("-e");
+        
command.add(environment.getProperty(MetronRestConstants.PARSER_TOPOLOGY_OPTIONS_SPRING_PROPERTY));
+    }
+
+    return command.toArray(new String[0]);
   }
 
   protected String[] getEnrichmentStartCommand() {

http://git-wip-us.apache.org/repos/asf/metron/blob/57d2764c/metron-interface/metron-rest/src/main/resources/application-docker.yml
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/resources/application-docker.yml 
b/metron-interface/metron-rest/src/main/resources/application-docker.yml
index 5891417..15fa293 100644
--- a/metron-interface/metron-rest/src/main/resources/application-docker.yml
+++ b/metron-interface/metron-rest/src/main/resources/application-docker.yml
@@ -36,6 +36,8 @@ zookeeper:
 kafka:
   broker:
     url: ${docker.host.address}:9092
+  security:
+    protocol: PLAINTEXT
 
 hdfs:
   namenode:
@@ -51,7 +53,8 @@ storm:
     url:   ${docker.host.address}:8080
   parser:
     script.path: /usr/metron/${metron.version}/bin/start_parser_topology.sh
+    topology.options: ${PARSER_TOPOLOGY_OPTIONS}
   enrichment:
     script.path: /usr/metron/${metron.version}/bin/start_enrichment_topology.sh
   indexing:
-    script.path: 
/usr/metron/${metron.version}/bin/start_elasticsearch_topology.sh
+    script.path: 
/usr/metron/${metron.version}/bin/start_elasticsearch_topology.sh
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/57d2764c/metron-interface/metron-rest/src/main/resources/application-vagrant.yml
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/resources/application-vagrant.yml 
b/metron-interface/metron-rest/src/main/resources/application-vagrant.yml
index 1884ca3..31b5784 100644
--- a/metron-interface/metron-rest/src/main/resources/application-vagrant.yml
+++ b/metron-interface/metron-rest/src/main/resources/application-vagrant.yml
@@ -30,6 +30,8 @@ zookeeper:
 kafka:
   broker:
     url: node1:6667
+  security:
+    protocol: PLAINTEXT
 
 hdfs:
   namenode:

http://git-wip-us.apache.org/repos/asf/metron/blob/57d2764c/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java
 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java
index 9fb067a..73d54d8 100644
--- 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java
+++ 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java
@@ -24,7 +24,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
-import org.mockito.Mockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.springframework.core.env.Environment;
@@ -76,11 +75,44 @@ public class StormCLIWrapperTest {
     
when(environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY)).thenReturn("kafka_broker_url");
     
when(environment.getProperty(MetronRestConstants.ZK_URL_SPRING_PROPERTY)).thenReturn("zookeeper_url");
     
when(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY,
 Boolean.class, false)).thenReturn(false);
+    
when(environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY)).thenReturn("kafka_security_protocol");
     when(process.exitValue()).thenReturn(0);
 
     assertEquals(0, stormCLIWrapper.startParserTopology("bro"));
     verify(process).waitFor();
-    verifyNew(ProcessBuilder.class).withArguments("/start_parser", "-k", 
"kafka_broker_url", "-z", "zookeeper_url", "-s", "bro");
+    verifyNew(ProcessBuilder.class).withArguments("/start_parser",
+            "-s", "bro",
+            "-z", "zookeeper_url",
+            "-k", "kafka_broker_url",
+            "-ksp", "kafka_security_protocol");
+  }
+
+  /**
+   * If Kerberos is enabled and the PARSER_TOPOLOGY_OPTIONS field is defined, then extra topology options
+   * will be passed to the Parser topology.
+   */
+  @Test
+  public void startParserTopologyWithExtraTopologyOptions() throws Exception {
+
+    
whenNew(ProcessBuilder.class).withParameterTypes(String[].class).withArguments(anyVararg()).thenReturn(processBuilder);
+
+    when(processBuilder.start()).thenReturn(process);
+    
when(environment.getProperty(MetronRestConstants.PARSER_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/start_parser");
+    
when(environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY)).thenReturn("kafka_broker_url");
+    
when(environment.getProperty(MetronRestConstants.ZK_URL_SPRING_PROPERTY)).thenReturn("zookeeper_url");
+    
when(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY,
 Boolean.class, false)).thenReturn(true);
+    
when(environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY)).thenReturn("kafka_security_protocol");
+    
when(environment.getProperty(MetronRestConstants.PARSER_TOPOLOGY_OPTIONS_SPRING_PROPERTY)).thenReturn("parser_topology_options");
+    when(process.exitValue()).thenReturn(0);
+
+    assertEquals(0, stormCLIWrapper.startParserTopology("bro"));
+    verify(process, times(2)).waitFor();
+    verifyNew(ProcessBuilder.class).withArguments("/start_parser",
+            "-s", "bro",
+            "-z", "zookeeper_url",
+            "-k", "kafka_broker_url",
+            "-ksp", "kafka_security_protocol",
+            "-e", "parser_topology_options");
   }
 
   @Test

Reply via email to