Repository: incubator-metron
Updated Branches:
  refs/heads/master 98dc7659a -> aef84636a
METRON-797: Pass security.protocol and enable auto-renew for the storm topologies Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/dae102b0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/dae102b0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/dae102b0 Branch: refs/heads/master Commit: dae102b0228b969d4e685a81dd6df25e59f63cb5 Parents: 98dc765 Author: cstella <ceste...@gmail.com> Authored: Wed Mar 29 08:17:52 2017 -0400 Committer: cstella <ceste...@gmail.com> Committed: Wed Mar 29 08:17:52 2017 -0400 ---------------------------------------------------------------------- dependencies_with_url.csv | 5 ++ .../src/main/config/profiler.properties | 6 +- .../src/main/flux/profiler/remote.yaml | 17 ++++- .../integration/ProfilerIntegrationTest.java | 2 + .../METRON/CURRENT/configuration/metron-env.xml | 2 + .../package/templates/enrichment.properties.j2 | 2 + .../roles/metron-builder/tasks/main.yml | 2 +- .../flatfile/importer/MapReduceImporter.java | 2 + .../src/main/config/elasticsearch.properties | 2 + .../src/main/config/enrichment.properties | 3 + .../src/main/flux/enrichment/remote.yaml | 29 +++++++- .../integration/EnrichmentIntegrationTest.java | 2 + metron-platform/metron-hbase/pom.xml | 15 ++++ .../src/main/flux/indexing/remote.yaml | 17 +++++ .../integration/IndexingIntegrationTest.java | 2 + .../parsers/topology/ParserTopologyBuilder.java | 75 +++++++++++++++++--- .../parsers/topology/ParserTopologyCLI.java | 53 +++++++++++--- .../components/ParserTopologyComponent.java | 8 +-- .../parsers/topology/ParserTopologyCLITest.java | 18 ++++- .../src/main/config/pcap.properties | 2 + .../src/main/flux/pcap/remote.yaml | 6 ++ .../PcapTopologyIntegrationTest.java | 2 + .../metron-solr/src/main/config/solr.properties | 2 + .../metron/writer/hdfs/SourceHandler.java | 2 - .../apache/metron/writer/kafka/KafkaWriter.java | 7 +- 25 files changed, 247 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/dependencies_with_url.csv ---------------------------------------------------------------------- diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv index 21f0cb5..25650a3 100644 --- a/dependencies_with_url.csv +++ b/dependencies_with_url.csv @@ -65,6 +65,7 @@ com.sun.jersey:jersey-json:jar:1.9:compile,CDDL 1.1,https://jersey.java.net/ com.sun.jersey:jersey-server:jar:1.9:compile,CDDL 1.1,https://jersey.java.net/ com.thoughtworks.paranamer:paranamer:jar:2.3:compile,BSD,https://github.com/paul-hammant/paranamer javax.servlet.jsp:jsp-api:jar:2.1:runtime,CDDL,http://oracle.com +javax.servlet.jsp:jsp-api:jar:2.1:compile,CDDL,http://oracle.com javax.servlet:servlet-api:jar:2.5:compile,CDDL,http://oracle.com net.jcip:jcip-annotations:jar:1.0:compile,Public,http://jcip.net/ org.codehaus.jettison:jettison:jar:1.1:compile,ASLv2,https://github.com/codehaus/jettison @@ -157,6 +158,7 @@ commons-digester:commons-digester:jar:1.8:compile,The Apache Software License, V commons-digester:commons-digester:jar:2.1:compile,ASLv2,http://commons.apache.org/digester/ commons-el:commons-el:jar:1.0:provided,The Apache Software License, Version 2.0,http://jakarta.apache.org/commons/el/ commons-el:commons-el:jar:1.0:runtime,The Apache Software License, Version 2.0,http://jakarta.apache.org/commons/el/ +commons-el:commons-el:jar:1.0:compile,The Apache Software License, 
Version 2.0,http://jakarta.apache.org/commons/el/ commons-httpclient:commons-httpclient:jar:3.1:compile,Apache License,http://jakarta.apache.org/httpcomponents/httpclient-3.x/ commons-io:commons-io:jar:2.4:compile,ASLv2,http://commons.apache.org/io/ commons-io:commons-io:jar:2.5:compile,ASLv2,http://commons.apache.org/io/ @@ -288,3 +290,6 @@ com.h2database:h2:jar:1.4.192:compile,EPL 1.0,http://www.h2database.com/html/lic de.jollyday:jollyday:jar:0.5.2:compile,ASLv2,http://jollyday.sourceforge.net/license.html org.threeten:threeten-extra:jar:1.0:compile,BSD,http://www.threeten.org/threeten-extra/license.html org.atteo.classindex:classindex:jar:3.3:compile,ASLv2,https://github.com/atteo/classindex +com.squareup.okhttp:okhttp:jar:2.4.0:compile,ASLv2,https://github.com/square/okhttp +com.squareup.okio:okio:jar:1.4.0:compile,ASLv2,https://github.com/square/okhttp +org.htrace:htrace-core:jar:3.0.4:compile,ASLv2,http://htrace.incubator.apache.org/ http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-analytics/metron-profiler/src/main/config/profiler.properties ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/main/config/profiler.properties b/metron-analytics/metron-profiler/src/main/config/profiler.properties index 860934f..b79ac73 100644 --- a/metron-analytics/metron-profiler/src/main/config/profiler.properties +++ b/metron-analytics/metron-profiler/src/main/config/profiler.properties @@ -20,6 +20,10 @@ ##### Storm ##### +storm.auto.credentials=[] + +##### Profiler ##### + profiler.workers=1 profiler.executors=0 profiler.input.topic=indexing @@ -34,10 +38,10 @@ profiler.hbase.column.family=P profiler.hbase.batch=10 profiler.hbase.flush.interval.seconds=30 - ##### Kafka ##### kafka.zk=node1:2181 kafka.broker=node1:6667 # One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST kafka.start=UNCOMMITTED_EARLIEST +kafka.security.protocol=PLAINTEXT http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml index 7ea77a5..0b14bce 100644 --- a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml +++ b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml @@ -17,7 +17,7 @@ name: "profiler" config: - + topology.auto-credentials: ${storm.auto.credentials} topology.workers: ${profiler.workers} topology.acker.executors: ${profiler.executors} @@ -60,6 +60,10 @@ components: args: - "group.id" - "profiler" + - name: "put" + args: + - "security.protocol" + - "${kafka.security.protocol}" # The fields to pull out of the kafka messages - id: "fields" @@ -83,6 +87,13 @@ components: args: - "${kafka.start}" + - id: "kafkaWriterProps" + className: "java.util.HashMap" + configMethods: + - name: "put" + args: + - "security.protocol" + - "${kafka.security.protocol}" - id: "kafkaWriter" className: "org.apache.metron.writer.kafka.KafkaWriter" @@ -91,6 +102,8 @@ components: args: ["${profiler.output.topic}"] - name: "withZkQuorum" args: ["${kafka.zk}"] + - name: "withProducerConfigs" + args: [ref: "kafkaWriterProps"] - id: "kafkaDestinationHandler" className: "org.apache.metron.profiler.bolt.KafkaDestinationHandler" @@ -174,4 +187,4 @@ streams: to: "kafkaBolt" grouping: streamId: "kafka" - type: 
SHUFFLE \ No newline at end of file + type: SHUFFLE http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java index bca8ed5..7591300 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java @@ -305,6 +305,8 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { setProperty("profiler.hbase.flush.interval.seconds", "1"); setProperty("profiler.profile.ttl", "20"); setProperty("hbase.provider.impl", "" + MockTableProvider.class.getName()); + setProperty("storm.auto.credentials", "[]"); + setProperty("kafka.security.protocol", "PLAINTEXT"); }}; // create the mock table http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml index 199e708..108e0ba 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml @@ -178,6 +178,8 @@ indexing.executors=0 kafka.zk={{ zookeeper_quorum }} kafka.broker={{ kafka_brokers }} kafka.start=UNCOMMITTED_EARLIEST +kafka.security.protocol=PLAINTEXT +storm.auto.credentials=[] ##### Indexing ##### index.input.topic=indexing index.error.topic=indexing http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2 ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2 b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2 index dc108f7..8fc2335 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2 +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2 @@ -19,6 +19,8 @@ kafka.zk={{zookeeper_quorum}} kafka.broker={{kafka_brokers}} +kafka.security.protocol=PLAINTEXT +storm.auto.credentials=[] enrichment.output.topic=indexing enrichment.error.topic=enrichments_error threat.intel.error.topic=threatintel_error 
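[Editor's note] The same pattern recurs in each topology touched by this commit: a plain java.util.HashMap flux component carries the extra Kafka producer properties (here just security.protocol) and is handed to the writer via withProducerConfigs. As a rough illustration only — not part of the patch — the profiler's flux wiring corresponds to Java along these lines, assuming the default values shipped in profiler.properties:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.metron.writer.kafka.KafkaWriter;

    public class KafkaWriterWiringSketch {
      public static void main(String[] args) {
        // Extra producer properties; PLAINTEXT is the default shipped in the
        // .properties files, PLAINTEXTSASL would be used on a kerberized cluster.
        Map<String, Object> kafkaWriterProps = new HashMap<>();
        kafkaWriterProps.put("security.protocol", "PLAINTEXT");

        // Mirrors the "kafkaWriter" flux component: ZK quorum, output topic,
        // and the extra producer configs introduced by this commit.
        KafkaWriter kafkaWriter = new KafkaWriter()
            .withZkQuorum("node1:2181")             // ${kafka.zk}
            .withTopic("profiler")                  // ${profiler.output.topic}
            .withProducerConfigs(kafkaWriterProps); // ref: "kafkaWriterProps"
      }
    }
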
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-deployment/roles/metron-builder/tasks/main.yml ---------------------------------------------------------------------- diff --git a/metron-deployment/roles/metron-builder/tasks/main.yml b/metron-deployment/roles/metron-builder/tasks/main.yml index 889eafe..3f4906e 100644 --- a/metron-deployment/roles/metron-builder/tasks/main.yml +++ b/metron-deployment/roles/metron-builder/tasks/main.yml @@ -16,6 +16,6 @@ # --- - name: Build Deployment Artifacts - local_action: shell cd {{ metron_build_dir }} && mvn clean package -DskipTests -P HDP-2.5.0.0,mpack,build-rpms + local_action: shell cd {{ metron_build_dir }} && mvn clean package -DskipTests -T 2C -P HDP-2.5.0.0,mpack,build-rpms become: false run_once: true http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java index e83bdd6..63a84cb 100644 --- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java +++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.log4j.Logger; @@ -66,6 +67,7 @@ public enum MapReduceImporter implements Importer{ job.setNumReduceTasks(0); List<Path> paths = inputs.stream().map(p -> new Path(p)).collect(Collectors.toList()); handler.getInputFormat().set(job, paths, handler.getConfig()); + TableMapReduceUtil.initCredentials(job); try { job.waitForCompletion(true); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties index 317742b..d45d3d4 100644 --- a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties +++ b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties @@ -17,6 +17,7 @@ ##### Storm ##### indexing.workers=1 indexing.executors=0 +storm.auto.credentials=[] ##### Kafka ##### @@ -24,6 +25,7 @@ kafka.zk=node1:2181 kafka.broker=node1:6667 # One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST kafka.start=UNCOMMITTED_EARLIEST +kafka.security.protocol=PLAINTEXT ##### Indexing ##### index.input.topic=indexing http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-enrichment/src/main/config/enrichment.properties ---------------------------------------------------------------------- diff --git 
a/metron-platform/metron-enrichment/src/main/config/enrichment.properties b/metron-platform/metron-enrichment/src/main/config/enrichment.properties index c905d30..af5b27b 100644 --- a/metron-platform/metron-enrichment/src/main/config/enrichment.properties +++ b/metron-platform/metron-enrichment/src/main/config/enrichment.properties @@ -19,6 +19,9 @@ kafka.zk=node1:2181 kafka.broker=node1:6667 +kafka.security.protocol=PLAINTEXT +storm.auto.credentials=[] + enrichment.output.topic=indexing enrichment.error.topic=indexing threat.intel.error.topic=indexing http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml index 439105f..51fc7ce 100644 --- a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml +++ b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml @@ -18,8 +18,10 @@ name: "enrichment" config: topology.workers: 1 topology.acker.executors: 0 + topology.auto-credentials: ${storm.auto.credentials} components: + # Enrichment - id: "stellarEnrichmentAdapter" className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter" @@ -28,6 +30,15 @@ components: args: - "ENRICHMENT" + # Any kafka props for the producer go here. + - id: "kafkaWriterProps" + className: "java.util.HashMap" + configMethods: + - name: "put" + args: + - "security.protocol" + - "${kafka.security.protocol}" + - id: "stellarEnrichment" className: "org.apache.metron.enrichment.configuration.Enrichment" constructorArgs: @@ -100,6 +111,9 @@ components: - name: "withZkQuorum" args: - "${kafka.zk}" + - name: "withProducerConfigs" + args: + - ref: "kafkaWriterProps" # Threat Intel - id: "stellarThreatIntelAdapter" @@ -163,7 +177,9 @@ components: - name: "withZkQuorum" args: - "${kafka.zk}" - + - name: "withProducerConfigs" + args: + - ref: "kafkaWriterProps" #indexing - id: "kafkaWriter" className: "org.apache.metron.writer.kafka.KafkaWriter" @@ -174,9 +190,12 @@ components: - name: "withZkQuorum" args: - "${kafka.zk}" + - name: "withProducerConfigs" + args: + - ref: "kafkaWriterProps" #kafka/zookeeper - # Any kafka props for the producer go here. + # Any kafka props for the consumer go here. 
- id: "kafkaProps" className: "java.util.HashMap" configMethods: @@ -192,6 +211,11 @@ components: args: - "group.id" - "enrichments" + - name: "put" + args: + - "security.protocol" + - "${kafka.security.protocol}" + # The fields to pull out of the kafka messages - id: "fields" @@ -299,6 +323,7 @@ bolts: args: - ref: "enrichmentErrorKafkaWriter" + # Threat Intel Bolts - id: "threatIntelSplitBolt" className: "org.apache.metron.enrichment.bolt.ThreatIntelSplitterBolt" http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java index e012c55..77b64dc 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java @@ -141,6 +141,8 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { setProperty("enrichment.simple.hbase.cf", cf); setProperty("enrichment.output.topic", Constants.INDEXING_TOPIC); setProperty("enrichment.error.topic", ERROR_TOPIC); + setProperty("kafka.security.protocol", "PLAINTEXT"); + setProperty("storm.auto.credentials", "[]"); }}; final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties); final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{ http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-hbase/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-hbase/pom.xml b/metron-platform/metron-hbase/pom.xml index 36df5a3..4921859 100644 --- a/metron-platform/metron-hbase/pom.xml +++ b/metron-platform/metron-hbase/pom.xml @@ -118,6 +118,21 @@ </dependency> <dependency> <groupId>org.apache.storm</groupId> + <artifactId>storm-hbase</artifactId> + <version>${global_storm_version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${global_storm_version}</version> <scope>provided</scope> http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml index 3905a7a..ec423c5 100644 --- a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml +++ b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml @@ -15,9 +15,11 @@ # limitations under the License. 
name: "indexing" + config: topology.workers: ${indexing.workers} topology.acker.executors: ${indexing.executors} + topology.auto-credentials: ${storm.auto.credentials} components: @@ -49,6 +51,15 @@ components: - name: "withRotationPolicy" args: - ref: "hdfsRotationPolicy" + + - id: "kafkaWriterProps" + className: "java.util.HashMap" + configMethods: + - name: "put" + args: + - "security.protocol" + - "${kafka.security.protocol}" + - id: "kafkaWriter" className: "org.apache.metron.writer.kafka.KafkaWriter" configMethods: @@ -58,6 +69,8 @@ components: - name: "withZkQuorum" args: - "${kafka.zk}" + - name: "withProducerConfigs" + args: [ref: "kafkaWriterProps"] - id: "indexWriter" className: "${writer.class.name}" @@ -79,6 +92,10 @@ components: args: - "group.id" - "indexing" + - name: "put" + args: + - "security.protocol" + - "${kafka.security.protocol}" # The fields to pull out of the kafka messages - id: "fields" http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java index 394fbf0..cc7d7e3 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java @@ -121,6 +121,8 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest { final List<byte[]> inputMessages = TestUtils.readSampleData(sampleParsedPath); final Properties topologyProperties = new Properties() {{ setProperty("kafka.start", "UNCOMMITTED_EARLIEST"); + setProperty("kafka.security.protocol", "PLAINTEXT"); + setProperty("storm.auto.credentials", "[]"); setProperty("indexing.workers", "1"); setProperty("indexing.executors", "0"); setProperty("index.input.topic", Constants.INDEXING_TOPIC); http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java index b347ca5..e9acbaa 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java @@ -64,7 +64,7 @@ public class ParserTopologyBuilder { * @throws Exception */ public static TopologyBuilder build(String zookeeperUrl, - String brokerUrl, + Optional<String> brokerUrl, String sensorType, int spoutParallelism, int spoutNumTasks, @@ -72,7 +72,8 @@ public class ParserTopologyBuilder { int parserNumTasks, int errorWriterParallelism, int errorWriterNumTasks, - Map<String, Object> kafkaSpoutConfig + Map<String, Object> kafkaSpoutConfig, + Optional<String> securityProtocol ) throws Exception { // fetch configuration from zookeeper @@ -81,19 +82,19 @@ public class 
ParserTopologyBuilder { // create the spout TopologyBuilder builder = new TopologyBuilder(); - KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, sensorType, Optional.ofNullable(kafkaSpoutConfig) , parserConfig); + KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, sensorType, securityProtocol, Optional.ofNullable(kafkaSpoutConfig) , parserConfig); builder.setSpout("kafkaSpout", kafkaSpout, spoutParallelism) .setNumTasks(spoutNumTasks); // create the parser bolt - ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, configs, parserConfig); + ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig); builder.setBolt("parserBolt", parserBolt, parserParallelism) .setNumTasks(parserNumTasks) .shuffleGrouping("kafkaSpout"); // create the error bolt, if needed if (errorWriterNumTasks > 0) { - WriterBolt errorBolt = createErrorBolt(brokerUrl, sensorType, configs, parserConfig); + WriterBolt errorBolt = createErrorBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig); builder.setBolt("errorMessageWriter", errorBolt, errorWriterParallelism) .setNumTasks(errorWriterNumTasks) .shuffleGrouping("parserBolt", Constants.ERROR_STREAM); @@ -111,7 +112,13 @@ public class ParserTopologyBuilder { * @param parserConfig Configuration for the parser * @return */ - private static StormKafkaSpout<Object, Object> createKafkaSpout(String zkQuorum, String sensorType, Optional<Map<String, Object>> kafkaConfigOptional, SensorParserConfig parserConfig) { + private static StormKafkaSpout<Object, Object> createKafkaSpout( String zkQuorum + , String sensorType + , Optional<String> securityProtocol + , Optional<Map<String, Object>> kafkaConfigOptional + , SensorParserConfig parserConfig + ) + { Map<String, Object> kafkaSpoutConfigOptions = kafkaConfigOptional.orElse(new HashMap<>()); String inputTopic = parserConfig.getSensorTopic() != null ? parserConfig.getSensorTopic() : sensorType; kafkaSpoutConfigOptions.putIfAbsent( SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key @@ -120,9 +127,32 @@ public class ParserTopologyBuilder { kafkaSpoutConfigOptions.putIfAbsent( KafkaSpoutConfig.Consumer.GROUP_ID , inputTopic + "_parser" ); + if(securityProtocol.isPresent()) { + kafkaSpoutConfigOptions.putIfAbsent("security.protocol", securityProtocol.get()); + } return SimpleStormKafkaBuilder.create(inputTopic, zkQuorum, Arrays.asList("value"), kafkaSpoutConfigOptions); } + private static KafkaWriter createKafkaWriter( Optional<String> broker + , String zkQuorum + , Optional<String> securityProtocol + ) + { + KafkaWriter ret = null; + if(broker.isPresent()) { + ret = new KafkaWriter(broker.get()); + } + else { + ret = new KafkaWriter().withZkQuorum(zkQuorum); + } + if(securityProtocol.isPresent()) { + HashMap<String, Object> config = new HashMap<>(); + config.put("security.protocol", securityProtocol.get()); + ret.withProducerConfigs(config); + } + return ret; + } + /** * Create a bolt that parses input from a sensor. 
* @@ -133,7 +163,14 @@ public class ParserTopologyBuilder { * @param parserConfig * @return A Storm bolt that parses input from a sensor */ - private static ParserBolt createParserBolt(String zookeeperUrl, String brokerUrl, String sensorType, ParserConfigurations configs, SensorParserConfig parserConfig) { + private static ParserBolt createParserBolt( String zookeeperUrl + , Optional<String> brokerUrl + , String sensorType + , Optional<String> securityProtocol + , ParserConfigurations configs + , SensorParserConfig parserConfig + ) + { // create message parser MessageParser<JSONObject> parser = ReflectionUtils.createInstance(parserConfig.getParserClassName()); @@ -141,7 +178,10 @@ public class ParserTopologyBuilder { // create writer - if not configured uses a sensible default AbstractWriter writer = parserConfig.getWriterClassName() == null ? - new KafkaWriter(brokerUrl).withTopic(Constants.ENRICHMENT_TOPIC) : + createKafkaWriter( brokerUrl + , zookeeperUrl + , securityProtocol + ).withTopic(Constants.ENRICHMENT_TOPIC) : ReflectionUtils.createInstance(parserConfig.getWriterClassName()); writer.configure(sensorType, new ParserWriterConfiguration(configs)); @@ -154,17 +194,30 @@ public class ParserTopologyBuilder { /** * Create a bolt that handles error messages. * + * @param zookeeperUrl Kafka zookeeper URL * @param brokerUrl Kafka Broker URL * @param sensorType Type of sensor that is being consumed. + * @param securityProtocol Security protocol used (if any) * @param configs * @param parserConfig * @return A Storm bolt that handles error messages. */ - private static WriterBolt createErrorBolt(String brokerUrl, String sensorType, ParserConfigurations configs, SensorParserConfig parserConfig) { + private static WriterBolt createErrorBolt( String zookeeperUrl + , Optional<String> brokerUrl + , String sensorType + , Optional<String> securityProtocol + , ParserConfigurations configs + , SensorParserConfig parserConfig + ) + { // create writer - if not configured uses a sensible default - AbstractWriter writer = parserConfig.getErrorWriterClassName() == null - ? new KafkaWriter(brokerUrl).withTopic((String) configs.getGlobalConfig().get("parser.error.topic")).withConfigPrefix("error") + AbstractWriter writer = parserConfig.getErrorWriterClassName() == null ? 
+ createKafkaWriter( brokerUrl + , zookeeperUrl + , securityProtocol + ).withTopic((String) configs.getGlobalConfig().get("parser.error.topic")) + .withConfigPrefix("error") : ReflectionUtils.createInstance(parserConfig.getWriterClassName()); writer.configure(sensorType, new ParserWriterConfiguration(configs)); http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java index 8cf921e..d83146f 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java @@ -17,10 +17,13 @@ */ package org.apache.metron.parsers.topology; +import com.google.common.collect.ImmutableList; import org.apache.metron.storm.kafka.flux.SpoutConfiguration; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; +import org.apache.storm.hbase.security.AutoHBase; +import org.apache.storm.hdfs.common.security.AutoHDFS; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils; import com.fasterxml.jackson.core.type.TypeReference; @@ -33,9 +36,7 @@ import org.apache.metron.parsers.topology.config.ConfigHandlers; import java.io.File; import java.io.IOException; -import java.util.EnumMap; -import java.util.HashMap; -import java.util.Map; +import java.util.*; import java.util.function.Function; public class ParserTopologyCLI { @@ -55,7 +56,7 @@ public class ParserTopologyCLI { BROKER_URL("k", code -> { Option o = new Option(code, "kafka", true, "Kafka Broker URL"); o.setArgName("BROKER_URL"); - o.setRequired(true); + o.setRequired(false); return o; }), SENSOR_TYPE("s", code -> { @@ -176,6 +177,18 @@ public class ParserTopologyCLI { return o; } ) + ,SECURITY_PROTOCOL("ksp", code -> { + Option o = new Option(code + , "kafka_security_protocol" + , true + , "The kafka security protocol to use (if running with a kerberized cluster). E.g. 
PLAINTEXTSASL" + ); + o.setArgName("SECURITY_PROTOCOL"); + o.setRequired(false); + o.setType(String.class); + return o; + } + ) ,TEST("t", code -> { Option o = new Option("t", "test", true, "Run in Test Mode"); @@ -270,7 +283,7 @@ public class ParserTopologyCLI { System.exit(0); } String zookeeperUrl = ParserOptions.ZK_QUORUM.get(cmd);; - String brokerUrl = ParserOptions.BROKER_URL.get(cmd); + Optional<String> brokerUrl = ParserOptions.BROKER_URL.has(cmd)?Optional.of(ParserOptions.BROKER_URL.get(cmd)):Optional.empty(); String sensorType= ParserOptions.SENSOR_TYPE.get(cmd); int spoutParallelism = Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1")); int spoutNumTasks = Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1")); @@ -284,7 +297,8 @@ public class ParserTopologyCLI { if(ParserOptions.SPOUT_CONFIG.has(cmd)) { spoutConfig = readSpoutConfig(new File(ParserOptions.SPOUT_CONFIG.get(cmd))); } - + Optional<String> securityProtocol = ParserOptions.SECURITY_PROTOCOL.has(cmd)?Optional.of(ParserOptions.SECURITY_PROTOCOL.get(cmd)):Optional.empty(); + securityProtocol = getSecurityProtocol(securityProtocol, spoutConfig); TopologyBuilder builder = ParserTopologyBuilder.build(zookeeperUrl, brokerUrl, sensorType, @@ -294,10 +308,18 @@ public class ParserTopologyCLI { parserNumTasks, errorParallelism, errorNumTasks, - spoutConfig + spoutConfig, + securityProtocol ); Config stormConf = ParserOptions.getConfig(cmd); - + if(securityProtocol.isPresent() && !stormConf.containsKey(Config.TOPOLOGY_AUTO_CREDENTIALS)) { + //if I'm specifying it already, then I won't impose autohdfs and autohbase + List<String> autoCredentials = new ArrayList<>(); + for (String credential : ImmutableList.of(AutoHDFS.class.getName(), AutoHBase.class.getName())) { + autoCredentials.add(credential); + } + stormConf.put( Config.TOPOLOGY_AUTO_CREDENTIALS , autoCredentials ); + } if (ParserOptions.TEST.has(cmd)) { stormConf.put(Config.TOPOLOGY_DEBUG, true); LocalCluster cluster = new LocalCluster(); @@ -312,6 +334,21 @@ public class ParserTopologyCLI { System.exit(-1); } } + + private static Optional<String> getSecurityProtocol(Optional<String> protocol, Map<String, Object> spoutConfig) { + Optional<String> ret = protocol; + if(ret.isPresent() && protocol.get().equalsIgnoreCase("PLAINTEXT")) { + ret = Optional.empty(); + } + if(!ret.isPresent()) { + ret = Optional.ofNullable((String) spoutConfig.get("security.protocol")); + } + if(ret.isPresent() && protocol.get().equalsIgnoreCase("PLAINTEXT")) { + ret = Optional.empty(); + } + return ret; + } + private static Map<String, Object> readSpoutConfig(File inputFile) { String json = null; if (inputFile.exists()) { http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java index 48bcbec..b6a76d0 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java @@ -33,10 +33,7 @@ import java.nio.file.FileVisitOption; 
import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; +import java.util.*; import static org.apache.metron.integration.components.FluxTopologyComponent.assassinateSlots; import static org.apache.metron.integration.components.FluxTopologyComponent.cleanupWorkerDir; @@ -82,7 +79,7 @@ public class ParserTopologyComponent implements InMemoryComponent { public void start() throws UnableToStartException { try { TopologyBuilder topologyBuilder = ParserTopologyBuilder.build(topologyProperties.getProperty("kafka.zk") - , brokerUrl + , Optional.ofNullable(brokerUrl) , sensorType , 1 , 1 @@ -91,6 +88,7 @@ public class ParserTopologyComponent implements InMemoryComponent { , 1 , 1 , null + , Optional.empty() ); Map<String, Object> stormConf = new HashMap<>(); stormConf.put(Config.TOPOLOGY_DEBUG, true); http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java index 5e70177..ac73a2b 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java @@ -34,9 +34,7 @@ import org.junit.Test; import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.EnumMap; -import java.util.Map; +import java.util.*; public class ParserTopologyCLITest { @@ -75,6 +73,20 @@ public class ParserTopologyCLITest { } } + @Test + public void testNoOverlappingArgs() throws Exception { + Set<String> optionStrs = new HashSet<>(); + for(ParserTopologyCLI.ParserOptions option : ParserTopologyCLI.ParserOptions.values()) { + if(optionStrs.contains(option.option.getLongOpt())) { + throw new IllegalStateException("Reused long option: " + option.option.getLongOpt()); + } + if(optionStrs.contains(option.shortCode)) { + throw new IllegalStateException("Reused short option: " + option.shortCode); + } + optionStrs.add(option.option.getLongOpt()); + optionStrs.add(option.shortCode); + } + } @Test public void testKafkaOffset_happyPath() throws ParseException { http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-pcap-backend/src/main/config/pcap.properties ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/config/pcap.properties b/metron-platform/metron-pcap-backend/src/main/config/pcap.properties index 48810c5..6e51dc5 100644 --- a/metron-platform/metron-pcap-backend/src/main/config/pcap.properties +++ b/metron-platform/metron-pcap-backend/src/main/config/pcap.properties @@ -15,7 +15,9 @@ # limitations under the License. 
spout.kafka.topic.pcap=pcap +storm.auto.credentials=[] kafka.zk=node1:2181 +kafka.security.protocol=PLAINTEXT kafka.pcap.start=END kafka.pcap.numPackets=1000 kafka.pcap.maxTimeMS=300000 http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml b/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml index 732991b..2b7e0fd 100644 --- a/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml +++ b/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml @@ -17,6 +17,7 @@ name: "pcap" config: topology.workers: 1 + topology.auto-credentials: ${storm.auto.credentials} components: @@ -36,6 +37,11 @@ components: args: - "group.id" - "pcap" + - name: "put" + args: + - "security.protocol" + - "${kafka.security.protocol}" + - id: "kafkaConfig" className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder" constructorArgs: http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java index 8b292d7..84e7574 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java @@ -212,6 +212,8 @@ public class PcapTopologyIntegrationTest { setProperty("kafka.pcap.numPackets", "2"); setProperty("kafka.pcap.maxTimeMS", "200000000"); setProperty("kafka.pcap.ts_granularity", "NANOSECONDS"); + setProperty("storm.auto.credentials", "[]"); + setProperty("kafka.security.protocol", "PLAINTEXT"); }}; updatePropertiesCallback.apply(topologyProperties); http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-solr/src/main/config/solr.properties ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/main/config/solr.properties b/metron-platform/metron-solr/src/main/config/solr.properties index 35f368c..914832d 100644 --- a/metron-platform/metron-solr/src/main/config/solr.properties +++ b/metron-platform/metron-solr/src/main/config/solr.properties @@ -17,6 +17,7 @@ ##### Storm ##### indexing.workers=1 indexing.executors=0 +storm.auto.credentials=[] ##### Kafka ##### @@ -24,6 +25,7 @@ kafka.zk=node1:2181 kafka.broker=node1:6667 # One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST kafka.start=UNCOMMITTED_EARLIEST +kafka.security.protocol=PLAINTEXT ##### Indexing ##### index.input.topic=indexing http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java 
index f03ac41..ba3f96c 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java @@ -92,9 +92,7 @@ public class SourceHandler { } private void initialize(Map config) throws IOException { - Configuration hdfsConfig = new Configuration(); this.fs = FileSystem.get(new Configuration()); - HdfsSecurityUtil.login(config, hdfsConfig); this.currentFile = createOutputFile(); if(this.rotationPolicy instanceof TimedRotationPolicy){ long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval(); http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java index 5c00e52..1884f5d 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java @@ -105,7 +105,12 @@ public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObj } public KafkaWriter withProducerConfigs(Map<String, Object> extraConfigs) { - this.producerConfigs = extraConfigs; + if(producerConfigs == null) { + this.producerConfigs = extraConfigs; + } + else if(extraConfigs != null){ + producerConfigs.putAll(extraConfigs); + } return this; }
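
[Editor's note] One behavioral nuance in the KafkaWriter change above: withProducerConfigs now merges into any previously supplied map instead of replacing it. A minimal sketch of that merge behavior, using hypothetical property values:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.metron.writer.kafka.KafkaWriter;

    public class ProducerConfigMergeSketch {
      public static void main(String[] args) {
        KafkaWriter writer = new KafkaWriter().withZkQuorum("node1:2181");

        // First call: producerConfigs is still null, so this map is adopted as-is.
        Map<String, Object> security = new HashMap<>();
        security.put("security.protocol", "PLAINTEXTSASL");
        writer.withProducerConfigs(security);

        // Second call: before this patch it would have clobbered the first map;
        // now the new entries are merged in, so security.protocol survives.
        Map<String, Object> tuning = new HashMap<>();
        tuning.put("batch.size", 16384);
        writer.withProducerConfigs(tuning);
      }
    }

On the CLI side, the parser topology can now target a secured cluster via the new -ksp/--kafka_security_protocol option (e.g. PLAINTEXTSASL) or by setting security.protocol in the spout config JSON; when a non-PLAINTEXT protocol is supplied and topology.auto-credentials has not been set explicitly, AutoHDFS and AutoHBase are added to the Storm config so worker credentials are renewed automatically.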