Repository: metron Updated Branches: refs/heads/master 1277b6c32 -> 6c836d136
METRON-950: Migrate storm-kafka-client to 1.1 closes apache/incubator-metron#584 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/6c836d13 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/6c836d13 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/6c836d13 Branch: refs/heads/master Commit: 6c836d13635fcf1639ea7b8adbaf1ba4051c4f1c Parents: 1277b6c Author: cstella <[email protected]> Authored: Tue May 16 11:02:43 2017 -0400 Committer: cstella <[email protected]> Committed: Tue May 16 11:02:43 2017 -0400 ---------------------------------------------------------------------- .../parsers/topology/ParserTopologyBuilder.java | 3 +- .../metron-storm-kafka-override/pom.xml | 101 +++++++++++++++++++ .../storm/kafka/spout/internal/Timer.java | 58 +++++++++++ .../storm/kafka/spout/internal/TimerTest.java | 36 +++++++ metron-platform/metron-storm-kafka/pom.xml | 5 + .../kafka/flux/SimpleStormKafkaBuilder.java | 90 +++++++++-------- .../storm/kafka/flux/SpoutConfiguration.java | 20 ++-- .../kafka/flux/SpoutConfigurationTest.java | 13 +-- metron-platform/pom.xml | 1 + pom.xml | 42 ++++---- 10 files changed, 289 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/6c836d13/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java index e9acbaa..196c19d 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java @@ -17,6 +17,7 @@ */ package org.apache.metron.parsers.topology; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder; import org.apache.metron.storm.kafka.flux.SpoutConfiguration; import org.apache.metron.storm.kafka.flux.StormKafkaSpout; @@ -124,7 +125,7 @@ public class ParserTopologyBuilder { kafkaSpoutConfigOptions.putIfAbsent( SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key , KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST.toString() ); - kafkaSpoutConfigOptions.putIfAbsent( KafkaSpoutConfig.Consumer.GROUP_ID + kafkaSpoutConfigOptions.putIfAbsent( ConsumerConfig.GROUP_ID_CONFIG , inputTopic + "_parser" ); if(securityProtocol.isPresent()) { http://git-wip-us.apache.org/repos/asf/metron/blob/6c836d13/metron-platform/metron-storm-kafka-override/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-storm-kafka-override/pom.xml b/metron-platform/metron-storm-kafka-override/pom.xml new file mode 100644 index 0000000..8683176 --- /dev/null +++ b/metron-platform/metron-storm-kafka-override/pom.xml @@ -0,0 +1,101 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software + Foundation (ASF) under one or more contributor license agreements. See the + NOTICE file distributed with this work for additional information regarding + copyright ownership. The ASF licenses this file to You under the Apache License, + Version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.metron</groupId> + <artifactId>metron-platform</artifactId> + <version>0.4.0</version> + </parent> + <artifactId>metron-storm-kafka-override</artifactId> + <name>metron-storm-kafka-override</name> + <description>Components that extend the Storm/Kafka spout</description> + <url>https://metron.apache.org/</url> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + <commons.config.version>1.10</commons.config.version> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-kafka-client</artifactId> + <version>${global_storm_kafka_version}</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${global_kafka_version}</version> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>${global_storm_version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <artifactId>servlet-api</artifactId> + <groupId>javax.servlet</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-slf4j-impl</artifactId> + <groupId>org.apache.logging.log4j</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + <version>${global_kafka_version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>com.sun.jmx</groupId> + <artifactId>jmxri</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jdmk</groupId> + <artifactId>jmxtools</artifactId> + </exclusion> + <exclusion> + <groupId>javax.jms</groupId> + <artifactId>jms</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-common</artifactId> + <version>${project.parent.version}</version> + </dependency> + </dependencies> + + <build> + <resources> + <resource> + <directory>src/main/resources</directory> + </resource> + </resources> + </build> +</project> http://git-wip-us.apache.org/repos/asf/metron/blob/6c836d13/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java b/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java new file mode 100644 index 0000000..f9782ab --- /dev/null +++ b/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout.internal; + +import java.util.concurrent.TimeUnit; +import org.apache.storm.utils.Time; + +public class Timer { + private final long delay; + private final long period; + private final TimeUnit timeUnit; + private final long periodNanos; + private long start; + + public Timer(long delay, long period, TimeUnit timeUnit) { + this.delay = delay; + this.period = period; + this.timeUnit = timeUnit; + this.periodNanos = timeUnit.toNanos(period); + this.start = System.nanoTime() + timeUnit.toNanos(delay); + } + + public long period() { + return this.period; + } + + public long delay() { + return this.delay; + } + + public TimeUnit getTimeUnit() { + return this.timeUnit; + } + + public boolean isExpiredResetOnTrue() { + boolean expired = System.nanoTime() - this.start >= this.periodNanos; + if(expired) { + this.start = System.nanoTime(); + } + + return expired; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/6c836d13/metron-platform/metron-storm-kafka-override/src/test/java/org/apache/storm/kafka/spout/internal/TimerTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-storm-kafka-override/src/test/java/org/apache/storm/kafka/spout/internal/TimerTest.java b/metron-platform/metron-storm-kafka-override/src/test/java/org/apache/storm/kafka/spout/internal/TimerTest.java new file mode 100644 index 0000000..0d49ae1 --- /dev/null +++ b/metron-platform/metron-storm-kafka-override/src/test/java/org/apache/storm/kafka/spout/internal/TimerTest.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout.internal; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +public class TimerTest { + + @Test + public void testReset() throws InterruptedException { + Timer t = new Timer(0, 2, TimeUnit.SECONDS); + Thread.sleep(1000); + Assert.assertFalse(t.isExpiredResetOnTrue()); + Thread.sleep(1000); + Assert.assertTrue(t.isExpiredResetOnTrue()); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/6c836d13/metron-platform/metron-storm-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-storm-kafka/pom.xml b/metron-platform/metron-storm-kafka/pom.xml index b8e3f8d..5c28b34 100644 --- a/metron-platform/metron-storm-kafka/pom.xml +++ b/metron-platform/metron-storm-kafka/pom.xml @@ -31,6 +31,11 @@ </properties> <dependencies> <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-storm-kafka-override</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka-client</artifactId> <version>${global_storm_kafka_version}</version> http://git-wip-us.apache.org/repos/asf/metron/blob/6c836d13/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java index bf5250b..592859e 100644 --- a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java +++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java @@ -20,8 +20,10 @@ package org.apache.metron.storm.kafka.flux; import com.google.common.base.Joiner; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.Deserializer; import org.apache.metron.common.utils.KafkaUtils; import org.apache.storm.kafka.spout.*; import org.apache.storm.spout.SpoutOutputCollector; @@ -30,10 +32,7 @@ import org.apache.storm.topology.OutputFieldsGetter; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.function.Function; /** @@ -47,7 +46,6 @@ import java.util.function.Function; * @param <V> The kafka value type */ public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V> { - final static String STREAM = "default"; /** * The fields exposed by the kafka consumer. These will show up in the Storm tuple. @@ -113,11 +111,12 @@ public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V * @param <K> The key type in kafka * @param <V> The value type in kafka */ - public static class TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> { + public static class SpoutRecordTranslator<K, V> implements RecordTranslator<K,V> { private List<FieldsConfiguration> configurations; - private TupleBuilder(String topic, List<FieldsConfiguration> configurations) { - super(topic); + private Fields fields; + private SpoutRecordTranslator(List<FieldsConfiguration> configurations) { this.configurations = configurations; + this.fields = FieldsConfiguration.getFields(configurations); } /** @@ -127,15 +126,27 @@ public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V * @return list of tuples */ @Override - public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) { + public List<Object> apply(ConsumerRecord<K, V> consumerRecord) { Values ret = new Values(); for(FieldsConfiguration config : configurations) { ret.add(config.recordExtractor.apply(consumerRecord)); } return ret; } + + @Override + public Fields getFieldsFor(String s) { + return fields; + } + + @Override + public List<String> streams() { + return DEFAULT_STREAM; + } } + public static String DEFAULT_DESERIALIZER = ByteArrayDeserializer.class.getName(); + private String topic; /** @@ -165,13 +176,39 @@ public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V , List<String> fieldsConfiguration ) { - super( modifyKafkaProps(kafkaProps, zkQuorum) - , createStreams(fieldsConfiguration, topic) - , createTuplesBuilder(fieldsConfiguration, topic) - ); + super( getBootstrapServers(zkQuorum, kafkaProps) + , createDeserializer(Optional.ofNullable((String)kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)), DEFAULT_DESERIALIZER) + , createDeserializer(Optional.ofNullable((String)kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)), DEFAULT_DESERIALIZER) + , topic + ); + setProp(kafkaProps); + setRecordTranslator(new SpoutRecordTranslator<>(FieldsConfiguration.toList(fieldsConfiguration))); this.topic = topic; } + private static <T> Class<Deserializer<T>> createDeserializer( Optional<String> deserializerClass + , String defaultDeserializerClass + ) + { + try { + return (Class<Deserializer<T>>) Class.forName(deserializerClass.orElse(defaultDeserializerClass)); + } catch (Exception e) { + throw new IllegalStateException("Unable to create a deserializer: " + deserializerClass.orElse(defaultDeserializerClass) + ": " + e.getMessage(), e); + } + } + + private static String getBootstrapServers(String zkQuorum, Map<String, Object> kafkaProps) { + String brokers = (String)kafkaProps.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); + if(brokers == null) { + try { + return Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(zkQuorum)); + } catch (Exception e) { + throw new IllegalStateException("Unable to find the bootstrap servers: " + e.getMessage(), e); + } + } + return brokers; + } + /** * Get the kafka topic. TODO: In the future, support multiple topics and regex patterns. * @return @@ -202,31 +239,4 @@ public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V return new StormKafkaSpout<>(builder); } - private static Map<String, Object> modifyKafkaProps(Map<String, Object> props, String zkQuorum) { - try { - if(!props.containsKey(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS)) { - //this isn't a putIfAbsent because I only want to pull the brokers from zk if it's absent. - List<String> brokers = KafkaUtils.INSTANCE.getBrokersFromZookeeper(zkQuorum); - props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, Joiner.on(",").join(brokers)); - } - props.putIfAbsent(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, ByteArrayDeserializer.class.getName()); - props.putIfAbsent(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, ByteArrayDeserializer.class.getName()); - - } catch (Exception e) { - throw new IllegalStateException("Unable to retrieve brokers from zookeeper: " + e.getMessage(), e); - } - return props; - } - - private static <K,V> KafkaSpoutTuplesBuilder<K, V> createTuplesBuilder(List<String> config, String topic) { - TupleBuilder<K, V> tb = new TupleBuilder<K, V>(topic, FieldsConfiguration.toList(config)); - return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(tb).build(); - } - - - private static KafkaSpoutStreams createStreams(List<String> config, String topic) { - final Fields fields = FieldsConfiguration.getFields(FieldsConfiguration.toList(config)); - return new KafkaSpoutStreamsNamedTopics.Builder(fields, STREAM, new String[] { topic} ).build(); - } - } http://git-wip-us.apache.org/repos/asf/metron/blob/6c836d13/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SpoutConfiguration.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SpoutConfiguration.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SpoutConfiguration.java index 6c0f148..2a4586d 100644 --- a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SpoutConfiguration.java +++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SpoutConfiguration.java @@ -17,6 +17,8 @@ */ package org.apache.metron.storm.kafka.flux; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.metron.common.utils.ConversionUtils; import org.apache.storm.kafka.spout.KafkaSpoutConfig; @@ -49,12 +51,6 @@ public enum SpoutConfiguration { , container -> container.builder.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.valueOf(container.value.toString())) ) /** - * The maximum number of retries - */ - ,MAX_RETRIES("spout.maxRetries" - , container -> container.builder.setMaxRetries(ConversionUtils.convert(container.value, Integer.class)) - ) - /** * The maximum amount of uncommitted offsets */ ,MAX_UNCOMMITTED_OFFSETS("spout.maxUncommittedOffsets" @@ -66,6 +62,12 @@ public enum SpoutConfiguration { ,OFFSET_COMMIT_PERIOD_MS("spout.offsetCommitPeriodMs" , container -> container.builder.setOffsetCommitPeriodMs(ConversionUtils.convert(container.value, Long.class)) ) + /** + * The partition refresh period in milliseconds + */ + ,PARTITION_REFRESH_PERIOD_MS("spout.partitionRefreshPeriodMs" + , container -> container.builder.setPartitionRefreshPeriodMs(ConversionUtils.convert(container.value, Long.class)) + ) ; private static class Container { Map<String, Object> config; @@ -131,9 +133,9 @@ public enum SpoutConfiguration { for(SpoutConfiguration spoutConfig : SpoutConfiguration.values()) { ret.add(spoutConfig.key); } - ret.add(KafkaSpoutConfig.Consumer.GROUP_ID); - ret.add(KafkaSpoutConfig.Consumer.AUTO_COMMIT_INTERVAL_MS); - ret.add(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT); + ret.add(ConsumerConfig.GROUP_ID_CONFIG); + ret.add(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); + ret.add(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG); return ret; } } http://git-wip-us.apache.org/repos/asf/metron/blob/6c836d13/metron-platform/metron-storm-kafka/src/test/java/org/apache/metron/storm/kafka/flux/SpoutConfigurationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-storm-kafka/src/test/java/org/apache/metron/storm/kafka/flux/SpoutConfigurationTest.java b/metron-platform/metron-storm-kafka/src/test/java/org/apache/metron/storm/kafka/flux/SpoutConfigurationTest.java index fdef69d..c6dbd8f 100644 --- a/metron-platform/metron-storm-kafka/src/test/java/org/apache/metron/storm/kafka/flux/SpoutConfigurationTest.java +++ b/metron-platform/metron-storm-kafka/src/test/java/org/apache/metron/storm/kafka/flux/SpoutConfigurationTest.java @@ -17,6 +17,7 @@ */ package org.apache.metron.storm.kafka.flux; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.metron.common.utils.KafkaUtils; import org.apache.storm.kafka.spout.KafkaSpoutConfig; import org.junit.Assert; @@ -33,14 +34,14 @@ public class SpoutConfigurationTest { public void testSeparation() { Map<String, Object> config = new HashMap<String, Object>() {{ put(SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key, "UNCOMMITTED_EARLIEST"); - put(SpoutConfiguration.MAX_RETRIES.key, "1000"); + put(SpoutConfiguration.OFFSET_COMMIT_PERIOD_MS.key, "1000"); put("group.id", "foobar"); }}; Map<String, Object> spoutConfig = SpoutConfiguration.separate(config); Assert.assertTrue(spoutConfig.containsKey(SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key)); Assert.assertEquals(spoutConfig.get(SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key), "UNCOMMITTED_EARLIEST"); - Assert.assertTrue(spoutConfig.containsKey(SpoutConfiguration.MAX_RETRIES.key)); - Assert.assertEquals(spoutConfig.get(SpoutConfiguration.MAX_RETRIES.key), "1000"); + Assert.assertTrue(spoutConfig.containsKey(SpoutConfiguration.OFFSET_COMMIT_PERIOD_MS.key)); + Assert.assertEquals(spoutConfig.get(SpoutConfiguration.OFFSET_COMMIT_PERIOD_MS.key), "1000"); Assert.assertEquals(2, spoutConfig.size()); Assert.assertEquals(1, config.size()); Assert.assertEquals(config.get("group.id"), "foobar"); @@ -49,15 +50,15 @@ public class SpoutConfigurationTest { @Test public void testBuilderCreation() { Map<String, Object> config = new HashMap<String, Object>() {{ - put(SpoutConfiguration.MAX_RETRIES.key, "1000"); - put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "foo:1234"); + put(SpoutConfiguration.OFFSET_COMMIT_PERIOD_MS.key, "1000"); + put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234"); put("group.id", "foobar"); }}; Map<String, Object> spoutConfig = SpoutConfiguration.separate(config); KafkaSpoutConfig.Builder<Object, Object> builder = new SimpleStormKafkaBuilder(config, "topic", null); SpoutConfiguration.configure(builder, spoutConfig); KafkaSpoutConfig c = builder.build(); - Assert.assertEquals(1000, c.getMaxTupleRetries() ); + Assert.assertEquals(1000, c.getOffsetsCommitPeriodMs() ); } } http://git-wip-us.apache.org/repos/asf/metron/blob/6c836d13/metron-platform/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/pom.xml b/metron-platform/pom.xml index 1376e5c..cc48851 100644 --- a/metron-platform/pom.xml +++ b/metron-platform/pom.xml @@ -58,6 +58,7 @@ <module>elasticsearch-shaded</module> <module>metron-elasticsearch</module> <module>metron-storm-kafka</module> + <module>metron-storm-kafka-override</module> </modules> <dependencies> <dependency> http://git-wip-us.apache.org/repos/asf/metron/blob/6c836d13/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index bb32c64..9f62249 100644 --- a/pom.xml +++ b/pom.xml @@ -39,6 +39,22 @@ <repositories> <repository> + <releases> + <enabled>true</enabled> + <updatePolicy>always</updatePolicy> + <checksumPolicy>warn</checksumPolicy> + </releases> + <snapshots> + <enabled>true</enabled> + <updatePolicy>never</updatePolicy> + <checksumPolicy>warn</checksumPolicy> + </snapshots> + <id>HDPPrivateReleases</id> + <name>HDP Private Releases</name> + <url>http://nexus-private.hortonworks.com/nexus/content/groups/public</url> + <layout>default</layout> + </repository> + <repository> <id>clojars.org</id> <url>http://clojars.org/repo</url> </repository> @@ -83,30 +99,7 @@ <global_curator_version>2.7.1</global_curator_version> <global_classindex_version>3.3</global_classindex_version> <global_storm_version>1.0.3</global_storm_version> - <!-- - This bears some explanation. storm-kafka-client is our kafka spout. - If we ever hope to support kerberos, this provides the capability to do so - in apache. Unfortunately, it also does not support, as of Storm 1.0.x - Kafka 0.10.x (see https://issues.apache.org/jira/browse/STORM-2091). - The consumer libraries (not to be confused with the protocol) on the JVM - are binary incompatible. Note the discussion on https://issues.apache.org/jira/browse/KAFKA-3006, - the main issue is the move to Collection over List. While this would be polymorphically - a non-issue, it would require a recompile of storm-kafka-client against Kafka 0.10.x. - - Since a targeted platform is HDP 2.5.x, which ships only kafka 0.10.x, we need - to support kafka 0.10.x. Therefore, if we are to use this, then we would need - to support both Kafka 0.9.x and 0.10.x. Unfortunately, this would require us - to fork some of the internal projects because the 0.9.x API has shifted - (e.g. the Admin functions have different parameters) and behaves - differently than 0.10.x in subtle ways (e.g. KAFKA_GET doesn't work as implemented). - - Rather than do this, we chose to depend on the HDP version of storm-kafka because - it is compiled against 0.10.x and therefore would allow us to not fork our support - for kafka. I do not like this bleeding of the HDP profile dependency into the default, - but I justify it by noting that this should be able to be removed when we migrate to - Storm 1.1.x, which properly supports Kafka 0.10.x. - --> - <global_storm_kafka_version>1.0.1.2.5.0.0-1245</global_storm_kafka_version> + <global_storm_kafka_version>1.1.0</global_storm_kafka_version> <global_flux_version>${base_flux_version}</global_flux_version> <global_pcap_version>1.7.1</global_pcap_version> <global_kafka_version>0.10.0.1</global_kafka_version> @@ -138,6 +131,7 @@ <properties> <hdp_version>2.5.0.0</hdp_version> <build_number>1245</build_number> + <global_storm_kafka_version>1.1.0.2.6.1.0-SNAPSHOT</global_storm_kafka_version> <global_storm_version>${base_storm_version}.${hdp_version}-${build_number}</global_storm_version> <global_kafka_version>${base_kafka_version}.${hdp_version}-${build_number}</global_kafka_version> </properties>
