Repository: metron Updated Branches: refs/heads/master 5c0ac32d1 -> badc6cf97
METRON-1018 Integration tests should reference flux yaml and property files deployed by Ambari (merrimanr) closes apache/metron#635 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/badc6cf9 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/badc6cf9 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/badc6cf9 Branch: refs/heads/master Commit: badc6cf9739a31800abee1cfccbcf9930b130fa7 Parents: 5c0ac32 Author: merrimanr <[email protected]> Authored: Wed Jul 26 08:24:51 2017 -0500 Committer: merrimanr <[email protected]> Committed: Wed Jul 26 08:24:51 2017 -0500 ---------------------------------------------------------------------- metron-deployment/packaging/ambari/.gitignore | 2 + .../packaging/ambari/metron-mpack/pom.xml | 41 ++ .../package/scripts/params/params_linux.py | 1 + .../templates/elasticsearch.properties.j2 | 49 -- .../package/templates/enrichment.properties.j2 | 63 -- .../docker/rpm-docker/SPECS/metron.spec | 1 - .../src/main/assembly/assembly.xml | 1 + .../src/main/config/elasticsearch.properties.j2 | 49 ++ .../ElasticsearchIndexingIntegrationTest.java | 7 +- .../src/main/assembly/assembly.xml | 1 + .../src/main/config/enrichment.properties.j2 | 63 ++ .../src/main/flux/enrichment/test.yaml | 604 ------------------- .../enrichment/bolt/GenericEnrichmentBolt.java | 11 +- .../enrichment/bolt/ErrorEnrichmentBolt.java | 46 -- .../integration/EnrichmentIntegrationTest.java | 75 +-- .../integration/IndexingIntegrationTest.java | 34 +- .../main/config/zookeeper/enrichments/test.json | 2 + .../metron/integration/BaseIntegrationTest.java | 7 +- .../components/FluxTopologyComponent.java | 47 +- .../integration/components/KafkaComponent.java | 2 +- .../components/ZKServerComponent.java | 2 +- .../components/ParserTopologyComponent.java | 3 +- .../PcapTopologyIntegrationTest.java | 15 +- .../metron-solr/src/main/assembly/assembly.xml | 1 + .../src/main/config/solr.properties.j2 | 49 ++ .../SolrIndexingIntegrationTest.java | 7 +- .../test/bolt/BaseEnrichmentBoltTest.java | 1 + 27 files changed, 338 insertions(+), 846 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-deployment/packaging/ambari/.gitignore ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/.gitignore b/metron-deployment/packaging/ambari/.gitignore index e708548..10c9004 100644 --- a/metron-deployment/packaging/ambari/.gitignore +++ b/metron-deployment/packaging/ambari/.gitignore @@ -1,2 +1,4 @@ archive.zip *.hash +elasticsearch.properties.j2 +enrichment.properties.j2 http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-deployment/packaging/ambari/metron-mpack/pom.xml ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/pom.xml b/metron-deployment/packaging/ambari/metron-mpack/pom.xml index 09a5da0..ae721f2 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/pom.xml +++ b/metron-deployment/packaging/ambari/metron-mpack/pom.xml @@ -98,6 +98,32 @@ </resources> </configuration> </execution> + <execution> + <id>copy-property-templates</id> + <phase>prepare-package</phase> + <goals> + <goal>copy-resources</goal> + </goals> + <configuration> + <outputDirectory>${basedir}/src/main/resources/common-services/METRON/CURRENT/package/templates</outputDirectory> + <resources> + <resource> + <directory>${basedir}/../../../../metron-platform/metron-enrichment/src/main/config</directory> + <includes> + <include>enrichment.properties.j2</include> + </includes> + <filtering>false</filtering> + </resource> + <resource> + <directory>${basedir}/../../../../metron-platform/metron-elasticsearch/src/main/config</directory> + <includes> + <include>elasticsearch.properties.j2</include> + </includes> + <filtering>false</filtering> + </resource> + </resources> + </configuration> + </execution> </executions> </plugin> <plugin> @@ -129,6 +155,21 @@ </execution> </executions> </plugin> + <plugin> + <artifactId>maven-clean-plugin</artifactId> + <version>3.0.0</version> + <configuration> + <filesets> + <fileset> + <directory>${basedir}/src/main/resources/common-services/METRON/CURRENT/package/templates</directory> + <includes> + <include>enrichment.properties.j2</include> + <include>elasticsearch.properties.j2</include> + </includes> + </fileset> + </filesets> + </configuration> + </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py index ee9542c..3f84ef5 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py @@ -164,6 +164,7 @@ HdfsResource = functools.partial( ) # HBase +enrichment_hbase_provider_impl = 'org.apache.metron.hbase.HTableProvider' enrichment_table = status_params.enrichment_table enrichment_cf = status_params.enrichment_cf threatintel_table = status_params.threatintel_table http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/elasticsearch.properties.j2 ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/elasticsearch.properties.j2 b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/elasticsearch.properties.j2 deleted file mode 100644 index acb0f59..0000000 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/elasticsearch.properties.j2 +++ /dev/null @@ -1,49 +0,0 @@ -{# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -#} - -##### Storm ##### -indexing.workers={{indexing_workers}} -indexing.acker.executors={{indexing_acker_executors}} -topology.worker.childopts={{indexing_topology_worker_childopts}} -topology.auto-credentials={{topology_auto_credentials}} -topology.max.spout.pending={{indexing_topology_max_spout_pending}} - -##### Kafka ##### -kafka.zk={{zookeeper_quorum}} -kafka.broker={{kafka_brokers}} -kafka.security.protocol={{kafka_security_protocol}} - -# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST -kafka.start={{indexing_kafka_start}} - -indexing.input.topic={{indexing_input_topic}} -indexing.error.topic={{indexing_error_topic}} - -##### Indexing ##### -indexing.writer.class.name={{indexing_writer_class_name}} - -##### HDFS ##### -bolt.hdfs.rotation.policy={{bolt_hdfs_rotation_policy}} -bolt.hdfs.rotation.policy.units={{bolt_hdfs_rotation_policy_units}} -bolt.hdfs.rotation.policy.count={{bolt_hdfs_rotation_policy_count}} -indexing.hdfs.output={{metron_apps_indexed_hdfs_dir}} - -##### Parallelism ##### -kafka.spout.parallelism={{indexing_kafka_spout_parallelism}} -indexing.writer.parallelism={{indexing_writer_parallelism}} -hdfs.writer.parallelism={{hdfs_writer_parallelism}} http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2 ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2 b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2 deleted file mode 100755 index 485b938..0000000 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2 +++ /dev/null @@ -1,63 +0,0 @@ -{# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -#} - -##### Storm ##### -enrichment.workers={{enrichment_workers}} -enrichment.acker.executors={{enrichment_acker_executors}} -topology.worker.childopts={{enrichment_topology_worker_childopts}} -topology.auto-credentials={{topology_auto_credentials}} -topology.max.spout.pending={{enrichment_topology_max_spout_pending}} - -##### Kafka ##### -kafka.zk={{zookeeper_quorum}} -kafka.broker={{kafka_brokers}} -kafka.security.protocol={{kafka_security_protocol}} - -# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST -kafka.start={{enrichment_kafka_start}} - -enrichment.input.topic={{enrichment_input_topic}} -enrichment.output.topic={{enrichment_output_topic}} -enrichment.error.topic={{enrichment_error_topic}} -threat.intel.error.topic={{threatintel_error_topic}} - -##### JoinBolt ##### -enrichment.join.cache.size={{enrichment_join_cache_size}} -threat.intel.join.cache.size={{threatintel_join_cache_size}} - -##### Enrichment ##### -hbase.provider.impl=org.apache.metron.hbase.HTableProvider -enrichment.simple.hbase.table={{enrichment_table}} -enrichment.simple.hbase.cf={{enrichment_cf}} -enrichment.host.known_hosts={{enrichment_host_known_hosts}} - -##### Threat Intel ##### -threat.intel.tracker.table={{threatintel_table}} -threat.intel.tracker.cf={{threatintel_cf}} -threat.intel.simple.hbase.table={{threatintel_table}} -threat.intel.simple.hbase.cf={{threatintel_cf}} - -##### Parallelism ##### -kafka.spout.parallelism={{enrichment_kafka_spout_parallelism}} -enrichment.split.parallelism={{enrichment_split_parallelism}} -enrichment.stellar.parallelism={{enrichment_stellar_parallelism}} -enrichment.join.parallelism={{enrichment_join_parallelism}} -threat.intel.split.parallelism={{threat_intel_split_parallelism}} -threat.intel.stellar.parallelism={{threat_intel_stellar_parallelism}} -threat.intel.join.parallelism={{threat_intel_join_parallelism}} -kafka.writer.parallelism={{kafka_writer_parallelism}} http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec index e34c240..94c7e05 100644 --- a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec +++ b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec @@ -245,7 +245,6 @@ This package installs the Metron Enrichment files %{metron_home}/config/zookeeper/enrichments/yaf.json %{metron_home}/config/zookeeper/enrichments/asa.json %{metron_home}/flux/enrichment/remote.yaml -%exclude %{metron_home}/flux/enrichment/test.yaml %attr(0644,root,root) %{metron_home}/lib/metron-enrichment-%{full_version}-uber.jar # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-elasticsearch/src/main/assembly/assembly.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/assembly/assembly.xml b/metron-platform/metron-elasticsearch/src/main/assembly/assembly.xml index 27d5d0b..1535967 100644 --- a/metron-platform/metron-elasticsearch/src/main/assembly/assembly.xml +++ b/metron-platform/metron-elasticsearch/src/main/assembly/assembly.xml @@ -25,6 +25,7 @@ <excludes> <exclude>**/*.formatted</exclude> <exclude>**/*.filtered</exclude> + <exclude>**/*.j2</exclude> </excludes> <fileMode>0644</fileMode> <lineEnding>unix</lineEnding> http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties.j2 ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties.j2 b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties.j2 new file mode 100644 index 0000000..acb0f59 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties.j2 @@ -0,0 +1,49 @@ +{# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#} + +##### Storm ##### +indexing.workers={{indexing_workers}} +indexing.acker.executors={{indexing_acker_executors}} +topology.worker.childopts={{indexing_topology_worker_childopts}} +topology.auto-credentials={{topology_auto_credentials}} +topology.max.spout.pending={{indexing_topology_max_spout_pending}} + +##### Kafka ##### +kafka.zk={{zookeeper_quorum}} +kafka.broker={{kafka_brokers}} +kafka.security.protocol={{kafka_security_protocol}} + +# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST +kafka.start={{indexing_kafka_start}} + +indexing.input.topic={{indexing_input_topic}} +indexing.error.topic={{indexing_error_topic}} + +##### Indexing ##### +indexing.writer.class.name={{indexing_writer_class_name}} + +##### HDFS ##### +bolt.hdfs.rotation.policy={{bolt_hdfs_rotation_policy}} +bolt.hdfs.rotation.policy.units={{bolt_hdfs_rotation_policy_units}} +bolt.hdfs.rotation.policy.count={{bolt_hdfs_rotation_policy_count}} +indexing.hdfs.output={{metron_apps_indexed_hdfs_dir}} + +##### Parallelism ##### +kafka.spout.parallelism={{indexing_kafka_spout_parallelism}} +indexing.writer.parallelism={{indexing_writer_parallelism}} +hdfs.writer.parallelism={{hdfs_writer_parallelism}} http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java index 54e494e..4c03526 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java @@ -102,11 +102,16 @@ public class ElasticsearchIndexingIntegrationTest extends IndexingIntegrationTes topologyProperties.setProperty("es.clustername", "metron"); topologyProperties.setProperty("es.port", "9300"); topologyProperties.setProperty("es.ip", "localhost"); - topologyProperties.setProperty("indexing.writer.class.name", "org.apache.metron.elasticsearch.writer.ElasticsearchWriter"); + topologyProperties.setProperty("indexing_writer_class_name", "org.apache.metron.elasticsearch.writer.ElasticsearchWriter"); } @Override public String cleanField(String field) { return field; } + + @Override + public String getTemplatePath() { + return "../metron-elasticsearch/src/main/config/elasticsearch.properties.j2"; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-enrichment/src/main/assembly/assembly.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/assembly/assembly.xml b/metron-platform/metron-enrichment/src/main/assembly/assembly.xml index b412ed8..6cbe4e7 100644 --- a/metron-platform/metron-enrichment/src/main/assembly/assembly.xml +++ b/metron-platform/metron-enrichment/src/main/assembly/assembly.xml @@ -25,6 +25,7 @@ <excludes> <exclude>**/*.formatted</exclude> <exclude>**/*.filtered</exclude> + <exclude>**/*.j2</exclude> </excludes> <fileMode>0644</fileMode> <lineEnding>unix</lineEnding> http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-enrichment/src/main/config/enrichment.properties.j2 ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/config/enrichment.properties.j2 b/metron-platform/metron-enrichment/src/main/config/enrichment.properties.j2 new file mode 100755 index 0000000..f8b9b66 --- /dev/null +++ b/metron-platform/metron-enrichment/src/main/config/enrichment.properties.j2 @@ -0,0 +1,63 @@ +{# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#} + +##### Storm ##### +enrichment.workers={{enrichment_workers}} +enrichment.acker.executors={{enrichment_acker_executors}} +topology.worker.childopts={{enrichment_topology_worker_childopts}} +topology.auto-credentials={{topology_auto_credentials}} +topology.max.spout.pending={{enrichment_topology_max_spout_pending}} + +##### Kafka ##### +kafka.zk={{zookeeper_quorum}} +kafka.broker={{kafka_brokers}} +kafka.security.protocol={{kafka_security_protocol}} + +# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST +kafka.start={{enrichment_kafka_start}} + +enrichment.input.topic={{enrichment_input_topic}} +enrichment.output.topic={{enrichment_output_topic}} +enrichment.error.topic={{enrichment_error_topic}} +threat.intel.error.topic={{threatintel_error_topic}} + +##### JoinBolt ##### +enrichment.join.cache.size={{enrichment_join_cache_size}} +threat.intel.join.cache.size={{threatintel_join_cache_size}} + +##### Enrichment ##### +hbase.provider.impl={{enrichment_hbase_provider_impl}} +enrichment.simple.hbase.table={{enrichment_table}} +enrichment.simple.hbase.cf={{enrichment_cf}} +enrichment.host.known_hosts={{enrichment_host_known_hosts}} + +##### Threat Intel ##### +threat.intel.tracker.table={{threatintel_table}} +threat.intel.tracker.cf={{threatintel_cf}} +threat.intel.simple.hbase.table={{threatintel_table}} +threat.intel.simple.hbase.cf={{threatintel_cf}} + +##### Parallelism ##### +kafka.spout.parallelism={{enrichment_kafka_spout_parallelism}} +enrichment.split.parallelism={{enrichment_split_parallelism}} +enrichment.stellar.parallelism={{enrichment_stellar_parallelism}} +enrichment.join.parallelism={{enrichment_join_parallelism}} +threat.intel.split.parallelism={{threat_intel_split_parallelism}} +threat.intel.stellar.parallelism={{threat_intel_stellar_parallelism}} +threat.intel.join.parallelism={{threat_intel_join_parallelism}} +kafka.writer.parallelism={{kafka_writer_parallelism}} http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-enrichment/src/main/flux/enrichment/test.yaml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/flux/enrichment/test.yaml b/metron-platform/metron-enrichment/src/main/flux/enrichment/test.yaml deleted file mode 100644 index b4481ff..0000000 --- a/metron-platform/metron-enrichment/src/main/flux/enrichment/test.yaml +++ /dev/null @@ -1,604 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -name: "enrichment" -config: - topology.workers: ${enrichment.workers} - topology.acker.executors: ${enrichment.acker.executors} - topology.worker.childopts: ${topology.worker.childopts} - topology.auto-credentials: ${topology.auto-credentials} - topology.max.spout.pending: ${topology.max.spout.pending} - -components: - -# Enrichment - - id: "stellarEnrichmentAdapter" - className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter" - configMethods: - - name: "ofType" - args: - - "ENRICHMENT" - - - id: "stellarEnrichment" - className: "org.apache.metron.enrichment.configuration.Enrichment" - constructorArgs: - - "stellar" - - ref: "stellarEnrichmentAdapter" - - - id: "geoEnrichmentAdapter" - className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter" - - id: "geoEnrichment" - className: "org.apache.metron.enrichment.configuration.Enrichment" - constructorArgs: - - "geo" - - ref: "geoEnrichmentAdapter" - - id: "hostEnrichmentAdapter" - className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter" - constructorArgs: - - '${enrichment.host.known_hosts}' - - id: "hostEnrichment" - className: "org.apache.metron.enrichment.configuration.Enrichment" - constructorArgs: - - "host" - - ref: "hostEnrichmentAdapter" - - - id: "simpleHBaseEnrichmentConfig" - className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseConfig" - configMethods: - - name: "withProviderImpl" - args: - - "${hbase.provider.impl}" - - name: "withHBaseTable" - args: - - "${enrichment.simple.hbase.table}" - - name: "withHBaseCF" - args: - - "${enrichment.simple.hbase.cf}" - - id: "simpleHBaseEnrichmentAdapter" - className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseAdapter" - configMethods: - - name: "withConfig" - args: - - ref: "simpleHBaseEnrichmentConfig" - - id: "simpleHBaseEnrichment" - className: "org.apache.metron.enrichment.configuration.Enrichment" - constructorArgs: - - "hbaseEnrichment" - - ref: "simpleHBaseEnrichmentAdapter" - - id: "enrichments" - className: "java.util.ArrayList" - configMethods: - - name: "add" - args: - - ref: "geoEnrichment" - - name: "add" - args: - - ref: "hostEnrichment" - - name: "add" - args: - - ref: "simpleHBaseEnrichment" - - name: "add" - args: - - ref: "stellarEnrichment" - - #enrichment error - - id: "enrichmentErrorKafkaWriter" - className: "org.apache.metron.writer.kafka.KafkaWriter" - configMethods: - - name: "withTopic" - args: - - "${enrichment.error.topic}" - - name: "withZkQuorum" - args: - - "${kafka.zk}" - -# Threat Intel - - id: "stellarThreatIntelAdapter" - className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter" - configMethods: - - name: "ofType" - args: - - "THREAT_INTEL" - - id: "stellarThreatIntelEnrichment" - className: "org.apache.metron.enrichment.configuration.Enrichment" - constructorArgs: - - "stellar" - - ref: "stellarThreatIntelAdapter" - - id: "simpleHBaseThreatIntelConfig" - className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelConfig" - configMethods: - - name: "withProviderImpl" - args: - - "${hbase.provider.impl}" - - name: "withTrackerHBaseTable" - args: - - "${threat.intel.tracker.table}" - - name: "withTrackerHBaseCF" - args: - - "${threat.intel.tracker.cf}" - - name: "withHBaseTable" - args: - - "${threat.intel.simple.hbase.table}" - - name: "withHBaseCF" - args: - - "${threat.intel.simple.hbase.cf}" - - id: "simpleHBaseThreatIntelAdapter" - className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelAdapter" - configMethods: - - name: "withConfig" - args: - - ref: "simpleHBaseThreatIntelConfig" - - id: "simpleHBaseThreatIntelEnrichment" - className: "org.apache.metron.enrichment.configuration.Enrichment" - constructorArgs: - - "hbaseThreatIntel" - - ref: "simpleHBaseThreatIntelAdapter" - - - id: "threatIntels" - className: "java.util.ArrayList" - configMethods: - - name: "add" - args: - - ref: "simpleHBaseThreatIntelEnrichment" - - name: "add" - args: - - ref: "stellarThreatIntelEnrichment" - - #threatintel error - - id: "threatIntelErrorKafkaWriter" - className: "org.apache.metron.writer.kafka.KafkaWriter" - configMethods: - - name: "withTopic" - args: - - "${threat.intel.error.topic}" - - name: "withZkQuorum" - args: - - "${kafka.zk}" - -#indexing - - id: "kafkaWriter" - className: "org.apache.metron.writer.kafka.KafkaWriter" - configMethods: - - name: "withTopic" - args: - - "${enrichment.output.topic}" - - name: "withZkQuorum" - args: - - "${kafka.zk}" - -#kafka/zookeeper -# Any kafka props for the producer go here. - - id: "kafkaProps" - className: "java.util.HashMap" - configMethods: - - name: "put" - args: - - "value.deserializer" - - "org.apache.kafka.common.serialization.ByteArrayDeserializer" - - name: "put" - args: - - "key.deserializer" - - "org.apache.kafka.common.serialization.ByteArrayDeserializer" - - name: "put" - args: - - "group.id" - - "enrichments" - - name: "put" - args: - - "security.protocol" - - "${kafka.security.protocol}" - - - # The fields to pull out of the kafka messages - - id: "fields" - className: "java.util.ArrayList" - configMethods: - - name: "add" - args: - - "value" - - - id: "kafkaConfig" - className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder" - constructorArgs: - - ref: "kafkaProps" - # topic name - - "${enrichment.input.topic}" - - "${kafka.zk}" - - ref: "fields" - configMethods: - - name: "setFirstPollOffsetStrategy" - args: - - "${kafka.start}" - - -spouts: - - id: "kafkaSpout" - className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout" - constructorArgs: - - ref: "kafkaConfig" - parallelism: ${kafka.spout.parallelism} - -bolts: -# Enrichment Bolts - - id: "enrichmentSplitBolt" - className: "org.apache.metron.enrichment.bolt.EnrichmentSplitterBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withEnrichments" - args: - - ref: "enrichments" - parallelism: ${enrichment.split.parallelism} - - - id: "geoEnrichmentBolt" - className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withEnrichment" - args: - - ref: "geoEnrichment" - - name: "withMaxCacheSize" - args: [10000] - - name: "withMaxTimeRetain" - args: [10] - - - id: "stellarEnrichmentBolt" - className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withEnrichment" - args: - - ref: "stellarEnrichment" - - name: "withMaxCacheSize" - args: [10000] - - name: "withMaxTimeRetain" - args: [10] - parallelism: ${enrichment.stellar.parallelism} - - - id: "hostEnrichmentBolt" - className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withEnrichment" - args: - - ref: "hostEnrichment" - - name: "withMaxCacheSize" - args: [10000] - - name: "withMaxTimeRetain" - args: [10] - - - id: "simpleHBaseEnrichmentBolt" - className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withEnrichment" - args: - - ref: "simpleHBaseEnrichment" - - name: "withMaxCacheSize" - args: [10000] - - name: "withMaxTimeRetain" - args: [10] - - id: "ErrorEnrichmentBolt" - className: "org.apache.metron.enrichment.bolt.ErrorEnrichmentBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withEnrichment" - args: - - ref: "geoEnrichment" - - name: "withMaxCacheSize" - args: [10000] - - name: "withMaxTimeRetain" - args: [10] - - id: "enrichmentJoinBolt" - className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withMaxCacheSize" - args: [${enrichment.join.cache.size}] - - name: "withMaxTimeRetain" - args: [10] - parallelism: ${enrichment.join.parallelism} - - - id: "enrichmentErrorOutputBolt" - className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withMessageWriter" - args: - - ref: "enrichmentErrorKafkaWriter" - - -# Threat Intel Bolts - - id: "threatIntelSplitBolt" - className: "org.apache.metron.enrichment.bolt.ThreatIntelSplitterBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withEnrichments" - args: - - ref: "threatIntels" - - name: "withMessageFieldName" - args: ["message"] - parallelism: ${threat.intel.split.parallelism} - - - id: "simpleHBaseThreatIntelBolt" - className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withEnrichment" - args: - - ref: "simpleHBaseThreatIntelEnrichment" - - name: "withMaxCacheSize" - args: [10000] - - name: "withMaxTimeRetain" - args: [10] - - id: "stellarThreatIntelBolt" - className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withEnrichment" - args: - - ref: "stellarThreatIntelEnrichment" - - name: "withMaxCacheSize" - args: [10000] - - name: "withMaxTimeRetain" - args: [10] - parallelism: ${threat.intel.stellar.parallelism} - - - id: "threatIntelJoinBolt" - className: "org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withMaxCacheSize" - args: [${threat.intel.join.cache.size}] - - name: "withMaxTimeRetain" - args: [10] - parallelism: ${threat.intel.join.parallelism} - - - id: "threatIntelErrorOutputBolt" - className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withMessageWriter" - args: - - ref: "threatIntelErrorKafkaWriter" - -# Indexing Bolts - - id: "outputBolt" - className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withMessageWriter" - args: - - ref: "kafkaWriter" - parallelism: ${kafka.writer.parallelism} - - -streams: -#parser - - name: "spout -> enrichmentSplit" - from: "kafkaSpout" - to: "enrichmentSplitBolt" - grouping: - type: SHUFFLE - -#enrichment - - name: "enrichmentSplit -> host" - from: "enrichmentSplitBolt" - to: "hostEnrichmentBolt" - grouping: - streamId: "host" - type: FIELDS - args: ["key"] - - name: "enrichmentSplit -> geo" - from: "enrichmentSplitBolt" - to: "geoEnrichmentBolt" - grouping: - streamId: "geo" - type: FIELDS - args: ["key"] - - - name: "enrichmentSplit -> stellar" - from: "enrichmentSplitBolt" - to: "stellarEnrichmentBolt" - grouping: - streamId: "stellar" - type: FIELDS - args: ["key"] - - - - name: "enrichmentSplit -> simpleHBaseEnrichmentBolt" - from: "enrichmentSplitBolt" - to: "simpleHBaseEnrichmentBolt" - grouping: - streamId: "hbaseEnrichment" - type: FIELDS - args: ["key"] - - - name: "enrichmentSplit -> ErrorEnrichmentBolt" - from: "enrichmentSplitBolt" - to: "ErrorEnrichmentBolt" - grouping: - streamId: "hbaseEnrichment" - type: FIELDS - args: ["key"] - - - name: "splitter -> join" - from: "enrichmentSplitBolt" - to: "enrichmentJoinBolt" - grouping: - streamId: "message" - type: FIELDS - args: ["key"] - - name: "geo -> join" - from: "geoEnrichmentBolt" - to: "enrichmentJoinBolt" - grouping: - streamId: "geo" - type: FIELDS - args: ["key"] - - name: "stellar -> join" - from: "stellarEnrichmentBolt" - to: "enrichmentJoinBolt" - grouping: - streamId: "stellar" - type: FIELDS - args: ["key"] - - - - - name: "simpleHBaseEnrichmentBolt -> join" - from: "simpleHBaseEnrichmentBolt" - to: "enrichmentJoinBolt" - grouping: - streamId: "hbaseEnrichment" - type: FIELDS - args: ["key"] - - name: "host -> join" - from: "hostEnrichmentBolt" - to: "enrichmentJoinBolt" - grouping: - streamId: "host" - type: FIELDS - args: ["key"] - - # Error output - - name: "geoEnrichmentBolt -> enrichmentErrorOutputBolt" - from: "geoEnrichmentBolt" - to: "enrichmentErrorOutputBolt" - grouping: - streamId: "error" - type: FIELDS - args: ["message"] - - - name: "stellarEnrichmentBolt -> enrichmentErrorOutputBolt" - from: "stellarEnrichmentBolt" - to: "enrichmentErrorOutputBolt" - grouping: - streamId: "error" - type: FIELDS - args: ["message"] - - - name: "hostEnrichmentBolt -> enrichmentErrorOutputBolt" - from: "hostEnrichmentBolt" - to: "enrichmentErrorOutputBolt" - grouping: - streamId: "error" - type: FIELDS - args: ["message"] - - - name: "simpleHBaseEnrichmentBolt -> enrichmentErrorOutputBolt" - from: "simpleHBaseEnrichmentBolt" - to: "enrichmentErrorOutputBolt" - grouping: - streamId: "error" - type: FIELDS - args: ["message"] - - - name: "ErrorEnrichmentBolt -> enrichmentErrorOutputBolt" - from: "ErrorEnrichmentBolt" - to: "enrichmentErrorOutputBolt" - grouping: - streamId: "error" - type: FIELDS - args: ["message"] - -#threat intel - - name: "enrichmentJoin -> threatSplit" - from: "enrichmentJoinBolt" - to: "threatIntelSplitBolt" - grouping: - streamId: "message" - type: FIELDS - args: ["key"] - - - name: "threatSplit -> simpleHBaseThreatIntel" - from: "threatIntelSplitBolt" - to: "simpleHBaseThreatIntelBolt" - grouping: - streamId: "hbaseThreatIntel" - type: FIELDS - args: ["key"] - - name: "threatSplit -> stellarThreatIntel" - from: "threatIntelSplitBolt" - to: "stellarThreatIntelBolt" - grouping: - streamId: "stellar" - type: FIELDS - args: ["key"] - - - - name: "simpleHBaseThreatIntel -> join" - from: "simpleHBaseThreatIntelBolt" - to: "threatIntelJoinBolt" - grouping: - streamId: "hbaseThreatIntel" - type: FIELDS - args: ["key"] - - - name: "stellarThreatIntel -> join" - from: "stellarThreatIntelBolt" - to: "threatIntelJoinBolt" - grouping: - streamId: "stellar" - type: FIELDS - args: ["key"] - - - name: "threatIntelSplit -> threatIntelJoin" - from: "threatIntelSplitBolt" - to: "threatIntelJoinBolt" - grouping: - streamId: "message" - type: FIELDS - args: ["key"] -#output - - name: "threatIntelJoin -> output" - from: "threatIntelJoinBolt" - to: "outputBolt" - grouping: - streamId: "message" - type: FIELDS - args: ["key"] - - # Error output - - name: "simpleHBaseThreatIntelBolt -> threatIntelErrorOutputBolt" - from: "simpleHBaseThreatIntelBolt" - to: "threatIntelErrorOutputBolt" - grouping: - streamId: "error" - type: FIELDS - args: ["message"] - - - name: "stellarThreatIntelBolt -> threatIntelErrorOutputBolt" - from: "stellarThreatIntelBolt" - to: "threatIntelErrorOutputBolt" - grouping: - streamId: "error" - type: FIELDS - args: ["message"] - http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java index 8b6fee0..907b309 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java @@ -197,7 +197,6 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt { else { throw new RuntimeException("Source type is missing from enrichment fragment: " + rawMessage.toJSONString()); } - boolean error = false; String prefix = null; for (Object o : rawMessage.keySet()) { String field = (String) o; @@ -210,7 +209,11 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt { SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType); if(config == null) { LOG.error("Unable to find SensorEnrichmentConfig for sourceType: " + sourceType); - error = true; + MetronError metronError = new MetronError() + .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR) + .withMessage("Unable to find SensorEnrichmentConfig for sourceType: " + sourceType) + .addRawMessage(rawMessage); + ErrorUtils.handleError(collector, metronError); continue; } config.getConfiguration().putIfAbsent(STELLAR_CONTEXT_CONF, stellarContext); @@ -226,7 +229,6 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt { } catch(Exception e) { LOG.error(e.getMessage(), e); - error = true; MetronError metronError = new MetronError() .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR) .withThrowable(e) @@ -250,9 +252,6 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt { } enrichedMessage.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".end.ts", "" + System.currentTimeMillis()); - if(error) { - throw new Exception("Unable to enrich " + rawMessage + " check logs for specifics."); - } if (!enrichedMessage.isEmpty()) { collector.emit(enrichmentType, new Values(key, enrichedMessage, subGroup)); } http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ErrorEnrichmentBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ErrorEnrichmentBolt.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ErrorEnrichmentBolt.java deleted file mode 100644 index 571079c..0000000 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ErrorEnrichmentBolt.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.enrichment.bolt; - -import org.json.simple.JSONObject; -import org.apache.storm.tuple.Tuple; - -/** - * Exists in order to provide a bolt that tests that when the GenericEnrichmentBolt writes to error, that it actually carries - * through the queue - */ -public class ErrorEnrichmentBolt extends GenericEnrichmentBolt { - - public static final String TEST_ERROR_MESSAGE = "Test throwing error from ErrorEnrichmentBolt"; - - public ErrorEnrichmentBolt(String zookeeperUrl) { - super(zookeeperUrl); - } - - @SuppressWarnings("unchecked") - @Override - public void execute(Tuple tuple) { - JSONObject rawMessage = new JSONObject(); - rawMessage.put("rawMessage", "Error Test Raw Message String"); - - JSONObject enrichedMessage= new JSONObject(); - enrichedMessage.put("enrichedMessage", "Error Test Enriched Message String"); - handleError("key", rawMessage, "subgroup", enrichedMessage, new IllegalStateException(TEST_ERROR_MESSAGE)); - } -} http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java index e798b72..a9a2fea 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java @@ -89,7 +89,8 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { public static final String DEFAULT_DMACODE= "test dmaCode"; public static final String DEFAULT_LOCATION_POINT= Joiner.on(',').join(DEFAULT_LATITUDE,DEFAULT_LONGITUDE); - protected String fluxPath = "../metron-enrichment/src/main/flux/enrichment/test.yaml"; + protected String fluxPath = "../metron-enrichment/src/main/flux/enrichment/remote.yaml"; + protected String templatePath = "../metron-enrichment/src/main/config/enrichment.properties.j2"; protected String sampleParsedPath = TestConstants.SAMPLE_DATA_PARSED_PATH + "TestExampleParsed"; private final List<byte[]> inputMessages = getInputMessages(sampleParsedPath); @@ -124,40 +125,39 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { final String threatIntelTableName = "threat_intel"; final String enrichmentsTableName = "enrichments"; final Properties topologyProperties = new Properties() {{ - setProperty("enrichment.workers", "1"); - setProperty("enrichment.acker.executors", "0"); - setProperty("topology.worker.childopts", ""); - setProperty("topology.auto-credentials", "[]"); - setProperty("topology.max.spout.pending", ""); - setProperty("kafka.start", "UNCOMMITTED_EARLIEST"); - setProperty("kafka.security.protocol", "PLAINTEXT"); - setProperty("enrichment.input.topic", Constants.ENRICHMENT_TOPIC); - setProperty("enrichment.output.topic", Constants.INDEXING_TOPIC); - setProperty("enrichment.error.topic", ERROR_TOPIC); - setProperty("threat.intel.error.topic", ERROR_TOPIC); - setProperty("enrichment.join.cache.size", "1000"); - setProperty("threat.intel.join.cache.size", "1000"); - - setProperty("enrichment.host.known_hosts", "[{\"ip\":\"10.1.128.236\", \"local\":\"YES\", \"type\":\"webserver\", \"asset_value\" : \"important\"},\n" + - "{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"},\n" + - "{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"},\n" + + setProperty("enrichment_workers", "1"); + setProperty("enrichment_acker_executors", "0"); + setProperty("enrichment_topology_worker_childopts", ""); + setProperty("topology_auto_credentials", "[]"); + setProperty("enrichment_topology_max_spout_pending", ""); + setProperty("enrichment_kafka_start", "UNCOMMITTED_EARLIEST"); + setProperty("kafka_security_protocol", "PLAINTEXT"); + setProperty("enrichment_input_topic", Constants.ENRICHMENT_TOPIC); + setProperty("enrichment_output_topic", Constants.INDEXING_TOPIC); + setProperty("enrichment_error_topic", ERROR_TOPIC); + setProperty("threatintel_error_topic", ERROR_TOPIC); + setProperty("enrichment_join_cache_size", "1000"); + setProperty("threatintel_join_cache_size", "1000"); + setProperty("enrichment_hbase_provider_impl", "org.apache.metron.enrichment.integration.EnrichmentIntegrationTest\\$Provider"); + setProperty("enrichment_table", enrichmentsTableName); + setProperty("enrichment_cf", cf); + setProperty("enrichment_host_known_hosts", "[{\"ip\":\"10.1.128.236\", \"local\":\"YES\", \"type\":\"webserver\", \"asset_value\" : \"important\"}," + + "{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"}," + + "{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}," + "{\"ip\":\"10.0.2.15\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}]"); - setProperty("hbase.provider.impl", "" + Provider.class.getName()); - setProperty("threat.intel.tracker.table", trackerHBaseTableName); - setProperty("threat.intel.tracker.cf", cf); - setProperty("threat.intel.simple.hbase.table", threatIntelTableName); - setProperty("threat.intel.simple.hbase.cf", cf); - setProperty("enrichment.simple.hbase.table", enrichmentsTableName); - setProperty("enrichment.simple.hbase.cf", cf); - - setProperty("kafka.spout.parallelism", "1"); - setProperty("enrichment.split.parallelism", "1"); - setProperty("enrichment.stellar.parallelism", "1"); - setProperty("enrichment.join.parallelism", "1"); - setProperty("threat.intel.split.parallelism", "1"); - setProperty("threat.intel.stellar.parallelism", "1"); - setProperty("threat.intel.join.parallelism", "1"); - setProperty("kafka.writer.parallelism", "1"); + + setProperty("threatintel_table", threatIntelTableName); + setProperty("threatintel_cf", cf); + + + setProperty("enrichment_kafka_spout_parallelism", "1"); + setProperty("enrichment_split_parallelism", "1"); + setProperty("enrichment_stellar_parallelism", "1"); + setProperty("enrichment_join_parallelism", "1"); + setProperty("threat_intel_split_parallelism", "1"); + setProperty("threat_intel_stellar_parallelism", "1"); + setProperty("threat_intel_join_parallelism", "1"); + setProperty("kafka_writer_parallelism", "1"); }}; final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties); @@ -200,6 +200,7 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder() .withTopologyLocation(new File(fluxPath)) .withTopologyName("test") + .withTemplateLocation(new File(templatePath)) .withTopologyProperties(topologyProperties) .build(); @@ -254,10 +255,10 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { protected void validateErrors(List<Map<String, Object>> errors) { for(Map<String, Object> error : errors) { - Assert.assertEquals("Test throwing error from ErrorEnrichmentBolt", error.get(Constants.ErrorFields.MESSAGE.getName())); - Assert.assertEquals("java.lang.IllegalStateException: Test throwing error from ErrorEnrichmentBolt", error.get(Constants.ErrorFields.EXCEPTION.getName())); + Assert.assertEquals("java.lang.ArithmeticException: / by zero", error.get(Constants.ErrorFields.MESSAGE.getName())); + Assert.assertEquals("com.google.common.util.concurrent.UncheckedExecutionException: java.lang.ArithmeticException: / by zero", error.get(Constants.ErrorFields.EXCEPTION.getName())); Assert.assertEquals(Constants.ErrorType.ENRICHMENT_ERROR.getType(), error.get(Constants.ErrorFields.ERROR_TYPE.getName())); - Assert.assertEquals("{\"rawMessage\":\"Error Test Raw Message String\"}", error.get(Constants.ErrorFields.RAW_MESSAGE.getName())); + Assert.assertEquals("{\"error_test\":{},\"source.type\":\"test\"}", error.get(Constants.ErrorFields.RAW_MESSAGE.getName())); } } http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java index 7c97479..c0f9919 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java @@ -119,23 +119,23 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest { cleanHdfsDir(hdfsDir); final List<byte[]> inputMessages = TestUtils.readSampleData(sampleParsedPath); final Properties topologyProperties = new Properties() {{ - setProperty("kafka.start", "UNCOMMITTED_EARLIEST"); - setProperty("kafka.security.protocol", "PLAINTEXT"); - setProperty("storm.auto.credentials", "[]"); - setProperty("indexing.workers", "1"); - setProperty("indexing.acker.executors", "0"); - setProperty("topology.max.spout.pending", ""); - setProperty("indexing.input.topic", Constants.INDEXING_TOPIC); - setProperty("indexing.error.topic", ERROR_TOPIC); - setProperty("topology.auto-credentials", "[]"); + setProperty("indexing_kafka_start", "UNCOMMITTED_EARLIEST"); + setProperty("kafka_security_protocol", "PLAINTEXT"); + setProperty("topology_auto_credentials", "[]"); + setProperty("indexing_workers", "1"); + setProperty("indexing_acker_executors", "0"); + setProperty("indexing_topology_worker_childopts", ""); + setProperty("indexing_topology_max_spout_pending", ""); + setProperty("indexing_input_topic", Constants.INDEXING_TOPIC); + setProperty("indexing_error_topic", ERROR_TOPIC); //HDFS settings - setProperty("bolt.hdfs.rotation.policy", TimedRotationPolicy.class.getCanonicalName()); - setProperty("bolt.hdfs.rotation.policy.count", "1"); - setProperty("bolt.hdfs.rotation.policy.units", "DAYS"); - setProperty("indexing.hdfs.output", hdfsDir); - setProperty("kafka.spout.parallelism", "1"); - setProperty("indexing.writer.parallelism", "1"); - setProperty("hdfs.writer.parallelism", "1"); + setProperty("bolt_hdfs_rotation_policy", TimedRotationPolicy.class.getCanonicalName()); + setProperty("bolt_hdfs_rotation_policy_count", "1"); + setProperty("bolt_hdfs_rotation_policy_units", "DAYS"); + setProperty("metron_apps_indexed_hdfs_dir", hdfsDir); + setProperty("indexing_kafka_spout_parallelism", "1"); + setProperty("indexing_writer_parallelism", "1"); + setProperty("hdfs_writer_parallelism", "1"); }}; setAdditionalProperties(topologyProperties); final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties); @@ -168,6 +168,7 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest { FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder() .withTopologyLocation(new File(fluxPath)) .withTopologyName("test") + .withTemplateLocation(new File(getTemplatePath())) .withTopologyProperties(topologyProperties) .build(); @@ -305,4 +306,5 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest { public abstract InMemoryComponent getSearchComponent(final Properties topologyProperties) throws Exception; public abstract void setAdditionalProperties(Properties topologyProperties); public abstract String cleanField(String field); + public abstract String getTemplatePath(); } http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json ---------------------------------------------------------------------- diff --git a/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json b/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json index bba6679..4130943 100644 --- a/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json +++ b/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json @@ -26,6 +26,8 @@ } ,"dst_enrichment" : { "dst_classification" : "ENRICHMENT_GET('playful_classification', ip_dst_addr, 'enrichments', 'cf')" + },"error_test" : { + "error_test": "1/0" } } } http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java index 7ae373b..1e7cd25 100644 --- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java +++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java @@ -17,11 +17,9 @@ */ package org.apache.metron.integration; -import com.google.common.base.Function; import org.apache.metron.integration.components.KafkaComponent; import org.apache.metron.integration.components.ZKServerComponent; -import javax.annotation.Nullable; import java.util.List; import java.util.Properties; @@ -33,7 +31,10 @@ public abstract class BaseIntegrationTest { protected static ZKServerComponent getZKServerComponent(final Properties topologyProperties) { return new ZKServerComponent() - .withPostStartCallback((zkComponent) -> topologyProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString()) + .withPostStartCallback((zkComponent) -> { + topologyProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString()); + topologyProperties.setProperty("kafka.zk", zkComponent.getConnectionString()); + } ); } } http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java index 779db37..658e149 100644 --- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java +++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java @@ -17,6 +17,7 @@ */ package org.apache.metron.integration.components; +import org.apache.commons.io.FileUtils; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -45,7 +46,9 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Comparator; +import java.util.Map; import java.util.Properties; +import java.util.regex.Pattern; public class FluxTopologyComponent implements InMemoryComponent { @@ -54,12 +57,14 @@ public class FluxTopologyComponent implements InMemoryComponent { LocalCluster stormCluster; String topologyName; File topologyLocation; + File templateLocation; Properties topologyProperties; public static class Builder { String topologyName; File topologyLocation; + File templateLocation; Properties topologyProperties; public Builder withTopologyName(String name) { @@ -72,6 +77,11 @@ public class FluxTopologyComponent implements InMemoryComponent { return this; } + public Builder withTemplateLocation(File location) { + this.templateLocation = location; + return this; + } + public Builder withTopologyProperties(Properties properties) { this.topologyProperties = properties; this.topologyProperties.put("storm.home", "target"); @@ -79,13 +89,14 @@ public class FluxTopologyComponent implements InMemoryComponent { } public FluxTopologyComponent build() { - return new FluxTopologyComponent(topologyName, topologyLocation, topologyProperties); + return new FluxTopologyComponent(topologyName, topologyLocation, templateLocation, topologyProperties); } } - public FluxTopologyComponent(String topologyName, File topologyLocation, Properties topologyProperties) { + public FluxTopologyComponent(String topologyName, File topologyLocation, File templateLocation, Properties topologyProperties) { this.topologyName = topologyName; this.topologyLocation = topologyLocation; + this.templateLocation = templateLocation; this.topologyProperties = topologyProperties; } @@ -101,6 +112,10 @@ public class FluxTopologyComponent implements InMemoryComponent { return topologyLocation; } + public File getTemplateLocation() { + return templateLocation; + } + public Properties getTopologyProperties() { return topologyProperties; } @@ -217,11 +232,11 @@ public class FluxTopologyComponent implements InMemoryComponent { } public void submitTopology() throws NoSuchMethodException, IOException, InstantiationException, TException, IllegalAccessException, InvocationTargetException, ClassNotFoundException, NoSuchFieldException { - startTopology(getTopologyName(), getTopologyLocation(), getTopologyProperties()); + startTopology(getTopologyName(), getTopologyLocation(), getTemplateLocation(), getTopologyProperties()); } - private void startTopology(String topologyName, File topologyLoc, Properties properties) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, TException, NoSuchFieldException{ - TopologyDef topologyDef = loadYaml(topologyName, topologyLoc, properties); + private void startTopology(String topologyName, File topologyLoc, File templateFile, Properties properties) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, TException, NoSuchFieldException{ + TopologyDef topologyDef = loadYaml(topologyName, topologyLoc, templateFile, properties); Config conf = FluxBuilder.buildConfig(topologyDef); ExecutionContext context = new ExecutionContext(topologyDef, conf); StormTopology topology = FluxBuilder.buildTopology(context); @@ -239,12 +254,26 @@ public class FluxTopologyComponent implements InMemoryComponent { } } - private static TopologyDef loadYaml(String topologyName, File yamlFile, Properties properties) throws IOException { + private static TopologyDef loadYaml(String topologyName, File yamlFile, File templateFile, Properties properties) throws IOException { File tmpFile = File.createTempFile(topologyName, "props"); tmpFile.deleteOnExit(); - try (FileWriter propWriter = new FileWriter(tmpFile)){ - properties.store(propWriter, topologyName + " properties"); - return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, tmpFile.getAbsolutePath(), false); + if (templateFile != null) { + try (FileWriter propWriter = new FileWriter(tmpFile)){ + String templateContents = FileUtils.readFileToString(templateFile); + for(Map.Entry prop: properties.entrySet()) { + String replacePattern = String.format("{{%s}}", prop.getKey()); + templateContents = templateContents.replaceAll(Pattern.quote(replacePattern), (String) prop.getValue()); + } + propWriter.write(templateContents); + propWriter.flush(); + return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, tmpFile.getAbsolutePath(), false); + } + } else { + try (FileWriter propWriter = new FileWriter(tmpFile)){ + properties.store(propWriter, topologyName + " properties"); + return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, tmpFile.getAbsolutePath(), false); + } } + } } http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java index 6ec1314..1e5a041 100644 --- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java +++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java @@ -142,7 +142,7 @@ public class KafkaComponent implements InMemoryComponent { @Override public void start() { // setup Zookeeper - zookeeperConnectString = topologyProperties.getProperty("kafka.zk"); + zookeeperConnectString = topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY); zkClient = new ZkClient(zookeeperConnectString, 30000, 30000, ZKStringSerializer$.MODULE$); http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java index cc85d5f..0a47034 100644 --- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java +++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java @@ -28,7 +28,7 @@ import java.util.Optional; import java.util.function.Consumer; public class ZKServerComponent implements InMemoryComponent { - public static final String ZOOKEEPER_PROPERTY = "kafka.zk"; + public static final String ZOOKEEPER_PROPERTY = "zookeeper_quorum"; private TestingServer testZkServer; private String zookeeperUrl = null; private Map<String,String> properties = null; http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java index b556411..171ff47 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java @@ -17,6 +17,7 @@ */ package org.apache.metron.parsers.integration.components; +import org.apache.metron.integration.components.ZKServerComponent; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.generated.KillOptions; @@ -83,7 +84,7 @@ public class ParserTopologyComponent implements InMemoryComponent { @Override public void start() throws UnableToStartException { try { - TopologyBuilder topologyBuilder = ParserTopologyBuilder.build(topologyProperties.getProperty("kafka.zk") + TopologyBuilder topologyBuilder = ParserTopologyBuilder.build(topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY) , Optional.ofNullable(brokerUrl) , sensorType , 1 http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java index e988c30..29c68d0 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java @@ -29,6 +29,8 @@ import java.io.FilenameFilter; import java.io.IOException; import java.util.AbstractMap; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -44,6 +46,7 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.kafka.clients.producer.Producer; import org.apache.metron.common.Constants; +import org.apache.metron.integration.BaseIntegrationTest; import org.apache.metron.integration.ComponentRunner; import org.apache.metron.integration.Processor; import org.apache.metron.integration.ProcessorResult; @@ -67,7 +70,7 @@ import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; -public class PcapTopologyIntegrationTest { +public class PcapTopologyIntegrationTest extends BaseIntegrationTest { final static String KAFKA_TOPIC = "pcap"; private static String BASE_DIR = "pcap"; private static String DATA_DIR = BASE_DIR + "/data_dir"; @@ -185,12 +188,10 @@ public class PcapTopologyIntegrationTest { }}; updatePropertiesCallback.apply(topologyProperties); - final ZKServerComponent zkServerComponent = new ZKServerComponent().withPostStartCallback( - (zkComponent) -> topologyProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString()) - ); - final KafkaComponent kafkaComponent = new KafkaComponent().withTopics(new ArrayList<KafkaComponent.Topic>() {{ - add(new KafkaComponent.Topic(KAFKA_TOPIC, 1)); - }}).withTopologyProperties(topologyProperties); + final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties); + + final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, Collections.singletonList( + new KafkaComponent.Topic(KAFKA_TOPIC, 1))); final MRComponent mr = new MRComponent().withBasePath(baseDir.getAbsolutePath()); http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-solr/src/main/assembly/assembly.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/main/assembly/assembly.xml b/metron-platform/metron-solr/src/main/assembly/assembly.xml index 93eeeb1..51491ae 100644 --- a/metron-platform/metron-solr/src/main/assembly/assembly.xml +++ b/metron-platform/metron-solr/src/main/assembly/assembly.xml @@ -25,6 +25,7 @@ <excludes> <exclude>**/*.formatted</exclude> <exclude>**/*.filtered</exclude> + <exclude>**/*.j2</exclude> </excludes> <fileMode>0644</fileMode> <lineEnding>unix</lineEnding> http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-solr/src/main/config/solr.properties.j2 ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/main/config/solr.properties.j2 b/metron-platform/metron-solr/src/main/config/solr.properties.j2 new file mode 100644 index 0000000..acb0f59 --- /dev/null +++ b/metron-platform/metron-solr/src/main/config/solr.properties.j2 @@ -0,0 +1,49 @@ +{# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#} + +##### Storm ##### +indexing.workers={{indexing_workers}} +indexing.acker.executors={{indexing_acker_executors}} +topology.worker.childopts={{indexing_topology_worker_childopts}} +topology.auto-credentials={{topology_auto_credentials}} +topology.max.spout.pending={{indexing_topology_max_spout_pending}} + +##### Kafka ##### +kafka.zk={{zookeeper_quorum}} +kafka.broker={{kafka_brokers}} +kafka.security.protocol={{kafka_security_protocol}} + +# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST +kafka.start={{indexing_kafka_start}} + +indexing.input.topic={{indexing_input_topic}} +indexing.error.topic={{indexing_error_topic}} + +##### Indexing ##### +indexing.writer.class.name={{indexing_writer_class_name}} + +##### HDFS ##### +bolt.hdfs.rotation.policy={{bolt_hdfs_rotation_policy}} +bolt.hdfs.rotation.policy.units={{bolt_hdfs_rotation_policy_units}} +bolt.hdfs.rotation.policy.count={{bolt_hdfs_rotation_policy_count}} +indexing.hdfs.output={{metron_apps_indexed_hdfs_dir}} + +##### Parallelism ##### +kafka.spout.parallelism={{indexing_kafka_spout_parallelism}} +indexing.writer.parallelism={{indexing_writer_parallelism}} +hdfs.writer.parallelism={{hdfs_writer_parallelism}} http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java index f47e8e8..c6406c8 100644 --- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java +++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java @@ -115,11 +115,16 @@ public class SolrIndexingIntegrationTest extends IndexingIntegrationTest { @Override public void setAdditionalProperties(Properties topologyProperties) { - topologyProperties.setProperty("indexing.writer.class.name", "org.apache.metron.solr.writer.SolrWriter"); + topologyProperties.setProperty("indexing_writer_class_name", "org.apache.metron.solr.writer.SolrWriter"); } @Override public String cleanField(String field) { return field.replaceFirst("_[dfils]$", ""); } + + @Override + public String getTemplatePath() { + return "../metron-solr/src/main/config/solr.properties.j2"; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java index ef4d69d..8e351ff 100644 --- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java +++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java @@ -97,6 +97,7 @@ public class BaseEnrichmentBoltTest extends BaseBoltTest { joinStreamIds.add("stellar:numeric"); joinStreamIds.add("stellar:dst_enrichment"); joinStreamIds.add("stellar:src_enrichment"); + joinStreamIds.add("stellar:error_test"); joinStreamIds.add("host:"); joinStreamIds.add("hbaseEnrichment:"); joinStreamIds.add("message:");
