Repository: metron Updated Branches: refs/heads/master cc7bbc948 -> 87f652215
http://git-wip-us.apache.org/repos/asf/metron/blob/87f65221/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml index a8615fb..e67bc54 100644 --- a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml +++ b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml @@ -18,9 +18,10 @@ name: "indexing" config: topology.workers: ${indexing.workers} - topology.acker.executors: ${indexing.executors} + topology.acker.executors: ${indexing.acker.executors} topology.worker.childopts: ${topology.worker.childopts} topology.auto-credentials: ${topology.auto-credentials} + topology.max.spout.pending: ${topology.max.spout.pending} components: @@ -35,7 +36,7 @@ components: - ".json" - name: "withPath" args: - - "${index.hdfs.output}" + - "${indexing.hdfs.output}" - id: "hdfsRotationPolicy" className: "${bolt.hdfs.rotation.policy}" @@ -66,7 +67,7 @@ components: configMethods: - name: "withTopic" args: - - "${index.error.topic}" + - "${indexing.error.topic}" - name: "withZkQuorum" args: - "${kafka.zk}" @@ -74,7 +75,7 @@ components: args: [ref: "kafkaWriterProps"] - id: "indexWriter" - className: "${writer.class.name}" + className: "${indexing.writer.class.name}" #kafka/zookeeper # Any kafka props for the producer go here. @@ -111,13 +112,12 @@ components: constructorArgs: - ref: "kafkaProps" # topic name - - "${index.input.topic}" + - "${indexing.input.topic}" - "${kafka.zk}" - ref: "fields" configMethods: - name: "setFirstPollOffsetStrategy" args: - # One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST - "${kafka.start}" @@ -126,6 +126,8 @@ spouts: className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout" constructorArgs: - ref: "kafkaConfig" + parallelism: ${kafka.spout.parallelism} + bolts: # Indexing Bolts @@ -140,6 +142,7 @@ bolts: - name: "withMessageGetter" args: - "DEFAULT_JSON_FROM_POSITION" + parallelism: ${indexing.writer.parallelism} - id: "hdfsIndexingBolt" className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" @@ -152,6 +155,7 @@ bolts: - name: "withMessageGetter" args: - "DEFAULT_JSON_FROM_POSITION" + parallelism: ${hdfs.writer.parallelism} - id: "indexingErrorBolt" className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" http://git-wip-us.apache.org/repos/asf/metron/blob/87f65221/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java index 9e20b39..7c97479 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java @@ -117,24 +117,25 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest { @Test public void test() throws Exception { cleanHdfsDir(hdfsDir); - final String dateFormat = "yyyy.MM.dd.HH"; final List<byte[]> inputMessages = TestUtils.readSampleData(sampleParsedPath); final Properties topologyProperties = new Properties() {{ setProperty("kafka.start", "UNCOMMITTED_EARLIEST"); setProperty("kafka.security.protocol", "PLAINTEXT"); setProperty("storm.auto.credentials", "[]"); setProperty("indexing.workers", "1"); - setProperty("indexing.executors", "0"); - setProperty("index.input.topic", Constants.INDEXING_TOPIC); - setProperty("index.error.topic", ERROR_TOPIC); - setProperty("index.date.format", dateFormat); + setProperty("indexing.acker.executors", "0"); + setProperty("topology.max.spout.pending", ""); + setProperty("indexing.input.topic", Constants.INDEXING_TOPIC); + setProperty("indexing.error.topic", ERROR_TOPIC); setProperty("topology.auto-credentials", "[]"); //HDFS settings - setProperty("bolt.hdfs.rotation.policy", TimedRotationPolicy.class.getCanonicalName()); setProperty("bolt.hdfs.rotation.policy.count", "1"); setProperty("bolt.hdfs.rotation.policy.units", "DAYS"); - setProperty("index.hdfs.output", hdfsDir); + setProperty("indexing.hdfs.output", hdfsDir); + setProperty("kafka.spout.parallelism", "1"); + setProperty("indexing.writer.parallelism", "1"); + setProperty("hdfs.writer.parallelism", "1"); }}; setAdditionalProperties(topologyProperties); final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties); http://git-wip-us.apache.org/repos/asf/metron/blob/87f65221/metron-platform/metron-solr/src/main/config/solr.properties ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/main/config/solr.properties b/metron-platform/metron-solr/src/main/config/solr.properties index f0eca6c..7a7a091 100644 --- a/metron-platform/metron-solr/src/main/config/solr.properties +++ b/metron-platform/metron-solr/src/main/config/solr.properties @@ -16,49 +16,32 @@ ##### Storm ##### indexing.workers=1 -indexing.executors=0 +indexing.acker.executors=0 topology.worker.childopts= topology.auto-credentials=[''] +topology.max.spout.pending= ##### Kafka ##### - kafka.zk=node1:2181 kafka.broker=node1:6667 -# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST -kafka.start=UNCOMMITTED_EARLIEST kafka.security.protocol=PLAINTEXT -##### Indexing ##### -index.input.topic=indexing -index.error.topic=indexing -writer.class.name=org.apache.metron.solr.writer.SolrWriter - -##### Metrics ##### - -#reporters -org.apache.metron.metrics.reporter.graphite=true -org.apache.metron.metrics.reporter.console=false -org.apache.metron.metrics.reporter.jmx=false - -#Graphite Addresses - -org.apache.metron.metrics.graphite.address=localhost -org.apache.metron.metrics.graphite.port=2023 +# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST +kafka.start=UNCOMMITTED_EARLIEST -#TelemetryParserBolt -org.apache.metron.metrics.TelemetryParserBolt.acks=true -org.apache.metron.metrics.TelemetryParserBolt.emits=true -org.apache.metron.metrics.TelemetryParserBolt.fails=true +indexing.input.topic=indexing +indexing.error.topic=indexing +##### Indexing ##### +indexing.writer.class.name=org.apache.metron.solr.writer.SolrWriter ##### HDFS ##### - -bolt.hdfs.batch.size=5000 -bolt.hdfs.field.delimiter=| -bolt.hdfs.file.rotation.size.in.mb=5 -bolt.hdfs.file.system.url=hdfs://iot01.cloud.hortonworks.com:8020 -bolt.hdfs.wip.file.path=/paloalto/wip -bolt.hdfs.finished.file.path=/paloalto/rotated -bolt.hdfs.compression.codec.class=org.apache.hadoop.io.compress.SnappyCodec -index.hdfs.output=/tmp/metron/enriched - +bolt.hdfs.rotation.policy=org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy +bolt.hdfs.rotation.policy.units=DAYS +bolt.hdfs.rotation.policy.count=1 +indexing.hdfs.output=/tmp/metron/enriched + +##### Parallelism ##### +kafka.spout.parallelism=1 +indexing.writer.parallelism=1 +hdfs.writer.parallelism=1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/87f65221/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java index c209ef3..f47e8e8 100644 --- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java +++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java @@ -115,7 +115,7 @@ public class SolrIndexingIntegrationTest extends IndexingIntegrationTest { @Override public void setAdditionalProperties(Properties topologyProperties) { - topologyProperties.setProperty("writer.class.name", "org.apache.metron.solr.writer.SolrWriter"); + topologyProperties.setProperty("indexing.writer.class.name", "org.apache.metron.solr.writer.SolrWriter"); } @Override http://git-wip-us.apache.org/repos/asf/metron/blob/87f65221/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 1b5f89f..a1d23e2 100644 --- a/pom.xml +++ b/pom.xml @@ -287,6 +287,7 @@ <exclude>**/*.md</exclude> <exclude>**/VERSION</exclude> <exclude>**/*.json</exclude> + <exclude>**/*.json.j2</exclude> <exclude>**/*.tokens</exclude> <exclude>**/*.log</exclude> <exclude>**/*.template</exclude>
