Repository: metron Updated Branches: refs/heads/master 954b5ea13 -> 57d2764c3
METRON-1031 Management UI Cannot Start Topologies in Kerberized Environment (nickwallen) closes apache/metron#647 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/57d2764c Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/57d2764c Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/57d2764c Branch: refs/heads/master Commit: 57d2764c318c8ec7d37d8a3f11e4609e35078285 Parents: 954b5ea Author: nickwallen <[email protected]> Authored: Mon Jul 17 15:42:40 2017 -0400 Committer: nickallen <[email protected]> Committed: Mon Jul 17 15:42:40 2017 -0400 ---------------------------------------------------------------------- .../METRON/CURRENT/package/templates/metron.j2 | 2 + .../src/main/config/rest_application.yml | 3 ++ .../apache/metron/rest/MetronRestConstants.java | 2 + .../rest/service/impl/StormCLIWrapper.java | 55 ++++++++++++++++---- .../src/main/resources/application-docker.yml | 5 +- .../src/main/resources/application-vagrant.yml | 2 + .../rest/service/impl/StormCLIWrapperTest.java | 36 ++++++++++++- 7 files changed, 91 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/57d2764c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2 ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2 b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2 index 7543a67..5d07fb2 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2 +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2 @@ -37,3 +37,5 @@ SECURITY_ENABLED={{security_enabled|lower}} {% endif %} {% if metron_keytab_path is defined %}METRON_SERVICE_KEYTAB="{{metron_keytab_path}}" {% endif %} +KAFKA_SECURITY_PROTOCOL="{{kafka_security_protocol}}" +PARSER_TOPOLOGY_OPTIONS="/home/{{metron_user}}/.storm/storm.config" http://git-wip-us.apache.org/repos/asf/metron/blob/57d2764c/metron-interface/metron-rest/src/main/config/rest_application.yml ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/config/rest_application.yml b/metron-interface/metron-rest/src/main/config/rest_application.yml index dbb0b43..2afbb8a 100644 --- a/metron-interface/metron-rest/src/main/config/rest_application.yml +++ b/metron-interface/metron-rest/src/main/config/rest_application.yml @@ -28,6 +28,8 @@ zookeeper: kafka: broker: url: ${BROKERLIST} + security: + protocol: ${KAFKA_SECURITY_PROTOCOL} hdfs: namenode: @@ -43,6 +45,7 @@ storm: url: ${STORM_REST_URL} parser: script.path: ${METRON_HOME}/bin/start_parser_topology.sh + topology.options: ${PARSER_TOPOLOGY_OPTIONS} enrichment: script.path: ${METRON_HOME}/bin/start_enrichment_topology.sh indexing: http://git-wip-us.apache.org/repos/asf/metron/blob/57d2764c/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java index 2a147e6..dbdcfc4 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java @@ -41,6 +41,8 @@ public class MetronRestConstants { public static final String PARSER_SCRIPT_PATH_SPRING_PROPERTY = "storm.parser.script.path"; public static final String ENRICHMENT_SCRIPT_PATH_SPRING_PROPERTY = "storm.enrichment.script.path"; public static final String INDEXING_SCRIPT_PATH_SPRING_PROPERTY = "storm.indexing.script.path"; + public static final String PARSER_TOPOLOGY_OPTIONS_SPRING_PROPERTY = "storm.parser.topology.options"; + public static final String KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY = "kafka.security.protocol"; public static final String ZK_URL_SPRING_PROPERTY = "zookeeper.url"; public static final String ZK_CLIENT_SESSION_TIMEOUT = "zookeeper.client.timeout.session"; http://git-wip-us.apache.org/repos/asf/metron/blob/57d2764c/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java index 1158721..463c925 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java @@ -17,14 +17,19 @@ */ package org.apache.metron.rest.service.impl; +import org.apache.commons.lang3.StringUtils; import org.apache.metron.rest.MetronRestConstants; import org.apache.metron.rest.RestException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -35,6 +40,8 @@ import static org.apache.metron.rest.MetronRestConstants.INDEXING_TOPOLOGY_NAME; public class StormCLIWrapper { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private Environment environment; @Autowired @@ -75,26 +82,52 @@ public class StormCLIWrapper { protected int runCommand(String[] command) throws RestException { ProcessBuilder pb = getProcessBuilder(command); pb.inheritIO(); - Process process = null; + LOG.debug("Running command: cmd={}", String.join(" ", command)); + + Process process; try { process = pb.start(); process.waitFor(); + } catch (Exception e) { throw new RestException(e); } - return process.exitValue(); + + int exitValue = process.exitValue(); + LOG.debug("Command completed: cmd={}, exit={}", String.join(" ", command), exitValue); + + return exitValue; } protected String[] getParserStartCommand(String name) { - String[] command = new String[7]; - command[0] = environment.getProperty(MetronRestConstants.PARSER_SCRIPT_PATH_SPRING_PROPERTY); - command[1] = "-k"; - command[2] = environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY); - command[3] = "-z"; - command[4] = environment.getProperty(MetronRestConstants.ZK_URL_SPRING_PROPERTY); - command[5] = "-s"; - command[6] = name; - return command; + List<String> command = new ArrayList<>(); + command.add( environment.getProperty(MetronRestConstants.PARSER_SCRIPT_PATH_SPRING_PROPERTY)); + + // sensor type + command.add( "-s"); + command.add( name); + + // zookeeper + command.add( "-z"); + command.add( environment.getProperty(MetronRestConstants.ZK_URL_SPRING_PROPERTY)); + + // kafka broker + command.add( "-k"); + command.add( environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY)); + + // kafka security protocol + command.add( "-ksp"); + command.add( environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY)); + + // extra topology options + boolean kerberosEnabled = environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false); + boolean topologyOptionsDefined = StringUtils.isNotBlank(environment.getProperty(MetronRestConstants.PARSER_TOPOLOGY_OPTIONS_SPRING_PROPERTY)); + if (kerberosEnabled && topologyOptionsDefined) { + command.add("-e"); + command.add(environment.getProperty(MetronRestConstants.PARSER_TOPOLOGY_OPTIONS_SPRING_PROPERTY)); + } + + return command.toArray(new String[0]); } protected String[] getEnrichmentStartCommand() { http://git-wip-us.apache.org/repos/asf/metron/blob/57d2764c/metron-interface/metron-rest/src/main/resources/application-docker.yml ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/resources/application-docker.yml b/metron-interface/metron-rest/src/main/resources/application-docker.yml index 5891417..15fa293 100644 --- a/metron-interface/metron-rest/src/main/resources/application-docker.yml +++ b/metron-interface/metron-rest/src/main/resources/application-docker.yml @@ -36,6 +36,8 @@ zookeeper: kafka: broker: url: ${docker.host.address}:9092 + security: + protocol: PLAINTEXT hdfs: namenode: @@ -51,7 +53,8 @@ storm: url: ${docker.host.address}:8080 parser: script.path: /usr/metron/${metron.version}/bin/start_parser_topology.sh + topology.options: ${PARSER_TOPOLOGY_OPTIONS} enrichment: script.path: /usr/metron/${metron.version}/bin/start_enrichment_topology.sh indexing: - script.path: /usr/metron/${metron.version}/bin/start_elasticsearch_topology.sh + script.path: /usr/metron/${metron.version}/bin/start_elasticsearch_topology.sh \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/57d2764c/metron-interface/metron-rest/src/main/resources/application-vagrant.yml ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/resources/application-vagrant.yml b/metron-interface/metron-rest/src/main/resources/application-vagrant.yml index 1884ca3..31b5784 100644 --- a/metron-interface/metron-rest/src/main/resources/application-vagrant.yml +++ b/metron-interface/metron-rest/src/main/resources/application-vagrant.yml @@ -30,6 +30,8 @@ zookeeper: kafka: broker: url: node1:6667 + security: + protocol: PLAINTEXT hdfs: namenode: http://git-wip-us.apache.org/repos/asf/metron/blob/57d2764c/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java index 9fb067a..73d54d8 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java @@ -24,7 +24,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; -import org.mockito.Mockito; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import org.springframework.core.env.Environment; @@ -76,11 +75,44 @@ public class StormCLIWrapperTest { when(environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY)).thenReturn("kafka_broker_url"); when(environment.getProperty(MetronRestConstants.ZK_URL_SPRING_PROPERTY)).thenReturn("zookeeper_url"); when(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)).thenReturn(false); + when(environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY)).thenReturn("kafka_security_protocol"); when(process.exitValue()).thenReturn(0); assertEquals(0, stormCLIWrapper.startParserTopology("bro")); verify(process).waitFor(); - verifyNew(ProcessBuilder.class).withArguments("/start_parser", "-k", "kafka_broker_url", "-z", "zookeeper_url", "-s", "bro"); + verifyNew(ProcessBuilder.class).withArguments("/start_parser", + "-s", "bro", + "-z", "zookeeper_url", + "-k", "kafka_broker_url", + "-ksp", "kafka_security_protocol"); + } + + /** + * If Kerberos is enabled and the PARSER_TOPOLOGY_OPTIONS field is defined, then extra topology options + * will be passed to the Parser topology. + */ + @Test + public void startParserTopologyWithExtraTopologyOptions() throws Exception { + + whenNew(ProcessBuilder.class).withParameterTypes(String[].class).withArguments(anyVararg()).thenReturn(processBuilder); + + when(processBuilder.start()).thenReturn(process); + when(environment.getProperty(MetronRestConstants.PARSER_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/start_parser"); + when(environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY)).thenReturn("kafka_broker_url"); + when(environment.getProperty(MetronRestConstants.ZK_URL_SPRING_PROPERTY)).thenReturn("zookeeper_url"); + when(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)).thenReturn(true); + when(environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY)).thenReturn("kafka_security_protocol"); + when(environment.getProperty(MetronRestConstants.PARSER_TOPOLOGY_OPTIONS_SPRING_PROPERTY)).thenReturn("parser_topology_options"); + when(process.exitValue()).thenReturn(0); + + assertEquals(0, stormCLIWrapper.startParserTopology("bro")); + verify(process, times(2)).waitFor(); + verifyNew(ProcessBuilder.class).withArguments("/start_parser", + "-s", "bro", + "-z", "zookeeper_url", + "-k", "kafka_broker_url", + "-ksp", "kafka_security_protocol", + "-e", "parser_topology_options"); } @Test
