http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java index d9818b7..6e59d42 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java @@ -18,32 +18,32 @@ package org.apache.storm.kafka.spout.test; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; + +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; +import org.apache.storm.kafka.spout.ByTopicRecordTranslator; +import org.apache.storm.kafka.spout.Func; import org.apache.storm.kafka.spout.KafkaSpout; import org.apache.storm.kafka.spout.KafkaSpoutConfig; import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff; import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; import org.apache.storm.kafka.spout.KafkaSpoutRetryService; -import org.apache.storm.kafka.spout.KafkaSpoutStreams; -import org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics; -import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder; -import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilderNamedTopics; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.HashMap; -import java.util.Map; - -import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; +import org.apache.storm.tuple.Values; public class KafkaSpoutTopologyMainNamedTopics { - private static final String[] STREAMS = new String[]{"test_stream","test1_stream","test2_stream"}; + private static final String TOPIC_2_STREAM = "test_2_stream"; + private static final String TOPIC_0_1_STREAM = "test_0_1_stream"; private static final String[] TOPICS = new String[]{"test","test1","test2"}; @@ -88,14 +88,32 @@ public class KafkaSpoutTopologyMainNamedTopics { protected StormTopology getTopolgyKafkaSpout() { final TopologyBuilder tp = new TopologyBuilder(); - tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1); - tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]); - tp.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]); + tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1); + tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()) + .shuffleGrouping("kafka_spout", TOPIC_0_1_STREAM) + .shuffleGrouping("kafka_spout", TOPIC_2_STREAM); + tp.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", TOPIC_2_STREAM); return tp.createTopology(); } - protected KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) { - return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(), kafkaSpoutStreams, getTuplesBuilder(), getRetryService()) + public static Func<ConsumerRecord<String, String>, List<Object>> TOPIC_PART_OFF_KEY_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() { + @Override + public List<Object> apply(ConsumerRecord<String, String> r) { + return new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()); + } + }; + + protected KafkaSpoutConfig<String,String> getKafkaSpoutConfig() { + ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>( + TOPIC_PART_OFF_KEY_VALUE_FUNC, + new Fields("topic", "partition", "offset", "key", "value"), TOPIC_0_1_STREAM); + trans.forTopic(TOPICS[2], + TOPIC_PART_OFF_KEY_VALUE_FUNC, + new Fields("topic", "partition", "offset", "key", "value"), TOPIC_2_STREAM); + return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPICS) + .setGroupId("kafkaSpoutTestGroup") + .setRetry(getRetryService()) + .setRecordTranslator(trans) .setOffsetCommitPeriodMs(10_000) .setFirstPollOffsetStrategy(EARLIEST) .setMaxUncommittedOffsets(250) @@ -106,30 +124,4 @@ public class KafkaSpoutTopologyMainNamedTopics { return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500), TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10)); } - - protected Map<String,Object> getKafkaConsumerProps() { - Map<String, Object> props = new HashMap<>(); -// props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true"); - props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092"); - props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup"); - props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"); - props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"); - return props; - } - - protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() { - return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>( - new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]), - new TopicTest2TupleBuilder<String, String>(TOPICS[2])) - .build(); - } - - protected KafkaSpoutStreams getKafkaSpoutStreams() { - final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value"); - final Fields outputFields1 = new Fields("topic", "partition", "offset"); - return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]}) // contents of topics test, test1, sent to test_stream - .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]}) // contents of topic test2 sent to test_stream - .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]}) // contents of topic test2 sent to test2_stream - .build(); - } }
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java index c362a2b..8b967fa 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java @@ -18,22 +18,23 @@ package org.apache.storm.kafka.spout.test; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; + +import java.util.List; +import java.util.regex.Pattern; + +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.storm.generated.StormTopology; +import org.apache.storm.kafka.spout.Func; import org.apache.storm.kafka.spout.KafkaSpout; -import org.apache.storm.kafka.spout.KafkaSpoutStream; -import org.apache.storm.kafka.spout.KafkaSpoutStreams; -import org.apache.storm.kafka.spout.KafkaSpoutStreamsWildcardTopics; -import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder; -import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder; -import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilderWildcardTopics; +import org.apache.storm.kafka.spout.KafkaSpoutConfig; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; - -import java.util.regex.Pattern; +import org.apache.storm.tuple.Values; public class KafkaSpoutTopologyMainWildcardTopics extends KafkaSpoutTopologyMainNamedTopics { private static final String STREAM = "test_wildcard_stream"; - private static final String TOPIC_WILDCARD_PATTERN = "test[1|2]"; + private static final Pattern TOPIC_WILDCARD_PATTERN = Pattern.compile("test[1|2]"); public static void main(String[] args) throws Exception { new KafkaSpoutTopologyMainWildcardTopics().runMain(args); @@ -41,22 +42,27 @@ public class KafkaSpoutTopologyMainWildcardTopics extends KafkaSpoutTopologyMain protected StormTopology getTopolgyKafkaSpout() { final TopologyBuilder tp = new TopologyBuilder(); - tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1); + tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1); tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM); return tp.createTopology(); } - protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() { - return new KafkaSpoutTuplesBuilderWildcardTopics<>(getTupleBuilder()); - } - - protected KafkaSpoutTupleBuilder<String, String> getTupleBuilder() { - return new TopicsTest0Test1TupleBuilder<>(TOPIC_WILDCARD_PATTERN); - } - - protected KafkaSpoutStreams getKafkaSpoutStreams() { - final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value"); - final KafkaSpoutStream kafkaSpoutStream = new KafkaSpoutStream(outputFields, STREAM, Pattern.compile(TOPIC_WILDCARD_PATTERN)); - return new KafkaSpoutStreamsWildcardTopics(kafkaSpoutStream); + public static Func<ConsumerRecord<String, String>, List<Object>> TOPIC_PART_OFF_KEY_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() { + @Override + public List<Object> apply(ConsumerRecord<String, String> r) { + return new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()); + } + }; + + protected KafkaSpoutConfig<String,String> getKafkaSpoutConfig() { + return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPIC_WILDCARD_PATTERN) + .setGroupId("kafkaSpoutTestGroup") + .setRetry(getRetryService()) + .setRecordTranslator(TOPIC_PART_OFF_KEY_VALUE_FUNC, + new Fields("topic", "partition", "offset", "key", "value"), STREAM) + .setOffsetCommitPeriodMs(10_000) + .setFirstPollOffsetStrategy(EARLIEST) + .setMaxUncommittedOffsets(250) + .build(); } } http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java deleted file mode 100644 index ca65177..0000000 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.kafka.spout.test; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder; -import org.apache.storm.tuple.Values; - -import java.util.List; - -public class TopicTest2TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> { - /** - * @param topics list of topics that use this implementation to build tuples - */ - public TopicTest2TupleBuilder(String... topics) { - super(topics); - } - - @Override - public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) { - return new Values(consumerRecord.topic(), - consumerRecord.partition(), - consumerRecord.offset()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java deleted file mode 100644 index 4c55aa1..0000000 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.kafka.spout.test; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder; -import org.apache.storm.tuple.Values; - -import java.util.List; - -public class TopicsTest0Test1TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> { - /** - * @param topics list of topics that use this implementation to build tuples - */ - public TopicsTest0Test1TupleBuilder(String... topics) { - super(topics); - } - - @Override - public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) { - return new Values(consumerRecord.topic(), - consumerRecord.partition(), - consumerRecord.offset(), - consumerRecord.key(), - consumerRecord.value()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java index 19f0452..4c5dba5 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java @@ -41,7 +41,7 @@ public class DynamicPartitionConnections { } } - Map<Broker, ConnectionInfo> _connections = new HashMap(); + Map<Broker, ConnectionInfo> _connections = new HashMap<>(); KafkaConfig _config; IBrokerReader _reader; http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java index d2bd313..4608963 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java @@ -61,7 +61,7 @@ public class KafkaSpout extends BaseRichSpout { public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) { _collector = collector; String topologyInstanceId = context.getStormId(); - Map stateConf = new HashMap(conf); + Map<String, Object> stateConf = new HashMap<>(conf); List<String> zkServers = _spoutConfig.zkServers; if (zkServers == null) { zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS); http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java index 8c22118..f23c873 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java @@ -110,7 +110,7 @@ public class KafkaUtils { @Override public Object getValueAndReset() { try { - HashMap ret = new HashMap(); + HashMap<String, Long> ret = new HashMap<>(); if (_partitions != null && _partitions.size() == _partitionToOffset.size()) { Map<String,TopicMetrics> topicMetricsMap = new TreeMap<String, TopicMetrics>(); for (Map.Entry<Partition, PartitionManager.OffsetData> e : _partitionToOffset.entrySet()) { http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java index 79e7c3d..db5558d 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java @@ -133,7 +133,7 @@ public class PartitionManager { } public Map getMetricsDataMap() { - Map ret = new HashMap(); + Map<String, Object> ret = new HashMap<>(); ret.put(_partition + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset()); ret.put(_partition + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset()); ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset()); http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java index bdbc44d..628bfc0 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java @@ -24,7 +24,7 @@ import java.util.*; public class StaticCoordinator implements PartitionCoordinator { Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>(); - List<PartitionManager> _allManagers = new ArrayList(); + List<PartitionManager> _allManagers = new ArrayList<>(); public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) { StaticHosts hosts = (StaticHosts) config.hosts; @@ -34,7 +34,7 @@ public class StaticCoordinator implements PartitionCoordinator { for (Partition myPartition : myPartitions) { _managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, stormConf, config, myPartition)); } - _allManagers = new ArrayList(_managers.values()); + _allManagers = new ArrayList<>(_managers.values()); } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java index 31eaac5..b5bb124 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java @@ -51,7 +51,9 @@ import java.util.Properties; * This bolt uses 0.8.2 Kafka Producer API. * <p/> * It works for sending tuples to older Kafka version (0.8.1). + * @deprecated Please use the KafkaBolt in storm-kafka-client */ +@Deprecated public class KafkaBolt<K, V> extends BaseRichBolt { private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class); http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java index 3363252..46cc60d 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java @@ -54,7 +54,7 @@ public class DynamicBrokersReaderTest { public void setUp() throws Exception { server = new TestingServer(); String connectionString = server.getConnectString(); - Map conf = new HashMap(); + Map<String, Object> conf = new HashMap<>(); conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000); conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000); conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4); @@ -64,7 +64,7 @@ public class DynamicBrokersReaderTest { zookeeper = CuratorFrameworkFactory.newClient(connectionString, retryPolicy); dynamicBrokersReader = new DynamicBrokersReader(conf, connectionString, masterPath, topic); - Map conf2 = new HashMap(); + Map<String, Object> conf2 = new HashMap<>(); conf2.putAll(conf); conf2.put("kafka.topic.wildcard.match",true); @@ -240,7 +240,7 @@ public class DynamicBrokersReaderTest { @Test(expected = NullPointerException.class) public void testErrorLogsWhenConfigIsMissing() throws Exception { String connectionString = server.getConnectString(); - Map conf = new HashMap(); + Map<String, Object> conf = new HashMap<>(); conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000); // conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000); conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4); http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java index 7a6073a..864eaa9 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java @@ -42,7 +42,7 @@ public class TridentKafkaTest { public void setup() { broker = new KafkaTestBroker(); simpleConsumer = TestUtils.getKafkaConsumer(broker); - TridentTupleToKafkaMapper mapper = new FieldNameBasedTupleToKafkaMapper("key", "message"); + TridentTupleToKafkaMapper<Object, Object> mapper = new FieldNameBasedTupleToKafkaMapper<Object, Object>("key", "message"); KafkaTopicSelector topicSelector = new DefaultTopicSelector(TestUtils.TOPIC); state = new TridentKafkaState() .withKafkaTopicSelector(topicSelector) http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java index 65bf0b4..364da33 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java @@ -124,7 +124,7 @@ public class ZkCoordinatorTest { } private List<List<PartitionManager>> getPartitionManagers(List<ZkCoordinator> coordinatorList) { - List<List<PartitionManager>> partitions = new ArrayList(); + List<List<PartitionManager>> partitions = new ArrayList<>(); for (ZkCoordinator coordinator : coordinatorList) { partitions.add(coordinator.getMyManagedPartitions()); } http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java index 180828e..58e52e8 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java @@ -52,6 +52,7 @@ import org.apache.storm.kafka.trident.GlobalPartitionInformation; import java.nio.ByteBuffer; import java.util.List; +import java.util.Map; import java.util.HashMap; import java.util.Properties; import java.util.concurrent.Future; @@ -287,7 +288,8 @@ public class KafkaBoltTest { private Tuple generateTestTuple(Object key, Object message) { TopologyBuilder builder = new TopologyBuilder(); - GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap(), new HashMap(), new HashMap(), "") { + GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap<Integer,String>(), + new HashMap<String, List<Integer>>(), new HashMap<String, Map<String, Fields>>(), "") { @Override public Fields getComponentOutputFields(String componentId, String streamId) { return new Fields("key", "message"); @@ -298,7 +300,8 @@ public class KafkaBoltTest { private Tuple generateTestTuple(Object message) { TopologyBuilder builder = new TopologyBuilder(); - GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap(), new HashMap(), new HashMap(), "") { + GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap<Integer, String>(), + new HashMap<String, List<Integer>>(), new HashMap<String, Map<String, Fields>>(), "") { @Override public Fields getComponentOutputFields(String componentId, String streamId) { return new Fields("message"); http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/storm-core/src/jvm/org/apache/storm/tuple/Fields.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/tuple/Fields.java b/storm-core/src/jvm/org/apache/storm/tuple/Fields.java index 840b2d3..a771748 100644 --- a/storm-core/src/jvm/org/apache/storm/tuple/Fields.java +++ b/storm-core/src/jvm/org/apache/storm/tuple/Fields.java @@ -29,6 +29,7 @@ import java.io.Serializable; * Collection of unique named fields using in an ITuple */ public class Fields implements Iterable<String>, Serializable { + private static final long serialVersionUID = -3377931843059975424L; private List<String> _fields; private Map<String, Integer> _index = new HashMap<>(); @@ -122,5 +123,20 @@ public class Fields implements Iterable<String>, Serializable { @Override public String toString() { return _fields.toString(); - } + } + + @Override + public boolean equals(Object other) { + if (this == other) return true; + if (other instanceof Fields) { + Fields of = (Fields)other; + return _fields.equals(of._fields); + } + return false; + } + + @Override + public int hashCode() { + return _fields.hashCode(); + } }