METRON-793: Migrate to storm-kafka-client kafka spout from storm-kafka

closes apache/incubator-metron#486
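As a quick orientation for the change, here is a minimal sketch of how a topology wires in the new storm-kafka-client based spout. The class names, constants, and the static `create(...)` helper follow the usage visible in the `ParserTopologyBuilder` changes in the patch below; the topic name, group id, and parallelism are placeholder values, so treat this as illustrative rather than as code from the commit.

```
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
import org.apache.metron.storm.kafka.flux.StormKafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;

public class ExampleKafkaSpoutWiring {

  public static TopologyBuilder build(String zkQuorum) {
    // Spout-level and consumer-level options share one map: keys are either
    // SpoutConfiguration options (e.g. spout.firstPollOffsetStrategy) or plain
    // Kafka consumer configs (e.g. group.id).
    Map<String, Object> spoutConfig = new HashMap<>();
    spoutConfig.put(SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key
                   , KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST.toString()
                   );
    spoutConfig.put(KafkaSpoutConfig.Consumer.GROUP_ID, "bro_parser"); // placeholder group id

    // Build the spout from the topic, the zookeeper quorum (used to look up brokers),
    // the tuple fields to emit from each Kafka record, and the config map above.
    StormKafkaSpout<Object, Object> kafkaSpout =
        SimpleStormKafkaBuilder.create("bro", zkQuorum, Arrays.asList("value"), spoutConfig);

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("kafkaSpout", kafkaSpout, 1); // parallelism is a placeholder
    return builder;
  }
}
```

This mirrors what `ParserTopologyBuilder.createKafkaSpout` now does programmatically, while the flux-based topologies (profiler, enrichment, indexing) get the same wiring declaratively in their `remote.yaml` files, as shown in the hunks that follow.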
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/98dc7659 Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/98dc7659 Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/98dc7659 Branch: refs/heads/master Commit: 98dc7659a8ce86f5ced1324b0832f8950c6a65a9 Parents: c13ee82 Author: cstella <ceste...@gmail.com> Authored: Wed Mar 29 08:04:37 2017 -0400 Committer: cstella <ceste...@gmail.com> Committed: Wed Mar 29 08:04:37 2017 -0400 ---------------------------------------------------------------------- metron-analytics/metron-profiler/pom.xml | 5 + .../src/main/config/profiler.properties | 3 +- .../src/main/flux/profiler/remote.yaml | 47 ++-- .../integration/ProfilerIntegrationTest.java | 25 +- .../METRON/CURRENT/configuration/metron-env.xml | 2 +- .../rest/service/impl/KafkaServiceImpl.java | 3 +- metron-platform/metron-common/pom.xml | 10 - .../resolver/ClasspathFunctionResolver.java | 12 +- .../metron/common/spout/kafka/SpoutConfig.java | 59 ----- .../common/spout/kafka/SpoutConfigFunction.java | 24 -- .../common/spout/kafka/SpoutConfigOptions.java | 80 ------- .../apache/metron/common/utils/KafkaUtils.java | 28 ++- .../utils/timestamp/TimestampConverters.java | 10 +- .../common/spout/kafka/SpoutConfigTest.java | 102 -------- .../common/utils/KafkaUtilsEndpointTest.java | 64 +++++ .../src/main/config/elasticsearch.properties | 3 +- metron-platform/metron-enrichment/pom.xml | 5 + .../src/main/flux/enrichment/remote.yaml | 58 +++-- .../src/main/flux/enrichment/test.yaml | 52 +++-- .../src/main/flux/indexing/remote.yaml | 46 ++-- .../integration/IndexingIntegrationTest.java | 3 +- .../components/FluxTopologyComponent.java | 58 ++++- .../integration/components/KafkaComponent.java | 22 +- .../metron/management/KafkaFunctions.java | 14 +- metron-platform/metron-parsers/README.md | 29 ++- .../parsers/topology/ParserTopologyBuilder.java | 36 +-- .../parsers/topology/ParserTopologyCLI.java | 36 ++- .../components/ParserTopologyComponent.java | 45 +++- .../parsers/topology/ParserTopologyCLITest.java | 2 - metron-platform/metron-pcap-backend/pom.xml | 5 - .../src/main/flux/pcap/remote.yaml | 44 ++-- .../metron/spout/pcap/HDFSWriterCallback.java | 111 ++++----- .../metron/spout/pcap/HDFSWriterConfig.java | 41 ++++ .../metron/spout/pcap/KafkaToHDFSSpout.java | 12 +- .../metron/spout/pcap/PartitionHDFSWriter.java | 9 +- .../apache/metron/spout/pcap/SpoutConfig.java | 35 --- .../spout/pcap/deserializer/Deserializers.java | 59 +++++ .../pcap/deserializer/FromKeyDeserializer.java | 70 ++++++ .../deserializer/FromPacketDeserializer.java | 43 ++++ .../pcap/deserializer/KeyValueDeserializer.java | 41 ++++ .../metron/spout/pcap/scheme/FromKeyScheme.java | 72 ------ .../spout/pcap/scheme/FromPacketScheme.java | 60 ----- .../spout/pcap/scheme/KeyConvertible.java | 25 -- .../spout/pcap/scheme/TimestampScheme.java | 46 ---- .../pcap/scheme/TimestampSchemeCreator.java | 26 --- .../PcapTopologyIntegrationTest.java | 6 +- metron-platform/metron-pcap/pom.xml | 12 +- .../java/org/apache/storm/kafka/Callback.java | 26 --- .../apache/storm/kafka/CallbackCollector.java | 186 --------------- .../apache/storm/kafka/CallbackKafkaSpout.java | 92 -------- .../org/apache/storm/kafka/EmitContext.java | 147 ------------ .../metron-solr/src/main/config/solr.properties | 3 +- metron-platform/metron-storm-kafka/pom.xml | 128 ++++++++++ .../kafka/flux/SimpleStormKafkaBuilder.java | 232 +++++++++++++++++++ 
.../storm/kafka/flux/SpoutConfiguration.java | 139 +++++++++++ .../storm/kafka/flux/StormKafkaSpout.java | 63 +++++ .../java/org/apache/storm/kafka/Callback.java | 26 +++ .../apache/storm/kafka/CallbackCollector.java | 187 +++++++++++++++ .../apache/storm/kafka/CallbackKafkaSpout.java | 107 +++++++++ .../org/apache/storm/kafka/EmitContext.java | 95 ++++++++ .../kafka/flux/SpoutConfigurationTest.java | 63 +++++ metron-platform/pom.xml | 1 + pom.xml | 33 ++- 63 files changed, 1881 insertions(+), 1247 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-analytics/metron-profiler/pom.xml ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/pom.xml b/metron-analytics/metron-profiler/pom.xml index b0185b0..474b34c 100644 --- a/metron-analytics/metron-profiler/pom.xml +++ b/metron-analytics/metron-profiler/pom.xml @@ -78,6 +78,11 @@ </dependency> <dependency> <groupId>org.apache.metron</groupId> + <artifactId>metron-storm-kafka</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> <artifactId>metron-statistics</artifactId> <version>${project.parent.version}</version> <exclusions> http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-analytics/metron-profiler/src/main/config/profiler.properties ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/main/config/profiler.properties b/metron-analytics/metron-profiler/src/main/config/profiler.properties index 91e4226..860934f 100644 --- a/metron-analytics/metron-profiler/src/main/config/profiler.properties +++ b/metron-analytics/metron-profiler/src/main/config/profiler.properties @@ -39,4 +39,5 @@ profiler.hbase.flush.interval.seconds=30 kafka.zk=node1:2181 kafka.broker=node1:6667 -kafka.start=WHERE_I_LEFT_OFF +# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST +kafka.start=UNCOMMITTED_EARLIEST http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml index f97b97a..7ea77a5 100644 --- a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml +++ b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml @@ -44,26 +44,45 @@ components: ref: "rowKeyBuilder" - name: "columnBuilder" ref: "columnBuilder" - - - id: "zkHosts" - className: "org.apache.storm.kafka.ZkHosts" - constructorArgs: - - "${kafka.zk}" + # Any kafka props for the producer go here. 
+ - id: "kafkaProps" + className: "java.util.HashMap" + configMethods: + - name: "put" + args: + - "value.deserializer" + - "org.apache.kafka.common.serialization.ByteArrayDeserializer" + - name: "put" + args: + - "key.deserializer" + - "org.apache.kafka.common.serialization.ByteArrayDeserializer" + - name: "put" + args: + - "group.id" + - "profiler" + + # The fields to pull out of the kafka messages + - id: "fields" + className: "java.util.ArrayList" + configMethods: + - name: "add" + args: + - "value" - id: "kafkaConfig" - className: "org.apache.metron.common.spout.kafka.SpoutConfig" + className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder" constructorArgs: # zookeeper hosts - - ref: "zkHosts" + - ref: "kafkaProps" # topic name - "${profiler.input.topic}" - # zk root - - "" - # id - - "indexing" + - "${kafka.zk}" + - ref: "fields" configMethods: - - name: "from" - args: ["${kafka.start}"] + - name: "setFirstPollOffsetStrategy" + args: + - "${kafka.start}" + - id: "kafkaWriter" className: "org.apache.metron.writer.kafka.KafkaWriter" @@ -82,7 +101,7 @@ components: spouts: - id: "kafkaSpout" - className: "org.apache.storm.kafka.KafkaSpout" + className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout" constructorArgs: - ref: "kafkaConfig" http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java index 357908d..bca8ed5 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java @@ -20,6 +20,7 @@ package org.apache.metron.profiler.integration; +import com.google.common.base.Joiner; import org.adrianwalker.multilinestring.Multiline; import org.apache.commons.math.util.MathUtils; import org.apache.hadoop.conf.Configuration; @@ -28,7 +29,6 @@ import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import org.apache.metron.common.Constants; -import org.apache.metron.common.spout.kafka.SpoutConfig; import org.apache.metron.common.utils.SerDeUtils; import org.apache.metron.hbase.TableProvider; import org.apache.metron.integration.BaseIntegrationTest; @@ -39,7 +39,6 @@ import org.apache.metron.integration.components.ZKServerComponent; import org.apache.metron.profiler.hbase.ColumnBuilder; import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder; import org.apache.metron.statistics.OnlineStatisticsProvider; -import org.apache.metron.statistics.StatisticsProvider; import org.apache.metron.test.mock.MockHTable; import org.junit.After; import org.junit.Assert; @@ -176,13 +175,13 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Double.class); // verify - 10.0.0.3 -> 1/6 - Assert.assertTrue(actuals.stream().anyMatch(val -> - MathUtils.equals(val, 1.0/6.0, epsilon) + Assert.assertTrue( "Could not find a value near 1/6. 
Actual values read are are: " + Joiner.on(",").join(actuals) + , actuals.stream().anyMatch(val -> MathUtils.equals(val, 1.0/6.0, epsilon) )); // verify - 10.0.0.2 -> 6/1 - Assert.assertTrue(actuals.stream().anyMatch(val -> - MathUtils.equals(val, 6.0/1.0, epsilon) + Assert.assertTrue("Could not find a value near 6. Actual values read are are: " + Joiner.on(",").join(actuals) + ,actuals.stream().anyMatch(val -> MathUtils.equals(val, 6.0/1.0, epsilon) )); } @@ -206,8 +205,8 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Double.class); // verify - there are 5 'HTTP' messages each with a length of 20, thus the average should be 20 - Assert.assertTrue(actuals.stream().anyMatch(val -> - MathUtils.equals(val, 20.0, epsilon) + Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals) + , actuals.stream().anyMatch(val -> MathUtils.equals(val, 20.0, epsilon) )); } @@ -232,8 +231,8 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { List<OnlineStatisticsProvider> actuals = read(profilerTable.getPutLog(), columnFamily, column, OnlineStatisticsProvider.class); // verify - there are 5 'HTTP' messages each with a length of 20, thus the average should be 20 - Assert.assertTrue(actuals.stream().anyMatch(val -> - MathUtils.equals(val.getMean(), 20.0, epsilon) + Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals) + , actuals.stream().anyMatch(val -> MathUtils.equals(val.getMean(), 20.0, epsilon) )); } @@ -253,8 +252,8 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Double.class); // verify - the 70th percentile of 5 x 20s = 20.0 - Assert.assertTrue(actuals.stream().anyMatch(val -> - MathUtils.equals(val, 20.0, epsilon))); + Assert.assertTrue("Could not find a value near 20. 
Actual values read are are: " + Joiner.on(",").join(actuals) + , actuals.stream().anyMatch(val -> MathUtils.equals(val, 20.0, epsilon))); } /** @@ -290,7 +289,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { // storm topology properties final Properties topologyProperties = new Properties() {{ - setProperty("kafka.start", SpoutConfig.Offset.BEGINNING.name()); + setProperty("kafka.start", "UNCOMMITTED_EARLIEST"); setProperty("profiler.workers", "1"); setProperty("profiler.executors", "0"); setProperty("profiler.input.topic", inputTopic); http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml index 277b636..199e708 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml @@ -177,7 +177,7 @@ indexing.executors=0 ##### Kafka ##### kafka.zk={{ zookeeper_quorum }} kafka.broker={{ kafka_brokers }} -kafka.start=WHERE_I_LEFT_OFF +kafka.start=UNCOMMITTED_EARLIEST ##### Indexing ##### index.input.topic=indexing index.error.topic=indexing http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java index 7b10dc4..33cb2e3 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java @@ -20,6 +20,7 @@ package org.apache.metron.rest.service.impl; import kafka.admin.AdminOperationException; import kafka.admin.AdminUtils$; import kafka.admin.RackAwareMode; +import kafka.admin.RackAwareMode$; import kafka.utils.ZkUtils; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -54,7 +55,7 @@ public class KafkaServiceImpl implements KafkaService { public KafkaTopic createTopic(KafkaTopic topic) throws RestException { if (!listTopics().contains(topic.getName())) { try { - adminUtils.createTopic(zkUtils, topic.getName(), topic.getNumPartitions(), topic.getReplicationFactor(), topic.getProperties(), RackAwareMode.Disabled$.MODULE$); + adminUtils.createTopic(zkUtils, topic.getName(), topic.getNumPartitions(), topic.getReplicationFactor(), topic.getProperties(),RackAwareMode.Disabled$.MODULE$ ); } catch (AdminOperationException e) { throw new RestException(e); } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-common/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/pom.xml 
b/metron-platform/metron-common/pom.xml index bb55a41..7416184 100644 --- a/metron-platform/metron-common/pom.xml +++ b/metron-platform/metron-common/pom.xml @@ -299,16 +299,6 @@ <version>${global_flux_version}</version> </dependency> <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-kafka</artifactId> - <exclusions> - <exclusion> - <artifactId>org.apache.curator</artifactId> - <groupId>curator-client</groupId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-test</artifactId> <version>${global_curator_version}</version> http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/resolver/ClasspathFunctionResolver.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/resolver/ClasspathFunctionResolver.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/resolver/ClasspathFunctionResolver.java index 4589b61..3c9524b 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/resolver/ClasspathFunctionResolver.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/resolver/ClasspathFunctionResolver.java @@ -235,8 +235,16 @@ public class ClasspathFunctionResolver extends BaseFunctionResolver { } FilterBuilder filterBuilder = new FilterBuilder(); - excludes.forEach(excl -> filterBuilder.exclude(excl)); - includes.forEach(incl -> filterBuilder.include(incl)); + excludes.forEach(excl -> { + if(excl != null) { + filterBuilder.exclude(excl); + } + }); + includes.forEach(incl -> { + if(incl != null) { + filterBuilder.include(incl); + } + }); Set<String> classes = new HashSet<>(); Set<Class<? extends StellarFunction>> ret = new HashSet<>(); for(ClassLoader cl : cls) { http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfig.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfig.java deleted file mode 100644 index c26d1f5..0000000 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfig.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.metron.common.spout.kafka; - -import org.apache.storm.kafka.BrokerHosts; - -public class SpoutConfig extends org.apache.storm.kafka.SpoutConfig { - public static enum Offset { - BEGINNING, END, WHERE_I_LEFT_OFF; - } - public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) { - super(hosts, topic, zkRoot, id); - } - - public SpoutConfig from(String offset) { - try { - Offset o = Offset.valueOf(offset); - from(o); - } - catch(IllegalArgumentException iae) { - from(Offset.WHERE_I_LEFT_OFF); - } - ignoreZkOffsets = true; - startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); - return this; - } - - public SpoutConfig from(Offset offset) { - if(offset == Offset.BEGINNING) { - ignoreZkOffsets = true; - startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); - } - else if(offset == Offset.END) { - ignoreZkOffsets = true; - startOffsetTime = kafka.api.OffsetRequest.LatestTime(); - } - else if(offset == Offset.WHERE_I_LEFT_OFF) { - ignoreZkOffsets = false; - startOffsetTime = kafka.api.OffsetRequest.LatestTime(); - } - return this; - } -} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigFunction.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigFunction.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigFunction.java deleted file mode 100644 index 5aec998..0000000 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigFunction.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.common.spout.kafka; - -public interface SpoutConfigFunction { - void configure(org.apache.storm.kafka.SpoutConfig config, Object val); - -} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigOptions.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigOptions.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigOptions.java deleted file mode 100644 index 7d1c7c3..0000000 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigOptions.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.common.spout.kafka; - -import com.google.common.base.Joiner; -import org.apache.metron.common.utils.ConversionUtils; -import org.apache.storm.kafka.SpoutConfig; - -import java.util.EnumMap; -import java.util.Map; - -public enum SpoutConfigOptions implements SpoutConfigFunction { - retryDelayMaxMs( (config, val) -> config.retryDelayMaxMs = convertVal(val, Long.class) ), - retryDelayMultiplier ( (config, val) -> config.retryDelayMultiplier = convertVal(val, Double.class)), - retryInitialDelayMs( (config, val) -> config.retryInitialDelayMs = convertVal(val, Long.class)), - stateUpdateIntervalMs( (config, val) -> config.stateUpdateIntervalMs = convertVal(val, Long.class)), - bufferSizeBytes( (config, val) -> config.bufferSizeBytes = convertVal(val, Integer.class)), - fetchMaxWait( (config, val) -> config.fetchMaxWait = convertVal(val, Integer.class)), - fetchSizeBytes( (config, val) -> config.fetchSizeBytes= convertVal(val, Integer.class)), - maxOffsetBehind( (config, val) -> config.maxOffsetBehind = convertVal(val, Long.class)), - metricsTimeBucketSizeInSecs( (config, val) -> config.metricsTimeBucketSizeInSecs = convertVal(val, Integer.class)), - socketTimeoutMs( (config, val) -> config.socketTimeoutMs = convertVal(val, Integer.class)), - ; - - SpoutConfigFunction spoutConfigFunc; - SpoutConfigOptions(SpoutConfigFunction spoutConfigFunc) { - this.spoutConfigFunc = spoutConfigFunc; - } - - @Override - public void configure(SpoutConfig config, Object val) { - spoutConfigFunc.configure(config, val); - } - - public static SpoutConfig configure(SpoutConfig config, EnumMap<SpoutConfigOptions, Object> configs) { - if(configs != null) { - for(Map.Entry<SpoutConfigOptions, Object> kv : configs.entrySet()) { - kv.getKey().configure(config, kv.getValue()); - } - } - return config; - } - - public static EnumMap<SpoutConfigOptions, Object> coerceMap(Map<String, Object> map) { - EnumMap<SpoutConfigOptions, Object> ret = new EnumMap<>(SpoutConfigOptions.class); - for(Map.Entry<String, Object> kv : map.entrySet()) { - try { - ret.put(SpoutConfigOptions.valueOf(kv.getKey()), kv.getValue()); - } - catch(Exception ex) { - String possibleOptions = Joiner.on(",").join(SpoutConfigOptions.values()); - throw new IllegalArgumentException("Configuration keys for spout config must be one of: " + possibleOptions, ex); - } - } - return ret; - } - private static <EXPECTED_T> EXPECTED_T convertVal(Object val, Class<EXPECTED_T> clazz) { - Object ret = ConversionUtils.convert(val, clazz); - if(ret == null) { - throw new IllegalArgumentException("Unable to convert " + val + " to expected type " + clazz.getCanonicalName()); - } - return clazz.cast(ret); - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java index 6d2af72..04c1389 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java @@ -19,11 +19,15 @@ package org.apache.metron.common.utils; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -47,7 +51,29 @@ public enum KafkaUtils { String brokerInfoStr = new String(data); Map<String, Object> brokerInfo = JSONUtils.INSTANCE.load(brokerInfoStr, new TypeReference<Map<String, Object>>() { }); - ret.add(brokerInfo.get("host") + ":" + brokerInfo.get("port")); + String host = (String) brokerInfo.get("host"); + if(host != null) { + ret.add(host + ":" + brokerInfo.get("port")); + } + else { + Object endpoints = brokerInfo.get("endpoints"); + if(endpoints != null && endpoints instanceof List) { + List<String> eps = (List<String>)endpoints; + for(String url : eps) { + ret.addAll(fromEndpoint(url)); + } + } + } + } + return ret; + } + + public List<String> fromEndpoint(String url) throws URISyntaxException { + List<String> ret = new ArrayList<>(); + if(url != null) { + URI uri = new URI(url); + int port = uri.getPort(); + ret.add(uri.getHost() + ((port > 0)?(":" + port):"")); } return ret; } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/timestamp/TimestampConverters.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/timestamp/TimestampConverters.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/timestamp/TimestampConverters.java index 1c3aa7f..f0e0dd0 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/timestamp/TimestampConverters.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/timestamp/TimestampConverters.java @@ -19,6 +19,8 @@ package org.apache.metron.common.utils.timestamp; +import com.google.common.base.Joiner; + public enum TimestampConverters implements TimestampConverter{ MILLISECONDS(tsMilli -> tsMilli*1000000L) ,MICROSECONDS(tsMicro -> tsMicro*1000L) @@ -29,7 +31,13 @@ public enum TimestampConverters implements TimestampConverter{ } public static TimestampConverter getConverter(String converter) { - return TimestampConverters.valueOf(converter).converter; + if(converter != null) { + return TimestampConverters.valueOf(converter.toUpperCase()).converter; + } + else { + throw new IllegalStateException(converter + " is not a valid timestamp converter: " + + 
Joiner.on(",").join(TimestampConverters.values())); + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-common/src/test/java/org/apache/metron/common/spout/kafka/SpoutConfigTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/spout/kafka/SpoutConfigTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/spout/kafka/SpoutConfigTest.java deleted file mode 100644 index 55c9032..0000000 --- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/spout/kafka/SpoutConfigTest.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.common.spout.kafka; - -import com.fasterxml.jackson.core.type.TypeReference; -import org.adrianwalker.multilinestring.Multiline; -import org.apache.metron.common.utils.JSONUtils; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -public class SpoutConfigTest { - - /** - { - "retryDelayMaxMs" : 1000, - "retryDelayMultiplier" : 1.2, - "retryInitialDelayMs" : 2000, - "stateUpdateIntervalMs" : 3000, - "bufferSizeBytes" : 4000, - "fetchMaxWait" : 5000, - "fetchSizeBytes" : 6000, - "maxOffsetBehind" : 7000, - "metricsTimeBucketSizeInSecs" : 8000, - "socketTimeoutMs" : 9000 - } - */ - @Multiline - public static String config; - - @Test - public void testConfigApplication() throws IOException { - SpoutConfig spoutConfig = new SpoutConfig(null, null, null, null); - Map<String, Object> configMap = JSONUtils.INSTANCE.load(config, new TypeReference<Map<String, Object>>() { - }); - SpoutConfigOptions.configure(spoutConfig, SpoutConfigOptions.coerceMap(configMap)); - Assert.assertEquals(1000, spoutConfig.retryDelayMaxMs); - Assert.assertEquals(1.2, spoutConfig.retryDelayMultiplier, 1e-7); - Assert.assertEquals(2000, spoutConfig.retryInitialDelayMs); - Assert.assertEquals(3000, spoutConfig.stateUpdateIntervalMs); - Assert.assertEquals(4000, spoutConfig.bufferSizeBytes); - Assert.assertEquals(5000, spoutConfig.fetchMaxWait); - Assert.assertEquals(6000, spoutConfig.fetchSizeBytes); - Assert.assertEquals(7000, spoutConfig.maxOffsetBehind); - Assert.assertEquals(8000, spoutConfig.metricsTimeBucketSizeInSecs); - Assert.assertEquals(9000, spoutConfig.socketTimeoutMs); - } - /** - { - "retryDelayMaxMs" : 1000, - "retryDelayMultiplier" : 1.2, - "retryInitialDelayMs" : 2000, - "stateUpdateIntervalMs" : 3000, - "bufferSizeBytes" : 4000 - } - */ - @Multiline - public static String incompleteConfig; - @Test - public void testIncompleteConfigApplication() throws IOException { - SpoutConfig spoutConfig 
= new SpoutConfig(null, null, null, null); - Map<String, Object> configMap = JSONUtils.INSTANCE.load(incompleteConfig, new TypeReference<Map<String, Object>>() { - }); - SpoutConfigOptions.configure(spoutConfig, SpoutConfigOptions.coerceMap(configMap)); - Assert.assertEquals(1000, spoutConfig.retryDelayMaxMs); - Assert.assertEquals(1.2, spoutConfig.retryDelayMultiplier, 1e-7); - Assert.assertEquals(2000, spoutConfig.retryInitialDelayMs); - Assert.assertEquals(3000, spoutConfig.stateUpdateIntervalMs); - Assert.assertEquals(4000, spoutConfig.bufferSizeBytes); - } - - @Test - public void testEmptyConfigApplication() throws IOException { - SpoutConfig spoutConfig = new SpoutConfig(null, null, null, null); - SpoutConfigOptions.configure(spoutConfig, SpoutConfigOptions.coerceMap(new HashMap<>())); - //ensure defaults are used - Assert.assertEquals(60*1000, spoutConfig.retryDelayMaxMs); - Assert.assertEquals(1.0, spoutConfig.retryDelayMultiplier, 1e-7); - Assert.assertEquals(0, spoutConfig.retryInitialDelayMs); - Assert.assertEquals(2000, spoutConfig.stateUpdateIntervalMs); - Assert.assertEquals(1024*1024, spoutConfig.bufferSizeBytes); - } -} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsEndpointTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsEndpointTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsEndpointTest.java new file mode 100644 index 0000000..14a9e41 --- /dev/null +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsEndpointTest.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.metron.common.utils; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +@RunWith(Parameterized.class) +public class KafkaUtilsEndpointTest { + static String[] hostnames = new String[] { "node1", "localhost", "192.168.0.1", "my.domain.com" }; + static String[] schemes = new String[] { "SSL", "PLAINTEXTSASL", "PLAINTEXT"}; + static String[] ports = new String[] { "6667", "9091", null}; + private String endpoint; + private String expected; + + public KafkaUtilsEndpointTest(String endpoint, String expected) { + this.endpoint = endpoint; + this.expected = expected; + } + + @Parameterized.Parameters + public static Collection<Object[]> data() { + List<Object[]> ret = new ArrayList<>(); + for(String scheme : schemes) { + for(String hostname : hostnames) { + for(String port : ports) { + port = port != null?(":" + port):""; + String expected = hostname + port; + ret.add(new Object[]{scheme + "://" + expected, expected }); + } + } + } + return ret; + } + + @Test + public void testEndpointParsing() throws URISyntaxException { + Assert.assertEquals(expected, KafkaUtils.INSTANCE.fromEndpoint(endpoint).get(0)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties index 27e9173..317742b 100644 --- a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties +++ b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties @@ -22,7 +22,8 @@ indexing.executors=0 kafka.zk=node1:2181 kafka.broker=node1:6667 -kafka.start=WHERE_I_LEFT_OFF +# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST +kafka.start=UNCOMMITTED_EARLIEST ##### Indexing ##### index.input.topic=indexing http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-enrichment/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/pom.xml b/metron-platform/metron-enrichment/pom.xml index 1ff46a9..045676f 100644 --- a/metron-platform/metron-enrichment/pom.xml +++ b/metron-platform/metron-enrichment/pom.xml @@ -38,6 +38,11 @@ </dependency> <dependency> <groupId>org.apache.metron</groupId> + <artifactId>metron-storm-kafka</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> <artifactId>metron-writer</artifactId> <version>${project.parent.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml index 054710f..439105f 100644 --- a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml +++ b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml @@ -176,32 +176,52 @@ components: - 
"${kafka.zk}" #kafka/zookeeper - - id: "zkHosts" - className: "org.apache.storm.kafka.ZkHosts" - constructorArgs: - - "${kafka.zk}" + # Any kafka props for the producer go here. + - id: "kafkaProps" + className: "java.util.HashMap" + configMethods: + - name: "put" + args: + - "value.deserializer" + - "org.apache.kafka.common.serialization.ByteArrayDeserializer" + - name: "put" + args: + - "key.deserializer" + - "org.apache.kafka.common.serialization.ByteArrayDeserializer" + - name: "put" + args: + - "group.id" + - "enrichments" + + # The fields to pull out of the kafka messages + - id: "fields" + className: "java.util.ArrayList" + configMethods: + - name: "add" + args: + - "value" + - id: "kafkaConfig" - className: "org.apache.storm.kafka.SpoutConfig" + className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder" constructorArgs: - # zookeeper hosts - - ref: "zkHosts" - # topic name - - "enrichments" - # zk root - - "" - # id - - "enrichments" - properties: - - name: "ignoreZkOffsets" - value: true - - name: "startOffsetTime" - value: -1 + - ref: "kafkaProps" + # topic name + - "enrichments" + - "${kafka.zk}" + - ref: "fields" + configMethods: + - name: "setFirstPollOffsetStrategy" + args: + # One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST + - "UNCOMMITTED_EARLIEST" + spouts: - id: "kafkaSpout" - className: "org.apache.storm.kafka.KafkaSpout" + className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout" constructorArgs: - ref: "kafkaConfig" + bolts: # Enrichment Bolts - id: "enrichmentSplitBolt" http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-enrichment/src/main/flux/enrichment/test.yaml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/flux/enrichment/test.yaml b/metron-platform/metron-enrichment/src/main/flux/enrichment/test.yaml index 077fedb..b7fb8d4 100644 --- a/metron-platform/metron-enrichment/src/main/flux/enrichment/test.yaml +++ b/metron-platform/metron-enrichment/src/main/flux/enrichment/test.yaml @@ -175,32 +175,52 @@ components: - "${kafka.zk}" #kafka/zookeeper - - id: "zkHosts" - className: "org.apache.storm.kafka.ZkHosts" - constructorArgs: - - "${kafka.zk}" +# Any kafka props for the producer go here. 
+ - id: "kafkaProps" + className: "java.util.HashMap" + configMethods: + - name: "put" + args: + - "value.deserializer" + - "org.apache.kafka.common.serialization.ByteArrayDeserializer" + - name: "put" + args: + - "key.deserializer" + - "org.apache.kafka.common.serialization.ByteArrayDeserializer" + - name: "put" + args: + - "group.id" + - "enrichments" + + # The fields to pull out of the kafka messages + - id: "fields" + className: "java.util.ArrayList" + configMethods: + - name: "add" + args: + - "value" + - id: "kafkaConfig" - className: "org.apache.storm.kafka.SpoutConfig" + className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder" constructorArgs: # zookeeper hosts - - ref: "zkHosts" + - ref: "kafkaProps" # topic name - "enrichments" - # zk root - - "" - # id - - "enrichments" - properties: - - name: "ignoreZkOffsets" - value: true - - name: "startOffsetTime" - value: -2 + - "${kafka.zk}" + - ref: "fields" + configMethods: + - name: "setFirstPollOffsetStrategy" + args: + - "UNCOMMITTED_EARLIEST" + spouts: - id: "kafkaSpout" - className: "org.apache.storm.kafka.KafkaSpout" + className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout" constructorArgs: - ref: "kafkaConfig" + bolts: # Enrichment Bolts - id: "enrichmentSplitBolt" http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml index 3e329f4..3905a7a 100644 --- a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml +++ b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml @@ -63,29 +63,49 @@ components: className: "${writer.class.name}" #kafka/zookeeper - - id: "zkHosts" - className: "org.apache.storm.kafka.ZkHosts" - constructorArgs: - - "${kafka.zk}" +# Any kafka props for the producer go here. 
+ - id: "kafkaProps" + className: "java.util.HashMap" + configMethods: + - name: "put" + args: + - "value.deserializer" + - "org.apache.kafka.common.serialization.ByteArrayDeserializer" + - name: "put" + args: + - "key.deserializer" + - "org.apache.kafka.common.serialization.ByteArrayDeserializer" + - name: "put" + args: + - "group.id" + - "indexing" + +# The fields to pull out of the kafka messages + - id: "fields" + className: "java.util.ArrayList" + configMethods: + - name: "add" + args: + - "value" + - id: "kafkaConfig" - className: "org.apache.metron.common.spout.kafka.SpoutConfig" + className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder" constructorArgs: - # zookeeper hosts - - ref: "zkHosts" + - ref: "kafkaProps" # topic name - "${index.input.topic}" - # zk root - - "" - # id - - "indexing" + - "${kafka.zk}" + - ref: "fields" configMethods: - - name: "from" + - name: "setFirstPollOffsetStrategy" args: + # One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST - "${kafka.start}" + spouts: - id: "kafkaSpout" - className: "org.apache.storm.kafka.KafkaSpout" + className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout" constructorArgs: - ref: "kafkaConfig" bolts: http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java index 60cd1d1..394fbf0 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java @@ -26,7 +26,6 @@ import org.apache.metron.TestConstants; import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.ConfigurationsUtils; import org.apache.metron.common.interfaces.FieldNameConverter; -import org.apache.metron.common.spout.kafka.SpoutConfig; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.enrichment.integration.components.ConfigUploadComponent; import org.apache.metron.integration.BaseIntegrationTest; @@ -121,7 +120,7 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest { final String dateFormat = "yyyy.MM.dd.HH"; final List<byte[]> inputMessages = TestUtils.readSampleData(sampleParsedPath); final Properties topologyProperties = new Properties() {{ - setProperty("kafka.start", SpoutConfig.Offset.BEGINNING.name()); + setProperty("kafka.start", "UNCOMMITTED_EARLIEST"); setProperty("indexing.workers", "1"); setProperty("indexing.executors", "0"); setProperty("index.input.topic", Constants.INDEXING_TOPIC); http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java index 1c1f907..d34ff08 100644 --- 
a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java +++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java @@ -42,6 +42,10 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.lang.reflect.InvocationTargetException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Comparator; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -72,6 +76,7 @@ public class FluxTopologyComponent implements InMemoryComponent { public Builder withTopologyProperties(Properties properties) { this.topologyProperties = properties; + this.topologyProperties.put("storm.home", "target"); return this; } @@ -130,13 +135,64 @@ public class FluxTopologyComponent implements InMemoryComponent { } } + public static void cleanupWorkerDir() { + if(new File("logs/workers-artifacts").exists()) { + Path rootPath = Paths.get("logs"); + Path destPath = Paths.get("target/logs"); + try { + Files.move(rootPath, destPath); + Files.walk(destPath) + .sorted(Comparator.reverseOrder()) + .map(Path::toFile) + .forEach(File::delete); + } catch (IOException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + } + @Override public void stop() { if (stormCluster != null) { - stormCluster.shutdown(); + try { + try { + stormCluster.shutdown(); + } catch (IllegalStateException ise) { + if (!(ise.getMessage().contains("It took over") && ise.getMessage().contains("to shut down slot"))) { + throw ise; + } + else { + assassinateSlots(); + LOG.error("Storm slots didn't shut down entirely cleanly *sigh*. " + + "I gave them the old one-two-skadoo and killed the slots with prejudice. " + + "If tests fail, we'll have to find a better way of killing them.", ise); + } + } + } + catch(Throwable t) { + LOG.error(t.getMessage(), t); + } + finally { + cleanupWorkerDir(); + } } } + public static void assassinateSlots() { + /* + You might be wondering why I'm not just casting to slot here, but that's because the Slot class moved locations + and we're supporting multiple versions of storm. + */ + Thread.getAllStackTraces().keySet().stream().filter(t -> t instanceof AutoCloseable && t.getName().toLowerCase().contains("slot")).forEach(t -> { + AutoCloseable slot = (AutoCloseable) t; + try { + slot.close(); + } catch (Exception e) { + LOG.error("Tried to kill " + t.getName() + " but.." 
+ e.getMessage(), e); + } + }); + } + public void submitTopology() throws NoSuchMethodException, IOException, InstantiationException, TException, IllegalAccessException, InvocationTargetException, ClassNotFoundException, NoSuchFieldException { startTopology(getTopologyName(), getTopologyLocation(), getTopologyProperties()); } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java index f6117dc..e55b317 100644 --- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java +++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java @@ -39,6 +39,8 @@ import org.apache.metron.integration.InMemoryComponent; import org.apache.metron.integration.wrapper.AdminUtilsWrapper; import org.apache.metron.integration.wrapper.TestUtilsWrapper; import org.apache.metron.test.utils.UnitTestHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.util.*; @@ -47,6 +49,8 @@ import java.util.logging.Level; public class KafkaComponent implements InMemoryComponent { + protected static final Logger LOG = LoggerFactory.getLogger(KafkaComponent.class); + public static class Topic { public int numPartitions; public String name; @@ -56,6 +60,8 @@ public class KafkaComponent implements InMemoryComponent { this.name = name; } } + + private List<KafkaProducer> producersCreated = new ArrayList<>(); private transient KafkaServer kafkaServer; private transient ZkClient zkClient; private transient ConsumerConnector consumer; @@ -128,7 +134,9 @@ public class KafkaComponent implements InMemoryComponent { producerConfig.put("message.max.bytes", "" + 1024*1024*10); producerConfig.put("message.send.max.retries", "10"); producerConfig.putAll(properties); - return new KafkaProducer<>(producerConfig); + KafkaProducer<K, V> ret = new KafkaProducer<>(producerConfig); + producersCreated.add(ret); + return ret; } @Override @@ -170,6 +178,7 @@ public class KafkaComponent implements InMemoryComponent { @Override public void stop() { shutdownConsumer(); + shutdownProducers(); if(kafkaServer != null) { kafkaServer.shutdown(); kafkaServer.awaitShutdown(); @@ -219,6 +228,17 @@ public class KafkaComponent implements InMemoryComponent { } } + public void shutdownProducers() { + for(KafkaProducer kp : producersCreated) { + try { + kp.close(); + } + catch(Exception ex) { + LOG.error(ex.getMessage(), ex); + } + } + } + public void createTopic(String name) throws InterruptedException { createTopic(name, 1, true); } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java index 046bcff..c5428d2 100644 --- 
a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java +++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java @@ -18,9 +18,13 @@ package org.apache.metron.management; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.metron.common.dsl.Context; @@ -29,14 +33,7 @@ import org.apache.metron.common.dsl.Stellar; import org.apache.metron.common.dsl.StellarFunction; import org.apache.metron.common.utils.ConversionUtils; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ExecutionException; import static org.apache.metron.common.dsl.Context.Capabilities.GLOBAL_CONFIG; @@ -128,6 +125,7 @@ public class KafkaFunctions { // build the properties for kafka Properties properties = buildKafkaProperties(overrides, context); + properties.put("max.poll.records", count); // read some messages try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) { http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-parsers/README.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md index 2cf9bbf..cc45834 100644 --- a/metron-platform/metron-parsers/README.md +++ b/metron-platform/metron-parsers/README.md @@ -296,24 +296,21 @@ usage: start_parser_topology.sh These options are intended to configure the Storm Kafka Spout more completely. These options can be specified in a JSON file containing a map associating the kafka spout configuration parameter to a value. The range of values possible to configure are: -* retryDelayMaxMs -* retryDelayMultiplier -* retryInitialDelayMs -* stateUpdateIntervalMs -* bufferSizeBytes -* fetchMaxWait -* fetchSizeBytes -* maxOffsetBehind -* metricsTimeBucketSizeInSecs -* socketTimeoutMs - -These are described in some detail [here](https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.4/bk_storm-user-guide/content/storm-kafka-api-ref.html). - -For instance, creating a JSON file which will set the `bufferSizeBytes` to 2MB and `retryDelayMaxMs` to 2000 would look like +* `spout.pollTimeoutMs` - Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 2s +* `spout.firstPollOffsetStrategy` - Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start. One of + * `EARLIEST` + * `LATEST` + * `UNCOMMITTED_EARLIEST` - Last uncommitted and if offsets aren't found, defaults to earliest. NOTE: This is the default. + * `UNCOMMITTED_LATEST` - Last uncommitted and if offsets aren't found, defaults to latest. +* `spout.offsetCommitPeriodMs` - Specifies the period, in milliseconds, the offset commit task is periodically called. Default is 15s. 
+* `spout.maxUncommittedOffsets` - Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place. Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number of pending offsets bellow the threshold. The default is 10,000,000. +* `spout.maxRetries` - Defines the max number of retrials in case of tuple failure. The default is to retry forever, which means that no new records are committed until the previous polled records have been acked. This guarantees at once delivery of all the previously polled records. By specifying a finite value for maxRetries, the user decides to sacrifice guarantee of delivery for the previous polled records in favor of processing more records. +* Any of the configs in the Consumer API for [Kafka 0.10.x](http://kafka.apache.org/0100/documentation.html#newconsumerconfigs) + +For instance, creating a JSON file which will set the offsets to `UNCOMMITTED_EARLIEST` ``` { - "bufferSizeBytes" : 2000000, - "retryDelayMaxMs" : 2000 + "spout.firstPollOffsetStrategy" : "UNCOMMITTED_EARLIEST" } ``` http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java index aeac33c..b347ca5 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java @@ -17,6 +17,11 @@ */ package org.apache.metron.parsers.topology; +import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder; +import org.apache.metron.storm.kafka.flux.SpoutConfiguration; +import org.apache.metron.storm.kafka.flux.StormKafkaSpout; +import org.apache.storm.kafka.spout.KafkaSpout; +import org.apache.storm.kafka.spout.KafkaSpoutConfig; import org.apache.storm.topology.TopologyBuilder; import org.apache.curator.framework.CuratorFramework; import org.apache.metron.common.Constants; @@ -26,8 +31,6 @@ import org.apache.metron.common.configuration.SensorParserConfig; import org.apache.metron.common.configuration.writer.ParserWriterConfiguration; import org.apache.metron.common.writer.BulkMessageWriter; import org.apache.metron.common.writer.MessageWriter; -import org.apache.metron.common.spout.kafka.SpoutConfig; -import org.apache.metron.common.spout.kafka.SpoutConfigOptions; import org.apache.metron.common.utils.ReflectionUtils; import org.apache.metron.parsers.bolt.ParserBolt; import org.apache.metron.parsers.bolt.WriterBolt; @@ -36,10 +39,8 @@ import org.apache.metron.parsers.interfaces.MessageParser; import org.apache.metron.writer.AbstractWriter; import org.apache.metron.writer.kafka.KafkaWriter; import org.json.simple.JSONObject; -import org.apache.storm.kafka.KafkaSpout; -import org.apache.storm.kafka.ZkHosts; -import java.util.EnumMap; +import java.util.*; /** * Builds a Storm topology that parses telemetry data received from a sensor. 
@@ -52,7 +53,6 @@ public class ParserTopologyBuilder { * @param zookeeperUrl Zookeeper URL * @param brokerUrl Kafka Broker URL * @param sensorType Type of sensor - * @param offset Kafka topic offset where the topology will start; BEGINNING, END, WHERE_I_LEFT_OFF * @param spoutParallelism Parallelism hint for the spout * @param spoutNumTasks Number of tasks for the spout * @param parserParallelism Parallelism hint for the parser bolt @@ -66,14 +66,13 @@ public class ParserTopologyBuilder { public static TopologyBuilder build(String zookeeperUrl, String brokerUrl, String sensorType, - SpoutConfig.Offset offset, int spoutParallelism, int spoutNumTasks, int parserParallelism, int parserNumTasks, int errorWriterParallelism, int errorWriterNumTasks, - EnumMap<SpoutConfigOptions, Object> kafkaSpoutConfig + Map<String, Object> kafkaSpoutConfig ) throws Exception { // fetch configuration from zookeeper @@ -82,7 +81,7 @@ public class ParserTopologyBuilder { // create the spout TopologyBuilder builder = new TopologyBuilder(); - KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, sensorType, offset, kafkaSpoutConfig, parserConfig); + KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, sensorType, Optional.ofNullable(kafkaSpoutConfig) , parserConfig); builder.setSpout("kafkaSpout", kafkaSpout, spoutParallelism) .setNumTasks(spoutNumTasks); @@ -106,19 +105,22 @@ public class ParserTopologyBuilder { /** * Create a spout that consumes tuples from a Kafka topic. * - * @param zookeeperUrl Zookeeper URL + * @param zkQuorum Zookeeper URL * @param sensorType Type of sensor - * @param offset Kafka topic offset where the topology will start; BEGINNING, END, WHERE_I_LEFT_OFF - * @param kafkaSpoutConfigOptions Configuration options for the kafka spout + * @param kafkaConfigOptional Configuration options for the kafka spout * @param parserConfig Configuration for the parser * @return */ - private static KafkaSpout createKafkaSpout(String zookeeperUrl, String sensorType, SpoutConfig.Offset offset, EnumMap<SpoutConfigOptions, Object> kafkaSpoutConfigOptions, SensorParserConfig parserConfig) { - + private static StormKafkaSpout<Object, Object> createKafkaSpout(String zkQuorum, String sensorType, Optional<Map<String, Object>> kafkaConfigOptional, SensorParserConfig parserConfig) { + Map<String, Object> kafkaSpoutConfigOptions = kafkaConfigOptional.orElse(new HashMap<>()); String inputTopic = parserConfig.getSensorTopic() != null ? 
parserConfig.getSensorTopic() : sensorType; - SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zookeeperUrl), inputTopic, "", inputTopic).from(offset); - SpoutConfigOptions.configure(spoutConfig, kafkaSpoutConfigOptions); - return new KafkaSpout(spoutConfig); + kafkaSpoutConfigOptions.putIfAbsent( SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key + , KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST.toString() + ); + kafkaSpoutConfigOptions.putIfAbsent( KafkaSpoutConfig.Consumer.GROUP_ID + , inputTopic + "_parser" + ); + return SimpleStormKafkaBuilder.create(inputTopic, zkQuorum, Arrays.asList("value"), kafkaSpoutConfigOptions); } /** http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java index 2bf484e..8cf921e 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java @@ -17,6 +17,7 @@ */ package org.apache.metron.parsers.topology; +import org.apache.metron.storm.kafka.flux.SpoutConfiguration; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; @@ -26,19 +27,15 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Joiner; import org.apache.commons.cli.*; import org.apache.commons.io.FileUtils; -import org.apache.metron.common.spout.kafka.SpoutConfig; -import org.apache.metron.common.spout.kafka.SpoutConfigOptions; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.parsers.topology.config.Arg; import org.apache.metron.parsers.topology.config.ConfigHandlers; import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; import java.util.EnumMap; import java.util.HashMap; import java.util.Map; -import java.util.Properties; import java.util.function.Function; public class ParserTopologyCLI { @@ -156,7 +153,11 @@ public class ParserTopologyCLI { }, new ConfigHandlers.SetMessageTimeoutHandler() ) ,EXTRA_OPTIONS("e", code -> { - Option o = new Option(code, "extra_topology_options", true, "Extra options in the form of a JSON file with a map for content."); + Option o = new Option(code, "extra_topology_options", true + , "Extra options in the form of a JSON file with a map for content." + + " Available options are those in the Kafka Consumer Configs at http://kafka.apache.org/0100/documentation.html#newconsumerconfigs" + + " and " + Joiner.on(",").join(SpoutConfiguration.allOptions()) + ); o.setArgName("JSON_FILE"); o.setRequired(false); o.setType(String.class); @@ -167,8 +168,8 @@ public class ParserTopologyCLI { Option o = new Option(code , "extra_kafka_spout_config" , true - , "Extra spout config options in the form of a JSON file with a map for content. " + - "Possible keys are: " + Joiner.on(",").join(SpoutConfigOptions.values())); + , "Extra spout config options in the form of a JSON file with a map for content." 
+ ); o.setArgName("JSON_FILE"); o.setRequired(false); o.setType(String.class); @@ -182,13 +183,6 @@ public class ParserTopologyCLI { o.setRequired(false); return o; }) - ,KAFKA_OFFSET("koff", code -> - { - Option o = new Option("koff", "kafka_offset", true, "Kafka offset"); - o.setArgName("BEGINNING|WHERE_I_LEFT_OFF"); - o.setRequired(false); - return o; - }) ; Option option; String shortCode; @@ -286,18 +280,14 @@ public class ParserTopologyCLI { int errorNumTasks= Integer.parseInt(ParserOptions.ERROR_WRITER_NUM_TASKS.get(cmd, "1")); int invalidParallelism = Integer.parseInt(ParserOptions.INVALID_WRITER_PARALLELISM.get(cmd, "1")); int invalidNumTasks= Integer.parseInt(ParserOptions.INVALID_WRITER_NUM_TASKS.get(cmd, "1")); - EnumMap<SpoutConfigOptions, Object> spoutConfig = new EnumMap<SpoutConfigOptions, Object>(SpoutConfigOptions.class); + Map<String, Object> spoutConfig = new HashMap<>(); if(ParserOptions.SPOUT_CONFIG.has(cmd)) { spoutConfig = readSpoutConfig(new File(ParserOptions.SPOUT_CONFIG.get(cmd))); } - SpoutConfig.Offset offset = cmd.hasOption("t") ? SpoutConfig.Offset.BEGINNING : SpoutConfig.Offset.WHERE_I_LEFT_OFF; - if(cmd.hasOption("koff")) { - offset = SpoutConfig.Offset.valueOf(cmd.getOptionValue("koff")); - } + TopologyBuilder builder = ParserTopologyBuilder.build(zookeeperUrl, brokerUrl, sensorType, - offset, spoutParallelism, spoutNumTasks, parserParallelism, @@ -322,7 +312,7 @@ public class ParserTopologyCLI { System.exit(-1); } } - private static EnumMap<SpoutConfigOptions, Object> readSpoutConfig(File inputFile) { + private static Map<String, Object> readSpoutConfig(File inputFile) { String json = null; if (inputFile.exists()) { try { @@ -335,8 +325,8 @@ public class ParserTopologyCLI { throw new IllegalArgumentException("Unable to load JSON file at " + inputFile.getAbsolutePath()); } try { - return SpoutConfigOptions.coerceMap(JSONUtils.INSTANCE.load(json, new TypeReference<Map<String, Object>>() { - })); + return JSONUtils.INSTANCE.load(json, new TypeReference<Map<String, Object>>() { + }); } catch (IOException e) { throw new IllegalStateException("Unable to process JSON.", e); } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java index 73d3827..48bcbec 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java @@ -17,24 +17,36 @@ */ package org.apache.metron.parsers.integration.components; +import com.google.common.collect.ImmutableMap; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.topology.TopologyBuilder; -import org.apache.metron.common.spout.kafka.SpoutConfig; import org.apache.metron.integration.InMemoryComponent; import org.apache.metron.integration.UnableToStartException; import org.apache.metron.parsers.topology.ParserTopologyBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; 
+import java.nio.file.FileVisitOption; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; import java.util.Properties; +import static org.apache.metron.integration.components.FluxTopologyComponent.assassinateSlots; +import static org.apache.metron.integration.components.FluxTopologyComponent.cleanupWorkerDir; + public class ParserTopologyComponent implements InMemoryComponent { + protected static final Logger LOG = LoggerFactory.getLogger(ParserTopologyComponent.class); private Properties topologyProperties; private String brokerUrl; private String sensorType; - private SpoutConfig.Offset offset = SpoutConfig.Offset.BEGINNING; private LocalCluster stormCluster; public static class Builder { @@ -65,9 +77,6 @@ public class ParserTopologyComponent implements InMemoryComponent { this.sensorType = sensorType; } - public void setOffset(SpoutConfig.Offset offset) { - this.offset = offset; - } @Override public void start() throws UnableToStartException { @@ -75,7 +84,6 @@ public class ParserTopologyComponent implements InMemoryComponent { TopologyBuilder topologyBuilder = ParserTopologyBuilder.build(topologyProperties.getProperty("kafka.zk") , brokerUrl , sensorType - , offset , 1 , 1 , 1 @@ -95,8 +103,29 @@ public class ParserTopologyComponent implements InMemoryComponent { @Override public void stop() { - if(stormCluster != null) { - stormCluster.shutdown(); + if (stormCluster != null) { + try { + try { + stormCluster.shutdown(); + } catch (IllegalStateException ise) { + if (!(ise.getMessage().contains("It took over") && ise.getMessage().contains("to shut down slot"))) { + throw ise; + } + else { + assassinateSlots(); + LOG.error("Storm slots didn't shut down entirely cleanly *sigh*. " + + "I gave them the old one-two-skadoo and killed the slots with prejudice. 
" + + "If tests fail, we'll have to find a better way of killing them.", ise); + } + } + } + catch(Throwable t) { + LOG.error(t.getMessage(), t); + } + finally { + cleanupWorkerDir(); + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java index bb3cab6..5e70177 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java @@ -85,12 +85,10 @@ public class ParserTopologyCLITest { CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker") .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk") .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor") - .with(ParserTopologyCLI.ParserOptions.KAFKA_OFFSET, "BEGINNING") .build(longOpt); Assert.assertEquals("myzk", ParserTopologyCLI.ParserOptions.ZK_QUORUM.get(cli)); Assert.assertEquals("mybroker", ParserTopologyCLI.ParserOptions.BROKER_URL.get(cli)); Assert.assertEquals("mysensor", ParserTopologyCLI.ParserOptions.SENSOR_TYPE.get(cli)); - Assert.assertEquals("BEGINNING", ParserTopologyCLI.ParserOptions.KAFKA_OFFSET.get(cli)); } @Test public void testCLI_happyPath() throws ParseException { http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/pom.xml b/metron-platform/metron-pcap-backend/pom.xml index 95dbdb2..704493e 100644 --- a/metron-platform/metron-pcap-backend/pom.xml +++ b/metron-platform/metron-pcap-backend/pom.xml @@ -44,11 +44,6 @@ <version>${global_flux_version}</version> </dependency> <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-kafka</artifactId> - <version>${global_storm_version}</version> - </dependency> - <dependency> <groupId>org.apache.metron</groupId> <artifactId>metron-common</artifactId> <version>${project.parent.version}</version> http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml b/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml index 272ac13..732991b 100644 --- a/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml +++ b/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml @@ -19,29 +19,35 @@ config: topology.workers: 1 components: - - id: "zkHosts" - className: "org.apache.storm.kafka.ZkHosts" - constructorArgs: - - "${kafka.zk}" + + # Any kafka props for the producer go here. 
+ - id: "kafkaProps" + className: "java.util.HashMap" + configMethods: + - name: "put" + args: + - "value.deserializer" + - "org.apache.kafka.common.serialization.ByteArrayDeserializer" + - name: "put" + args: + - "key.deserializer" + - "org.apache.kafka.common.serialization.ByteArrayDeserializer" + - name: "put" + args: + - "group.id" + - "pcap" - id: "kafkaConfig" - className: "org.apache.metron.spout.pcap.SpoutConfig" + className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder" constructorArgs: - # zookeeper hosts - - ref: "zkHosts" + - ref: "kafkaProps" # topic name - "${spout.kafka.topic.pcap}" - # zk root - - "" - # id - - "${spout.kafka.topic.pcap}" + - "${kafka.zk}" configMethods: - - name: "from" - args: - - "${kafka.pcap.start}" - - name: "withTimestampScheme" + - name: "setFirstPollOffsetStrategy" args: - - "${kafka.pcap.ts_scheme}" - - "${kafka.pcap.ts_granularity}" + # One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST + - "UNCOMMITTED_EARLIEST" - id: "writerConfig" className: "org.apache.metron.spout.pcap.HDFSWriterConfig" @@ -58,6 +64,10 @@ components: - name: "withZookeeperQuorum" args: - "${kafka.zk}" + - name: "withDeserializer" + args: + - "${kafka.pcap.ts_scheme}" + - "${kafka.pcap.ts_granularity}" spouts: - id: "kafkaSpout" className: "org.apache.metron.spout.pcap.KafkaToHDFSSpout"