http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/DefaultCoordinator.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/DefaultCoordinator.java b/external/storm-kafka/src/jvm/storm/kafka/trident/DefaultCoordinator.java deleted file mode 100644 index 04e1396..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/DefaultCoordinator.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.trident; - -public class DefaultCoordinator implements IBatchCoordinator { - - @Override - public boolean isReady(long txid) { - return true; - } - - @Override - public void close() { - } - -}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java b/external/storm-kafka/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java deleted file mode 100644 index b0d97fc..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.trident; - -import com.google.common.base.Objects; -import storm.kafka.Broker; -import storm.kafka.Partition; - -import java.io.Serializable; -import java.util.*; - - -public class GlobalPartitionInformation implements Iterable<Partition>, Serializable { - - private Map<Integer, Broker> partitionMap; - public String topic; - - //Flag to keep the Partition Path Id backward compatible with Old implementation of Partition.getId() == "partition_" + partition - private Boolean bUseTopicNameForPartitionPathId; - - public GlobalPartitionInformation(String topic, Boolean bUseTopicNameForPartitionPathId) { - this.topic = topic; - this.partitionMap = new TreeMap<Integer, Broker>(); - this.bUseTopicNameForPartitionPathId = bUseTopicNameForPartitionPathId; - } - - public GlobalPartitionInformation(String topic) { - this.topic = topic; - this.partitionMap = new TreeMap<Integer, Broker>(); - this.bUseTopicNameForPartitionPathId = false; - } - - public void addPartition(int partitionId, Broker broker) { - partitionMap.put(partitionId, broker); - } - - @Override - public String toString() { - return "GlobalPartitionInformation{" + - "topic=" + topic + - ", partitionMap=" + partitionMap + - '}'; - } - - public Broker getBrokerFor(Integer partitionId) { - return partitionMap.get(partitionId); - } - - public List<Partition> getOrderedPartitions() { - List<Partition> partitions = new LinkedList<Partition>(); - for (Map.Entry<Integer, Broker> partition : partitionMap.entrySet()) { - partitions.add(new Partition(partition.getValue(), this.topic, partition.getKey(), this.bUseTopicNameForPartitionPathId)); - } - return partitions; - } - - @Override - public Iterator<Partition> iterator() { - final Iterator<Map.Entry<Integer, Broker>> iterator = partitionMap.entrySet().iterator(); - final String topic = this.topic; - final Boolean bUseTopicNameForPartitionPathId = this.bUseTopicNameForPartitionPathId; - return new Iterator<Partition>() { - @Override - public boolean hasNext() { - return iterator.hasNext(); - } - - @Override - public Partition next() { - Map.Entry<Integer, Broker> next = iterator.next(); - return new Partition(next.getValue(), topic , next.getKey(), bUseTopicNameForPartitionPathId); - } - - @Override - public void remove() { - iterator.remove(); - } - }; - } - - @Override - public int hashCode() { - return Objects.hashCode(partitionMap); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null || getClass() != obj.getClass()) { - return false; - } - final GlobalPartitionInformation other = (GlobalPartitionInformation) obj; - return Objects.equal(this.partitionMap, other.partitionMap); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/IBatchCoordinator.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/IBatchCoordinator.java b/external/storm-kafka/src/jvm/storm/kafka/trident/IBatchCoordinator.java deleted file mode 100644 index 04231f4..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/IBatchCoordinator.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.trident; - -import java.io.Serializable; - -public interface IBatchCoordinator extends Serializable { - boolean isReady(long txid); - - void close(); -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/IBrokerReader.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/IBrokerReader.java b/external/storm-kafka/src/jvm/storm/kafka/trident/IBrokerReader.java deleted file mode 100644 index afba659..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/IBrokerReader.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.trident; - -import java.util.List; -import java.util.Map; - -public interface IBrokerReader { - - GlobalPartitionInformation getBrokerForTopic(String topic); - - List<GlobalPartitionInformation> getAllBrokers(); - - void close(); -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/MaxMetric.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/MaxMetric.java b/external/storm-kafka/src/jvm/storm/kafka/trident/MaxMetric.java deleted file mode 100644 index 60d7c7b..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/MaxMetric.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.trident; - - -import backtype.storm.metric.api.ICombiner; - -public class MaxMetric implements ICombiner<Long> { - @Override - public Long identity() { - return null; - } - - @Override - public Long combine(Long l1, Long l2) { - if (l1 == null) { - return l2; - } - if (l2 == null) { - return l1; - } - return Math.max(l1, l2); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java b/external/storm-kafka/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java deleted file mode 100644 index fbd1d7a..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.trident; - -import backtype.storm.task.TopologyContext; -import backtype.storm.tuple.Fields; -import storm.kafka.Partition; -import storm.trident.spout.IOpaquePartitionedTridentSpout; - -import java.util.List; -import java.util.Map; -import java.util.UUID; - - -public class OpaqueTridentKafkaSpout implements IOpaquePartitionedTridentSpout<List<GlobalPartitionInformation>, Partition, Map> { - - - TridentKafkaConfig _config; - - public OpaqueTridentKafkaSpout(TridentKafkaConfig config) { - _config = config; - } - - @Override - public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> getEmitter(Map conf, TopologyContext context) { - return new TridentKafkaEmitter(conf, context, _config, context - .getStormId()).asOpaqueEmitter(); - } - - @Override - public IOpaquePartitionedTridentSpout.Coordinator getCoordinator(Map conf, TopologyContext tc) { - return new storm.kafka.trident.Coordinator(conf, _config); - } - - @Override - public Fields getOutputFields() { - return _config.scheme.getOutputFields(); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/StaticBrokerReader.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/StaticBrokerReader.java b/external/storm-kafka/src/jvm/storm/kafka/trident/StaticBrokerReader.java deleted file mode 100644 index ca83c06..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/StaticBrokerReader.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.trident; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - -public class StaticBrokerReader implements IBrokerReader { - - private Map<String,GlobalPartitionInformation> brokers = new TreeMap<String,GlobalPartitionInformation>(); - - public StaticBrokerReader(String topic, GlobalPartitionInformation partitionInformation) { - this.brokers.put(topic, partitionInformation); - } - - @Override - public GlobalPartitionInformation getBrokerForTopic(String topic) { - if (brokers.containsKey(topic)) return brokers.get(topic); - return null; - } - - @Override - public List<GlobalPartitionInformation> getAllBrokers () { - List<GlobalPartitionInformation> list = new ArrayList<GlobalPartitionInformation>(); - list.addAll(brokers.values()); - return list; - } - - @Override - public void close() { - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java deleted file mode 100644 index 9feffc8..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.trident; - -import backtype.storm.task.TopologyContext; -import backtype.storm.tuple.Fields; -import storm.kafka.Partition; -import storm.trident.spout.IPartitionedTridentSpout; - -import java.util.Map; -import java.util.UUID; - - -public class TransactionalTridentKafkaSpout implements IPartitionedTridentSpout<GlobalPartitionInformation, Partition, Map> { - - TridentKafkaConfig _config; - - public TransactionalTridentKafkaSpout(TridentKafkaConfig config) { - _config = config; - } - - - @Override - public IPartitionedTridentSpout.Coordinator getCoordinator(Map conf, TopologyContext context) { - return new storm.kafka.trident.Coordinator(conf, _config); - } - - @Override - public IPartitionedTridentSpout.Emitter getEmitter(Map conf, TopologyContext context) { - return new TridentKafkaEmitter(conf, context, _config, context - .getStormId()).asTransactionalEmitter(); - } - - @Override - public Fields getOutputFields() { - return _config.scheme.getOutputFields(); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaConfig.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaConfig.java deleted file mode 100644 index 3878cc8..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaConfig.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.trident; - -import storm.kafka.BrokerHosts; -import storm.kafka.KafkaConfig; - - -public class TridentKafkaConfig extends KafkaConfig { - - - public final IBatchCoordinator coordinator = new DefaultCoordinator(); - - public TridentKafkaConfig(BrokerHosts hosts, String topic) { - super(hosts, topic); - } - - public TridentKafkaConfig(BrokerHosts hosts, String topic, String clientId) { - super(hosts, topic, clientId); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java deleted file mode 100644 index a97d2cb..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java +++ /dev/null @@ -1,287 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.trident; - -import backtype.storm.Config; -import backtype.storm.metric.api.CombinedMetric; -import backtype.storm.metric.api.MeanReducer; -import backtype.storm.metric.api.ReducedMetric; -import backtype.storm.task.TopologyContext; -import com.google.common.collect.ImmutableMap; -import kafka.javaapi.consumer.SimpleConsumer; -import kafka.javaapi.message.ByteBufferMessageSet; -import kafka.message.Message; -import kafka.message.MessageAndOffset; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import storm.kafka.*; -import storm.kafka.TopicOffsetOutOfRangeException; -import storm.trident.operation.TridentCollector; -import storm.trident.spout.IOpaquePartitionedTridentSpout; -import storm.trident.spout.IPartitionedTridentSpout; -import storm.trident.topology.TransactionAttempt; - -import java.util.*; - -public class TridentKafkaEmitter { - - public static final Logger LOG = LoggerFactory.getLogger(TridentKafkaEmitter.class); - - private DynamicPartitionConnections _connections; - private String _topologyName; - private KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric; - private ReducedMetric _kafkaMeanFetchLatencyMetric; - private CombinedMetric _kafkaMaxFetchLatencyMetric; - private TridentKafkaConfig _config; - private String _topologyInstanceId; - - public TridentKafkaEmitter(Map conf, TopologyContext context, TridentKafkaConfig config, String topologyInstanceId) { - _config = config; - _topologyInstanceId = topologyInstanceId; - _connections = new DynamicPartitionConnections(_config, KafkaUtils.makeBrokerReader(conf, _config)); - _topologyName = (String) conf.get(Config.TOPOLOGY_NAME); - _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_connections); - context.registerMetric("kafkaOffset", _kafkaOffsetMetric, _config.metricsTimeBucketSizeInSecs); - _kafkaMeanFetchLatencyMetric = context.registerMetric("kafkaFetchAvg", new MeanReducer(), _config.metricsTimeBucketSizeInSecs); - _kafkaMaxFetchLatencyMetric = context.registerMetric("kafkaFetchMax", new MaxMetric(), _config.metricsTimeBucketSizeInSecs); - } - - - private Map failFastEmitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) { - SimpleConsumer consumer = _connections.register(partition); - Map ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta); - _kafkaOffsetMetric.setLatestEmittedOffset(partition, (Long) ret.get("offset")); - return ret; - } - - private Map emitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) { - try { - return failFastEmitNewPartitionBatch(attempt, collector, partition, lastMeta); - } catch (FailedFetchException e) { - LOG.warn("Failed to fetch from partition " + partition); - if (lastMeta == null) { - return null; - } else { - Map ret = new HashMap(); - ret.put("offset", lastMeta.get("nextOffset")); - ret.put("nextOffset", lastMeta.get("nextOffset")); - ret.put("partition", partition.partition); - ret.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port)); - ret.put("topic", partition.topic); - ret.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId)); - return ret; - } - } - } - - private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition partition, TridentCollector collector, Map lastMeta) { - long offset; - if (lastMeta != null) { - String lastInstanceId = null; - Map lastTopoMeta = (Map) lastMeta.get("topology"); - if (lastTopoMeta != null) { - lastInstanceId = (String) lastTopoMeta.get("id"); - } - if (_config.ignoreZkOffsets && !_topologyInstanceId.equals(lastInstanceId)) { - offset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, _config.startOffsetTime); - } else { - offset = (Long) lastMeta.get("nextOffset"); - } - } else { - offset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, _config); - } - - ByteBufferMessageSet msgs = null; - try { - msgs = fetchMessages(consumer, partition, offset); - } catch (TopicOffsetOutOfRangeException e) { - long newOffset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime()); - LOG.warn("OffsetOutOfRange: Updating offset from offset = " + offset + " to offset = " + newOffset); - offset = newOffset; - msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset); - } - - long endoffset = offset; - for (MessageAndOffset msg : msgs) { - emit(collector, msg.message(), partition, msg.offset()); - endoffset = msg.nextOffset(); - } - Map newMeta = new HashMap(); - newMeta.put("offset", offset); - newMeta.put("nextOffset", endoffset); - newMeta.put("instanceId", _topologyInstanceId); - newMeta.put("partition", partition.partition); - newMeta.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port)); - newMeta.put("topic", partition.topic); - newMeta.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId)); - return newMeta; - } - - private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) { - long start = System.nanoTime(); - ByteBufferMessageSet msgs = null; - msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset); - long end = System.nanoTime(); - long millis = (end - start) / 1000000; - _kafkaMeanFetchLatencyMetric.update(millis); - _kafkaMaxFetchLatencyMetric.update(millis); - return msgs; - } - - /** - * re-emit the batch described by the meta data provided - * - * @param attempt - * @param collector - * @param partition - * @param meta - */ - private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) { - LOG.info("re-emitting batch, attempt " + attempt); - String instanceId = (String) meta.get("instanceId"); - if (!_config.ignoreZkOffsets || instanceId.equals(_topologyInstanceId)) { - SimpleConsumer consumer = _connections.register(partition); - long offset = (Long) meta.get("offset"); - long nextOffset = (Long) meta.get("nextOffset"); - ByteBufferMessageSet msgs = null; - msgs = fetchMessages(consumer, partition, offset); - - if(msgs != null) { - for (MessageAndOffset msg : msgs) { - if (offset == nextOffset) { - break; - } - if (offset > nextOffset) { - throw new RuntimeException("Error when re-emitting batch. overshot the end offset"); - } - emit(collector, msg.message(), partition, msg.offset()); - offset = msg.nextOffset(); - } - } - } - } - - private void emit(TridentCollector collector, Message msg, Partition partition, long offset) { - Iterable<List<Object>> values; - if (_config.scheme instanceof MessageMetadataSchemeAsMultiScheme) { - values = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _config.scheme, msg, partition, offset); - } else { - values = KafkaUtils.generateTuples(_config, msg, partition.topic); - } - - if (values != null) { - for (List<Object> value : values) { - collector.emit(value); - } - } - } - - private void clear() { - _connections.clear(); - } - - private List<Partition> orderPartitions(List<GlobalPartitionInformation> partitions) { - List<Partition> part = new ArrayList<Partition>(); - for (GlobalPartitionInformation globalPartitionInformation : partitions) - part.addAll(globalPartitionInformation.getOrderedPartitions()); - return part; - } - - private void refresh(List<Partition> list) { - _connections.clear(); - _kafkaOffsetMetric.refreshPartitions(new HashSet<Partition>(list)); - } - - - public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> asOpaqueEmitter() { - - return new IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map>() { - - /** - * Emit a batch of tuples for a partition/transaction. - * - * Return the metadata describing this batch that will be used as lastPartitionMeta - * for defining the parameters of the next batch. - */ - @Override - public Map emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) { - return emitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map); - } - - @Override - public void refreshPartitions(List<Partition> partitions) { - refresh(partitions); - } - - @Override - public List<Partition> getOrderedPartitions(List<GlobalPartitionInformation> partitionInformation) { - return orderPartitions(partitionInformation); - } - - @Override - public void close() { - clear(); - } - }; - } - - public IPartitionedTridentSpout.Emitter asTransactionalEmitter() { - return new IPartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map>() { - - /** - * Emit a batch of tuples for a partition/transaction that's never been emitted before. - * Return the metadata that can be used to reconstruct this partition/batch in the future. - */ - @Override - public Map emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) { - return failFastEmitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map); - } - - /** - * Emit a batch of tuples for a partition/transaction that has been emitted before, using - * the metadata created when it was first emitted. - */ - @Override - public void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) { - reEmitPartitionBatch(transactionAttempt, tridentCollector, partition, map); - } - - /** - * This method is called when this task is responsible for a new set of partitions. Should be used - * to manage things like connections to brokers. - */ - @Override - public void refreshPartitions(List<Partition> partitions) { - refresh(partitions); - } - - @Override - public List<Partition> getOrderedPartitions(List<GlobalPartitionInformation> partitionInformation) { - return orderPartitions(partitionInformation); - } - - @Override - public void close() { - clear(); - } - }; - - } - - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java deleted file mode 100644 index 84b6a6a..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.trident; - -import backtype.storm.task.OutputCollector; -import backtype.storm.topology.FailedException; -import org.apache.commons.lang.Validate; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import storm.kafka.trident.mapper.TridentTupleToKafkaMapper; -import storm.kafka.trident.selector.KafkaTopicSelector; -import storm.trident.operation.TridentCollector; -import storm.trident.state.State; -import storm.trident.tuple.TridentTuple; - -import java.util.List; -import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -public class TridentKafkaState implements State { - private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class); - - private KafkaProducer producer; - private OutputCollector collector; - - private TridentTupleToKafkaMapper mapper; - private KafkaTopicSelector topicSelector; - - public TridentKafkaState withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) { - this.mapper = mapper; - return this; - } - - public TridentKafkaState withKafkaTopicSelector(KafkaTopicSelector selector) { - this.topicSelector = selector; - return this; - } - - @Override - public void beginCommit(Long txid) { - LOG.debug("beginCommit is Noop."); - } - - @Override - public void commit(Long txid) { - LOG.debug("commit is Noop."); - } - - public void prepare(Properties options) { - Validate.notNull(mapper, "mapper can not be null"); - Validate.notNull(topicSelector, "topicSelector can not be null"); - producer = new KafkaProducer(options); - } - - public void updateState(List<TridentTuple> tuples, TridentCollector collector) { - String topic = null; - for (TridentTuple tuple : tuples) { - try { - topic = topicSelector.getTopic(tuple); - - if(topic != null) { - Future<RecordMetadata> result = producer.send(new ProducerRecord(topic, - mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple))); - try { - result.get(); - } catch (ExecutionException e) { - String errorMsg = "Could not retrieve result for message with key = " - + mapper.getKeyFromTuple(tuple) + " from topic = " + topic; - LOG.error(errorMsg, e); - throw new FailedException(errorMsg, e); - } - } else { - LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null."); - } - } catch (Exception ex) { - String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple) - + " to topic = " + topic; - LOG.warn(errorMsg, ex); - throw new FailedException(errorMsg, ex); - } - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java deleted file mode 100644 index a5d9d42..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.trident; - -import backtype.storm.task.IMetricsContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import storm.kafka.trident.mapper.TridentTupleToKafkaMapper; -import storm.kafka.trident.selector.KafkaTopicSelector; -import storm.trident.state.State; -import storm.trident.state.StateFactory; - -import java.util.Map; -import java.util.Properties; - -public class TridentKafkaStateFactory implements StateFactory { - - private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaStateFactory.class); - - private TridentTupleToKafkaMapper mapper; - private KafkaTopicSelector topicSelector; - private Properties producerProperties = new Properties(); - - public TridentKafkaStateFactory withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) { - this.mapper = mapper; - return this; - } - - public TridentKafkaStateFactory withKafkaTopicSelector(KafkaTopicSelector selector) { - this.topicSelector = selector; - return this; - } - - public TridentKafkaStateFactory withProducerProperties(Properties props) { - this.producerProperties = props; - return this; - } - - @Override - public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { - LOG.info("makeState(partitonIndex={}, numpartitions={}", partitionIndex, numPartitions); - TridentKafkaState state = new TridentKafkaState() - .withKafkaTopicSelector(this.topicSelector) - .withTridentTupleToKafkaMapper(this.mapper); - state.prepare(producerProperties); - return state; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaUpdater.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaUpdater.java deleted file mode 100644 index 6639b36..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaUpdater.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.trident; - -import storm.trident.operation.TridentCollector; -import storm.trident.state.BaseStateUpdater; -import storm.trident.tuple.TridentTuple; - -import java.util.List; - -public class TridentKafkaUpdater extends BaseStateUpdater<TridentKafkaState> { - @Override - public void updateState(TridentKafkaState state, List<TridentTuple> tuples, TridentCollector collector) { - state.updateState(tuples, collector); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/ZkBrokerReader.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/ZkBrokerReader.java b/external/storm-kafka/src/jvm/storm/kafka/trident/ZkBrokerReader.java deleted file mode 100644 index b480bdd..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/ZkBrokerReader.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.trident; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import storm.kafka.DynamicBrokersReader; -import storm.kafka.ZkHosts; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - - -public class ZkBrokerReader implements IBrokerReader { - - public static final Logger LOG = LoggerFactory.getLogger(ZkBrokerReader.class); - - List<GlobalPartitionInformation> cachedBrokers = new ArrayList<GlobalPartitionInformation>(); - DynamicBrokersReader reader; - long lastRefreshTimeMs; - - - long refreshMillis; - - public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) { - try { - reader = new DynamicBrokersReader(conf, hosts.brokerZkStr, hosts.brokerZkPath, topic); - cachedBrokers = reader.getBrokerInfo(); - lastRefreshTimeMs = System.currentTimeMillis(); - refreshMillis = hosts.refreshFreqSecs * 1000L; - } catch (java.net.SocketTimeoutException e) { - LOG.warn("Failed to update brokers", e); - } - - } - - private void refresh() { - long currTime = System.currentTimeMillis(); - if (currTime > lastRefreshTimeMs + refreshMillis) { - try { - LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired"); - cachedBrokers = reader.getBrokerInfo(); - lastRefreshTimeMs = currTime; - } catch (java.net.SocketTimeoutException e) { - LOG.warn("Failed to update brokers", e); - } - } - } - @Override - public GlobalPartitionInformation getBrokerForTopic(String topic) { - refresh(); - for(GlobalPartitionInformation partitionInformation : cachedBrokers) { - if (partitionInformation.topic.equals(topic)) return partitionInformation; - } - return null; - } - - @Override - public List<GlobalPartitionInformation> getAllBrokers() { - refresh(); - return cachedBrokers; - } - - @Override - public void close() { - reader.close(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java deleted file mode 100644 index 29a49d1..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.trident.mapper; - -import storm.trident.tuple.TridentTuple; - -public class FieldNameBasedTupleToKafkaMapper<K, V> implements TridentTupleToKafkaMapper { - - public final String keyFieldName; - public final String msgFieldName; - - public FieldNameBasedTupleToKafkaMapper(String keyFieldName, String msgFieldName) { - this.keyFieldName = keyFieldName; - this.msgFieldName = msgFieldName; - } - - @Override - public K getKeyFromTuple(TridentTuple tuple) { - return (K) tuple.getValueByField(keyFieldName); - } - - @Override - public V getMessageFromTuple(TridentTuple tuple) { - return (V) tuple.getValueByField(msgFieldName); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java deleted file mode 100644 index 9759ba3..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.trident.mapper; - -import backtype.storm.tuple.Tuple; -import storm.trident.tuple.TridentTuple; - -import java.io.Serializable; - -public interface TridentTupleToKafkaMapper<K,V> extends Serializable { - K getKeyFromTuple(TridentTuple tuple); - V getMessageFromTuple(TridentTuple tuple); -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/selector/DefaultTopicSelector.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/selector/DefaultTopicSelector.java b/external/storm-kafka/src/jvm/storm/kafka/trident/selector/DefaultTopicSelector.java deleted file mode 100644 index 473a38d..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/selector/DefaultTopicSelector.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.trident.selector; - -import storm.trident.tuple.TridentTuple; - -public class DefaultTopicSelector implements KafkaTopicSelector { - - private final String topicName; - - public DefaultTopicSelector(final String topicName) { - this.topicName = topicName; - } - - @Override - public String getTopic(TridentTuple tuple) { - return topicName; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/selector/KafkaTopicSelector.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/selector/KafkaTopicSelector.java b/external/storm-kafka/src/jvm/storm/kafka/trident/selector/KafkaTopicSelector.java deleted file mode 100644 index f6c5d82..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/selector/KafkaTopicSelector.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.trident.selector; - -import storm.trident.tuple.TridentTuple; - -import java.io.Serializable; - -public interface KafkaTopicSelector extends Serializable { - String getTopic(TridentTuple tuple); -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java new file mode 100644 index 0000000..3363252 --- /dev/null +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.apache.storm.Config; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.TestingServer; +import org.apache.curator.utils.ZKPaths; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.apache.storm.kafka.trident.GlobalPartitionInformation; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Date: 16/05/2013 + * Time: 20:35 + */ +public class DynamicBrokersReaderTest { + private DynamicBrokersReader dynamicBrokersReader, wildCardBrokerReader; + private String masterPath = "/brokers"; + private String topic = "testing1"; + private String secondTopic = "testing2"; + private String thirdTopic = "testing3"; + + private CuratorFramework zookeeper; + private TestingServer server; + + @Before + public void setUp() throws Exception { + server = new TestingServer(); + String connectionString = server.getConnectString(); + Map conf = new HashMap(); + conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000); + conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000); + conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4); + conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5); + + ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); + zookeeper = CuratorFrameworkFactory.newClient(connectionString, retryPolicy); + dynamicBrokersReader = new DynamicBrokersReader(conf, connectionString, masterPath, topic); + + Map conf2 = new HashMap(); + conf2.putAll(conf); + conf2.put("kafka.topic.wildcard.match",true); + + wildCardBrokerReader = new DynamicBrokersReader(conf2, connectionString, masterPath, "^test.*$"); + zookeeper.start(); + } + + @After + public void tearDown() throws Exception { + dynamicBrokersReader.close(); + zookeeper.close(); + server.close(); + } + + private void addPartition(int id, String host, int port, String topic) throws Exception { + writePartitionId(id, topic); + writeLeader(id, 0, topic); + writeLeaderDetails(0, host, port); + } + + private void addPartition(int id, int leader, String host, int port, String topic) throws Exception { + writePartitionId(id, topic); + writeLeader(id, leader, topic); + writeLeaderDetails(leader, host, port); + } + + private void writePartitionId(int id, String topic) throws Exception { + String path = dynamicBrokersReader.partitionPath(topic); + writeDataToPath(path, ("" + id)); + } + + private void writeDataToPath(String path, String data) throws Exception { + ZKPaths.mkdirs(zookeeper.getZookeeperClient().getZooKeeper(), path); + zookeeper.setData().forPath(path, data.getBytes()); + } + + private void writeLeader(int id, int leaderId, String topic) throws Exception { + String path = dynamicBrokersReader.partitionPath(topic) + "/" + id + "/state"; + String value = " { \"controller_epoch\":4, \"isr\":[ 1, 0 ], \"leader\":" + leaderId + ", \"leader_epoch\":1, \"version\":1 }"; + writeDataToPath(path, value); + } + + private void writeLeaderDetails(int leaderId, String host, int port) throws Exception { + String path = dynamicBrokersReader.brokerPath() + "/" + leaderId; + String value = "{ \"host\":\"" + host + "\", \"jmx_port\":9999, \"port\":" + port + ", \"version\":1 }"; + writeDataToPath(path, value); + } + + + private GlobalPartitionInformation getByTopic(List<GlobalPartitionInformation> partitions, String topic){ + for(GlobalPartitionInformation partitionInformation : partitions) { + if (partitionInformation.topic.equals(topic)) return partitionInformation; + } + return null; + } + + @Test + public void testGetBrokerInfo() throws Exception { + String host = "localhost"; + int port = 9092; + int partition = 0; + addPartition(partition, host, port, topic); + List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo(); + + GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic); + assertNotNull(brokerInfo); + assertEquals(1, brokerInfo.getOrderedPartitions().size()); + assertEquals(port, brokerInfo.getBrokerFor(partition).port); + assertEquals(host, brokerInfo.getBrokerFor(partition).host); + } + + @Test + public void testGetBrokerInfoWildcardMatch() throws Exception { + String host = "localhost"; + int port = 9092; + int partition = 0; + addPartition(partition, host, port, topic); + addPartition(partition, host, port, secondTopic); + + List<GlobalPartitionInformation> partitions = wildCardBrokerReader.getBrokerInfo(); + + GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic); + assertNotNull(brokerInfo); + assertEquals(1, brokerInfo.getOrderedPartitions().size()); + assertEquals(port, brokerInfo.getBrokerFor(partition).port); + assertEquals(host, brokerInfo.getBrokerFor(partition).host); + + brokerInfo = getByTopic(partitions, secondTopic); + assertNotNull(brokerInfo); + assertEquals(1, brokerInfo.getOrderedPartitions().size()); + assertEquals(port, brokerInfo.getBrokerFor(partition).port); + assertEquals(host, brokerInfo.getBrokerFor(partition).host); + + addPartition(partition, host, port, thirdTopic); + //Discover newly added topic + partitions = wildCardBrokerReader.getBrokerInfo(); + assertNotNull(getByTopic(partitions, topic)); + assertNotNull(getByTopic(partitions, secondTopic)); + assertNotNull(getByTopic(partitions, secondTopic)); + } + + + @Test + public void testMultiplePartitionsOnDifferentHosts() throws Exception { + String host = "localhost"; + int port = 9092; + int secondPort = 9093; + int partition = 0; + int secondPartition = partition + 1; + addPartition(partition, 0, host, port, topic); + addPartition(secondPartition, 1, host, secondPort, topic); + + List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo(); + + GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic); + assertNotNull(brokerInfo); + assertEquals(2, brokerInfo.getOrderedPartitions().size()); + + assertEquals(port, brokerInfo.getBrokerFor(partition).port); + assertEquals(host, brokerInfo.getBrokerFor(partition).host); + + assertEquals(secondPort, brokerInfo.getBrokerFor(secondPartition).port); + assertEquals(host, brokerInfo.getBrokerFor(secondPartition).host); + } + + + @Test + public void testMultiplePartitionsOnSameHost() throws Exception { + String host = "localhost"; + int port = 9092; + int partition = 0; + int secondPartition = partition + 1; + addPartition(partition, 0, host, port, topic); + addPartition(secondPartition, 0, host, port, topic); + + List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo(); + + GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic); + assertNotNull(brokerInfo); + assertEquals(2, brokerInfo.getOrderedPartitions().size()); + + assertEquals(port, brokerInfo.getBrokerFor(partition).port); + assertEquals(host, brokerInfo.getBrokerFor(partition).host); + + assertEquals(port, brokerInfo.getBrokerFor(secondPartition).port); + assertEquals(host, brokerInfo.getBrokerFor(secondPartition).host); + } + + @Test + public void testSwitchHostForPartition() throws Exception { + String host = "localhost"; + int port = 9092; + int partition = 0; + addPartition(partition, host, port, topic); + List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo(); + + GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic); + assertNotNull(brokerInfo); + assertEquals(port, brokerInfo.getBrokerFor(partition).port); + assertEquals(host, brokerInfo.getBrokerFor(partition).host); + + String newHost = host + "switch"; + int newPort = port + 1; + addPartition(partition, newHost, newPort, topic); + partitions = dynamicBrokersReader.getBrokerInfo(); + + brokerInfo = getByTopic(partitions, topic); + assertNotNull(brokerInfo); + assertEquals(newPort, brokerInfo.getBrokerFor(partition).port); + assertEquals(newHost, brokerInfo.getBrokerFor(partition).host); + } + + @Test(expected = NullPointerException.class) + public void testErrorLogsWhenConfigIsMissing() throws Exception { + String connectionString = server.getConnectString(); + Map conf = new HashMap(); + conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000); +// conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000); + conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4); + conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5); + + DynamicBrokersReader dynamicBrokersReader1 = new DynamicBrokersReader(conf, connectionString, masterPath, topic); + + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java new file mode 100644 index 0000000..8fa6564 --- /dev/null +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.junit.Test; + +public class ExponentialBackoffMsgRetryManagerTest { + + private static final Long TEST_OFFSET = 101L; + private static final Long TEST_OFFSET2 = 102L; + private static final Long TEST_OFFSET3 = 105L; + private static final Long TEST_NEW_OFFSET = 103L; + + @Test + public void testImmediateRetry() throws Exception { + ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0); + manager.failed(TEST_OFFSET); + Long next = manager.nextFailedMessageToRetry(); + assertEquals("expect test offset next available for retry", TEST_OFFSET, next); + assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET)); + + manager.retryStarted(TEST_OFFSET); + + manager.failed(TEST_OFFSET); + next = manager.nextFailedMessageToRetry(); + assertEquals("expect test offset next available for retry", TEST_OFFSET, next); + assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET)); + } + + @Test + public void testSingleDelay() throws Exception { + ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(100, 1d, 1000); + manager.failed(TEST_OFFSET); + Thread.sleep(5); + Long next = manager.nextFailedMessageToRetry(); + assertNull("expect no message ready for retry yet", next); + assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET)); + + Thread.sleep(100); + next = manager.nextFailedMessageToRetry(); + assertEquals("expect test offset next available for retry", TEST_OFFSET, next); + assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); + } + + @Test + public void testExponentialBackoff() throws Exception { + final long initial = 10; + final double mult = 2d; + ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, initial * 10); + + long expectedWaitTime = initial; + for (long i = 0L; i < 3L; ++i) { + manager.failed(TEST_OFFSET); + + Thread.sleep((expectedWaitTime + 1L) / 2L); + assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET)); + + Thread.sleep((expectedWaitTime + 1L) / 2L); + Long next = manager.nextFailedMessageToRetry(); + assertEquals("expect test offset next available for retry", TEST_OFFSET, next); + assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); + + manager.retryStarted(TEST_OFFSET); + expectedWaitTime *= mult; + } + } + + @Test + public void testRetryOrder() throws Exception { + final long initial = 10; + final double mult = 2d; + final long max = 20; + ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, max); + + manager.failed(TEST_OFFSET); + Thread.sleep(initial); + + manager.retryStarted(TEST_OFFSET); + manager.failed(TEST_OFFSET); + manager.failed(TEST_OFFSET2); + + // although TEST_OFFSET failed first, it's retry delay time is longer b/c this is the second retry + // so TEST_OFFSET2 should come first + + Thread.sleep(initial * 2); + assertTrue("message "+TEST_OFFSET+"should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); + assertTrue("message "+TEST_OFFSET2+"should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2)); + + Long next = manager.nextFailedMessageToRetry(); + assertEquals("expect first message to retry is "+TEST_OFFSET2, TEST_OFFSET2, next); + + Thread.sleep(initial); + + // haven't retried yet, so first should still be TEST_OFFSET2 + next = manager.nextFailedMessageToRetry(); + assertEquals("expect first message to retry is "+TEST_OFFSET2, TEST_OFFSET2, next); + manager.retryStarted(next); + + // now it should be TEST_OFFSET + next = manager.nextFailedMessageToRetry(); + assertEquals("expect message to retry is now "+TEST_OFFSET, TEST_OFFSET, next); + manager.retryStarted(next); + + // now none left + next = manager.nextFailedMessageToRetry(); + assertNull("expect no message to retry now", next); + } + + @Test + public void testQueriesAfterRetriedAlready() throws Exception { + ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0); + manager.failed(TEST_OFFSET); + Long next = manager.nextFailedMessageToRetry(); + assertEquals("expect test offset next available for retry", TEST_OFFSET, next); + assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET)); + + manager.retryStarted(TEST_OFFSET); + next = manager.nextFailedMessageToRetry(); + assertNull("expect no message ready after retried", next); + assertFalse("message should not be ready after retried", manager.shouldRetryMsg(TEST_OFFSET)); + } + + @Test(expected = IllegalStateException.class) + public void testRetryWithoutFail() throws Exception { + ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0); + manager.retryStarted(TEST_OFFSET); + } + + @Test(expected = IllegalStateException.class) + public void testFailRetryRetry() throws Exception { + ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0); + manager.failed(TEST_OFFSET); + try { + manager.retryStarted(TEST_OFFSET); + } catch (IllegalStateException ise) { + fail("IllegalStateException unexpected here: " + ise); + } + + assertFalse("message should not be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); + manager.retryStarted(TEST_OFFSET); + } + + @Test + public void testMaxBackoff() throws Exception { + final long initial = 100; + final double mult = 2d; + final long max = 2000; + ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, max); + + long expectedWaitTime = initial; + for (long i = 0L; i < 4L; ++i) { + manager.failed(TEST_OFFSET); + + Thread.sleep((expectedWaitTime + 1L) / 2L); + assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET)); + + Thread.sleep((expectedWaitTime + 1L) / 2L); + Long next = manager.nextFailedMessageToRetry(); + assertEquals("expect test offset next available for retry", TEST_OFFSET, next); + assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); + + manager.retryStarted(TEST_OFFSET); + expectedWaitTime = Math.min((long) (expectedWaitTime * mult), max); + } + } + + @Test + public void testFailThenAck() throws Exception { + ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0); + manager.failed(TEST_OFFSET); + assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); + + manager.acked(TEST_OFFSET); + + Long next = manager.nextFailedMessageToRetry(); + assertNull("expect no message ready after acked", next); + assertFalse("message should not be ready after acked", manager.shouldRetryMsg(TEST_OFFSET)); + } + + @Test + public void testAckThenFail() throws Exception { + ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0); + manager.acked(TEST_OFFSET); + assertFalse("message should not be ready after acked", manager.shouldRetryMsg(TEST_OFFSET)); + + manager.failed(TEST_OFFSET); + + Long next = manager.nextFailedMessageToRetry(); + assertEquals("expect test offset next available for retry", TEST_OFFSET, next); + assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); + } + + @Test + public void testClearInvalidMessages() throws Exception { + ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0); + manager.failed(TEST_OFFSET); + manager.failed(TEST_OFFSET2); + manager.failed(TEST_OFFSET3); + + assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); + assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2)); + assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET3)); + + manager.clearInvalidMessages(TEST_NEW_OFFSET); + + Long next = manager.nextFailedMessageToRetry(); + assertEquals("expect test offset next available for retry", TEST_OFFSET3, next); + + manager.acked(TEST_OFFSET3); + next = manager.nextFailedMessageToRetry(); + assertNull("expect no message ready after acked", next); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaErrorTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaErrorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaErrorTest.java new file mode 100644 index 0000000..e38bc1e --- /dev/null +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaErrorTest.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** + * Date: 12/01/2014 + * Time: 18:09 + */ +public class KafkaErrorTest { + + @Test + public void getError() { + assertThat(KafkaError.getError(0), is(equalTo(KafkaError.NO_ERROR))); + } + + @Test + public void offsetMetaDataTooLarge() { + assertThat(KafkaError.getError(12), is(equalTo(KafkaError.OFFSET_METADATA_TOO_LARGE))); + } + + @Test + public void unknownNegative() { + assertThat(KafkaError.getError(-1), is(equalTo(KafkaError.UNKNOWN))); + } + + @Test + public void unknownPositive() { + assertThat(KafkaError.getError(75), is(equalTo(KafkaError.UNKNOWN))); + } + + @Test + public void unknown() { + assertThat(KafkaError.getError(13), is(equalTo(KafkaError.UNKNOWN))); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java new file mode 100644 index 0000000..e2fb60f --- /dev/null +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.InstanceSpec; +import org.apache.curator.test.TestingServer; + +import kafka.server.KafkaConfig; +import kafka.server.KafkaServerStartable; +import org.apache.commons.io.FileUtils; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; + +/** + * Date: 11/01/2014 + * Time: 13:15 + */ +public class KafkaTestBroker { + + private int port; + private KafkaServerStartable kafka; + private TestingServer server; + private CuratorFramework zookeeper; + private File logDir; + + public KafkaTestBroker() { + try { + server = new TestingServer(); + String zookeeperConnectionString = server.getConnectString(); + ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); + zookeeper = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy); + zookeeper.start(); + port = InstanceSpec.getRandomPort(); + logDir = new File(System.getProperty("java.io.tmpdir"), "kafka/logs/kafka-test-" + port); + KafkaConfig config = buildKafkaConfig(zookeeperConnectionString); + kafka = new KafkaServerStartable(config); + kafka.startup(); + } catch (Exception ex) { + throw new RuntimeException("Could not start test broker", ex); + } + } + + private kafka.server.KafkaConfig buildKafkaConfig(String zookeeperConnectionString) { + Properties p = new Properties(); + p.setProperty("zookeeper.connect", zookeeperConnectionString); + p.setProperty("broker.id", "0"); + p.setProperty("port", "" + port); + p.setProperty("log.dirs", logDir.getAbsolutePath()); + return new KafkaConfig(p); + } + + public String getBrokerConnectionString() { + return "localhost:" + port; + } + + public int getPort() { + return port; + } + public void shutdown() { + kafka.shutdown(); + if (zookeeper.getState().equals(CuratorFrameworkState.STARTED)) { + zookeeper.close(); + } + try { + server.close(); + } catch (IOException e) { + e.printStackTrace(); + } + FileUtils.deleteQuietly(logDir); + } +}
