http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java index b76e230..d2ca5b8 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java @@ -17,9 +17,9 @@ */ package org.apache.storm.jdbc.trident.state; -import storm.trident.operation.TridentCollector; -import storm.trident.state.BaseStateUpdater; -import storm.trident.tuple.TridentTuple; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.state.BaseStateUpdater; +import org.apache.storm.trident.tuple.TridentTuple; import java.util.List;
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcLookupBoltTest.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcLookupBoltTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcLookupBoltTest.java index 1fda3b1..9a5ec09 100644 --- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcLookupBoltTest.java +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcLookupBoltTest.java @@ -17,7 +17,7 @@ */ package org.apache.storm.jdbc.bolt; -import backtype.storm.tuple.Fields; +import org.apache.storm.tuple.Fields; import com.google.common.collect.Lists; import org.apache.storm.jdbc.common.Column; import org.apache.storm.jdbc.common.ConnectionProvider; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java index 718917a..fdcd053 100644 --- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java @@ -17,12 +17,12 @@ */ package org.apache.storm.jdbc.spout; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichSpout; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; +import 
org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; import com.google.common.collect.Lists; import java.util.*; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java index 9df5a86..ec7ca36 100644 --- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java @@ -17,10 +17,10 @@ */ package org.apache.storm.jdbc.topology; -import backtype.storm.Config; -import backtype.storm.StormSubmitter; -import backtype.storm.generated.StormTopology; -import backtype.storm.tuple.Fields; +import org.apache.storm.Config; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.tuple.Fields; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.storm.jdbc.common.Column; @@ -32,7 +32,7 @@ import org.apache.storm.jdbc.mapper.JdbcLookupMapper; import org.apache.storm.jdbc.mapper.SimpleJdbcMapper; import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper; import org.apache.storm.jdbc.spout.UserSpout; -import backtype.storm.LocalCluster; +import org.apache.storm.LocalCluster; import java.sql.Types; import java.util.List; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java 
b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java index 585994e..1915219 100644 --- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java @@ -17,8 +17,8 @@ */ package org.apache.storm.jdbc.topology; -import backtype.storm.generated.StormTopology; -import backtype.storm.topology.TopologyBuilder; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.topology.TopologyBuilder; import com.google.common.collect.Lists; import org.apache.storm.jdbc.bolt.JdbcInsertBolt; import org.apache.storm.jdbc.bolt.JdbcLookupBolt; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java index 522d41a..11269c3 100644 --- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java +++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java @@ -17,8 +17,8 @@ */ package org.apache.storm.jdbc.topology; -import backtype.storm.generated.StormTopology; -import backtype.storm.tuple.Fields; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.tuple.Fields; import com.google.common.collect.Lists; import org.apache.storm.jdbc.common.Column; import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper; @@ -27,9 +27,9 @@ import org.apache.storm.jdbc.trident.state.JdbcQuery; import org.apache.storm.jdbc.trident.state.JdbcState; import org.apache.storm.jdbc.trident.state.JdbcStateFactory; import 
org.apache.storm.jdbc.trident.state.JdbcUpdater; -import storm.trident.Stream; -import storm.trident.TridentState; -import storm.trident.TridentTopology; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentState; +import org.apache.storm.trident.TridentTopology; import java.sql.Types; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/README.md ---------------------------------------------------------------------- diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md index 3dfa0b7..3a86cf0 100644 --- a/external/storm-kafka/README.md +++ b/external/storm-kafka/README.md @@ -65,7 +65,7 @@ In addition to these parameters, SpoutConfig contains the following fields that // Exponential back-off retry settings. These are used when retrying messages after a bolt // calls OutputCollector.fail(). - // Note: be sure to set backtype.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent + // Note: be sure to set org.apache.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent // resubmitting the message while still retrying. public long retryInitialDelayMs = 0; public double retryDelayMultiplier = 1.0; @@ -190,9 +190,9 @@ use Kafka 0.8.1.1 built against Scala 2.10, you would use the following dependen Note that the ZooKeeper and log4j dependencies are excluded to prevent version conflicts with Storm's dependencies. ##Writing to Kafka as part of your topology -You can create an instance of storm.kafka.bolt.KafkaBolt and attach it as a component to your topology or if you -are using trident you can use storm.kafka.trident.TridentState, storm.kafka.trident.TridentStateFactory and -storm.kafka.trident.TridentKafkaUpdater. 
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach it as a component to your topology or if you +are using trident you can use org.apache.storm.kafka.trident.TridentState, org.apache.storm.kafka.trident.TridentStateFactory and +org.apache.storm.kafka.trident.TridentKafkaUpdater. You need to provide implementation of following 2 interfaces http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/Broker.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/Broker.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Broker.java new file mode 100644 index 0000000..0d95e8d --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Broker.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Host/port pair identifying a Kafka broker.
 *
 * <p>Instances are ordered by host name first and by port second. Equality
 * and hashing use both fields, so the natural ordering is consistent with
 * {@link #equals(Object)}. The fields are public and mutable; changing them
 * while the instance is used as a map key or set element breaks the
 * containing collection.
 */
public class Broker implements Serializable, Comparable<Broker> {
    public String host;
    public int port;

    // for kryo compatibility
    private Broker() {

    }

    /**
     * Creates a broker address.
     *
     * @param host broker host name or address
     * @param port broker port
     */
    public Broker(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Creates a broker address on port 9092.
     *
     * @param host broker host name or address
     */
    public Broker(String host) {
        this(host, 9092);
    }

    @Override
    public int hashCode() {
        // Same value as Guava's Objects.hashCode(host, port): both delegate to Arrays.hashCode.
        return java.util.Objects.hash(host, port);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        final Broker other = (Broker) obj;
        return java.util.Objects.equals(this.host, other.host) && this.port == other.port;
    }

    @Override
    public String toString() {
        return host + ":" + port;
    }

    /**
     * Parses a {@code host} or {@code host:port} specification.
     *
     * @param host the specification; when no port is given, 9092 is used
     * @return the parsed broker
     * @throws IllegalArgumentException if the specification has more than one
     *         {@code ':'} separator, or (as {@link NumberFormatException}) if
     *         the port is not an integer
     */
    public static Broker fromString(String host) {
        String[] spec = host.split(":");
        switch (spec.length) {
            case 1:
                return new Broker(spec[0]);
            case 2:
                return new Broker(spec[0], Integer.parseInt(spec[1]));
            default:
                throw new IllegalArgumentException("Invalid host specification: " + host);
        }
    }

    /**
     * Orders brokers by host, then by port.
     *
     * <p>Ports are compared with {@link Integer#compare(int, int)} rather than
     * by subtraction, which overflows (and flips the sign) for operands that
     * are far apart. Callers must rely only on the sign of the result.
     */
    @Override
    public int compareTo(Broker o) {
        int byHost = this.host.compareTo(o.host);
        return byHost != 0 ? byHost : Integer.compare(this.port, o.port);
    }
}
/**
 * Serializable marker type for a description of where Kafka brokers can be
 * found.
 *
 * <p>The interface declares no methods of its own; it only requires that
 * implementations be {@link Serializable}.
 */
public interface BrokerHosts extends Serializable {
}
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.apache.storm.utils.Utils; +import org.apache.kafka.common.serialization.Serializer; + +import java.nio.ByteBuffer; +import java.util.Map; + +public class ByteBufferSerializer implements Serializer<ByteBuffer> { + @Override + public void configure(Map<String, ?> map, boolean b) { + + } + + @Override + public void close() { + + } + + @Override + public byte[] serialize(String s, ByteBuffer b) { + return Utils.toByteArray(b); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java new file mode 100644 index 0000000..0fc85b3 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.apache.storm.Config; +import org.apache.storm.utils.Utils; +import com.google.common.base.Preconditions; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.storm.kafka.trident.GlobalPartitionInformation; + +import java.io.UnsupportedEncodingException; +import java.net.SocketTimeoutException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class DynamicBrokersReader { + + public static final Logger LOG = LoggerFactory.getLogger(DynamicBrokersReader.class); + + private CuratorFramework _curator; + private String _zkPath; + private String _topic; + private Boolean _isWildcardTopic; + + public DynamicBrokersReader(Map conf, String zkStr, String zkPath, String topic) { + // Check required parameters + Preconditions.checkNotNull(conf, "conf cannot be null"); + + validateConfig(conf); + + Preconditions.checkNotNull(zkStr,"zkString cannot be null"); + Preconditions.checkNotNull(zkPath, "zkPath cannot be null"); + Preconditions.checkNotNull(topic, "topic cannot be null"); + + _zkPath = zkPath; + _topic = topic; + _isWildcardTopic = Utils.getBoolean(conf.get("kafka.topic.wildcard.match"), false); + try { + _curator = CuratorFrameworkFactory.newClient( + zkStr, + Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)), + 
Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)), + new RetryNTimes(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)), + Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)))); + _curator.start(); + } catch (Exception ex) { + LOG.error("Couldn't connect to zookeeper", ex); + throw new RuntimeException(ex); + } + } + + /** + * Get all partitions with their current leaders + */ + public List<GlobalPartitionInformation> getBrokerInfo() throws SocketTimeoutException { + List<String> topics = getTopics(); + List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>(); + + for (String topic : topics) { + GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(topic, this._isWildcardTopic); + try { + int numPartitionsForTopic = getNumPartitions(topic); + String brokerInfoPath = brokerPath(); + for (int partition = 0; partition < numPartitionsForTopic; partition++) { + int leader = getLeaderFor(topic,partition); + String path = brokerInfoPath + "/" + leader; + try { + byte[] brokerData = _curator.getData().forPath(path); + Broker hp = getBrokerHost(brokerData); + globalPartitionInformation.addPartition(partition, hp); + } catch (org.apache.zookeeper.KeeperException.NoNodeException e) { + LOG.error("Node {} does not exist ", path); + } + } + } catch (SocketTimeoutException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + LOG.info("Read partition info from zookeeper: " + globalPartitionInformation); + partitions.add(globalPartitionInformation); + } + return partitions; + } + + private int getNumPartitions(String topic) { + try { + String topicBrokersPath = partitionPath(topic); + List<String> children = _curator.getChildren().forPath(topicBrokersPath); + return children.size(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private List<String> getTopics() { + List<String> topics = new ArrayList<String>(); + if 
(!_isWildcardTopic) { + topics.add(_topic); + return topics; + } else { + try { + List<String> children = _curator.getChildren().forPath(topicsPath()); + for (String t : children) { + if (t.matches(_topic)) { + LOG.info(String.format("Found matching topic %s", t)); + topics.add(t); + } + } + return topics; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + public String topicsPath () { + return _zkPath + "/topics"; + } + public String partitionPath(String topic) { + return topicsPath() + "/" + topic + "/partitions"; + } + + public String brokerPath() { + return _zkPath + "/ids"; + } + + + + /** + * get /brokers/topics/distributedTopic/partitions/1/state + * { "controller_epoch":4, "isr":[ 1, 0 ], "leader":1, "leader_epoch":1, "version":1 } + * @param topic + * @param partition + * @return + */ + private int getLeaderFor(String topic, long partition) { + try { + String topicBrokersPath = partitionPath(topic); + byte[] hostPortData = _curator.getData().forPath(topicBrokersPath + "/" + partition + "/state"); + Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(hostPortData, "UTF-8")); + Integer leader = ((Number) value.get("leader")).intValue(); + if (leader == -1) { + throw new RuntimeException("No leader found for partition " + partition); + } + return leader; + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void close() { + _curator.close(); + } + + /** + * [zk: localhost:2181(CONNECTED) 56] get /brokers/ids/0 + * { "host":"localhost", "jmx_port":9999, "port":9092, "version":1 } + * + * @param contents + * @return + */ + private Broker getBrokerHost(byte[] contents) { + try { + Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(contents, "UTF-8")); + String host = (String) value.get("host"); + Integer port = ((Long) value.get("port")).intValue(); + return new Broker(host, port); + } catch (UnsupportedEncodingException 
e) { + throw new RuntimeException(e); + } + } + + /** + * Validate required parameters in the input configuration Map + * @param conf + */ + private void validateConfig(final Map conf) { + Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT), + "%s cannot be null", Config.STORM_ZOOKEEPER_SESSION_TIMEOUT); + Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT), + "%s cannot be null", Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT); + Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES), + "%s cannot be null", Config.STORM_ZOOKEEPER_RETRY_TIMES); + Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL), + "%s cannot be null", Config.STORM_ZOOKEEPER_RETRY_INTERVAL); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java new file mode 100644 index 0000000..6d30139 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import kafka.javaapi.consumer.SimpleConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.storm.kafka.trident.IBrokerReader; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + + +public class DynamicPartitionConnections { + + public static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionConnections.class); + + static class ConnectionInfo { + SimpleConsumer consumer; + Set<String> partitions = new HashSet<String>(); + + public ConnectionInfo(SimpleConsumer consumer) { + this.consumer = consumer; + } + } + + Map<Broker, ConnectionInfo> _connections = new HashMap(); + KafkaConfig _config; + IBrokerReader _reader; + + public DynamicPartitionConnections(KafkaConfig config, IBrokerReader brokerReader) { + _config = config; + _reader = brokerReader; + } + + public SimpleConsumer register(Partition partition) { + Broker broker = _reader.getBrokerForTopic(partition.topic).getBrokerFor(partition.partition); + return register(broker, partition.topic, partition.partition); + } + + public SimpleConsumer register(Broker host, String topic, int partition) { + if (!_connections.containsKey(host)) { + _connections.put(host, new ConnectionInfo(new SimpleConsumer(host.host, host.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId))); + } + ConnectionInfo info = _connections.get(host); + info.partitions.add(getHashKey(topic,partition)); + return info.consumer; + } + + public SimpleConsumer getConnection(Partition 
partition) { + ConnectionInfo info = _connections.get(partition.host); + if (info != null) { + return info.consumer; + } + return null; + } + + public void unregister(Broker port, String topic, int partition) { + ConnectionInfo info = _connections.get(port); + info.partitions.remove(getHashKey(topic,partition)); + if (info.partitions.isEmpty()) { + info.consumer.close(); + _connections.remove(port); + } + } + + public void unregister(Partition partition) { + unregister(partition.host, partition.topic, partition.partition); + } + + public void clear() { + for (ConnectionInfo info : _connections.values()) { + info.consumer.close(); + } + _connections.clear(); + } + + private String getHashKey(String topic, int partition) { + return topic + "_" + partition; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java new file mode 100644 index 0000000..f86d624 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import java.util.Comparator; +import java.util.HashSet; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager { + + private final long retryInitialDelayMs; + private final double retryDelayMultiplier; + private final long retryDelayMaxMs; + + private Queue<MessageRetryRecord> waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator()); + private Map<Long,MessageRetryRecord> records = new ConcurrentHashMap<Long,MessageRetryRecord>(); + + public ExponentialBackoffMsgRetryManager(long retryInitialDelayMs, double retryDelayMultiplier, long retryDelayMaxMs) { + this.retryInitialDelayMs = retryInitialDelayMs; + this.retryDelayMultiplier = retryDelayMultiplier; + this.retryDelayMaxMs = retryDelayMaxMs; + } + + @Override + public void failed(Long offset) { + MessageRetryRecord oldRecord = this.records.get(offset); + MessageRetryRecord newRecord = oldRecord == null ? 
+ new MessageRetryRecord(offset) : + oldRecord.createNextRetryRecord(); + this.records.put(offset, newRecord); + this.waiting.add(newRecord); + } + + @Override + public void acked(Long offset) { + MessageRetryRecord record = this.records.remove(offset); + if (record != null) { + this.waiting.remove(record); + } + } + + @Override + public void retryStarted(Long offset) { + MessageRetryRecord record = this.records.get(offset); + if (record == null || !this.waiting.contains(record)) { + throw new IllegalStateException("cannot retry a message that has not failed"); + } else { + this.waiting.remove(record); + } + } + + @Override + public Long nextFailedMessageToRetry() { + if (this.waiting.size() > 0) { + MessageRetryRecord first = this.waiting.peek(); + if (System.currentTimeMillis() >= first.retryTimeUTC) { + if (this.records.containsKey(first.offset)) { + return first.offset; + } else { + // defensive programming - should be impossible + this.waiting.remove(first); + return nextFailedMessageToRetry(); + } + } + } + return null; + } + + @Override + public boolean shouldRetryMsg(Long offset) { + MessageRetryRecord record = this.records.get(offset); + return record != null && + this.waiting.contains(record) && + System.currentTimeMillis() >= record.retryTimeUTC; + } + + @Override + public Set<Long> clearInvalidMessages(Long kafkaOffset) { + Set<Long> invalidOffsets = new HashSet<Long>(); + for(Long offset : records.keySet()){ + if(offset < kafkaOffset){ + MessageRetryRecord record = this.records.remove(offset); + if (record != null) { + this.waiting.remove(record); + invalidOffsets.add(offset); + } + } + } + return invalidOffsets; + } + + /** + * A MessageRetryRecord holds the data of how many times a message has + * failed and been retried, and when the last failure occurred. 
It can
     * determine whether it is ready to be retried by employing an exponential
     * back-off calculation using config values stored in SpoutConfig:
     * <ul>
     *  <li>retryInitialDelayMs - time to delay before the first retry</li>
     *  <li>retryDelayMultiplier - multiplier by which to increase the delay for each subsequent retry</li>
     *  <li>retryDelayMaxMs - maximum retry delay (once this delay time is reached, subsequent retries will
     *      delay for this amount of time every time)
     *  </li>
     * </ul>
     */
    private class MessageRetryRecord {
        private final long offset;
        private final int retryNum;
        private final long retryTimeUTC;

        /**
         * Creates the record for the first retry of {@code offset}.
         *
         * @param offset offset of the failed message
         */
        public MessageRetryRecord(long offset) {
            this(offset, 1);
        }

        /**
         * @param offset   offset of the failed message
         * @param retryNum 1-based number of the retry this record schedules
         */
        private MessageRetryRecord(long offset, int retryNum) {
            this.offset = offset;
            this.retryNum = retryNum;
            // retryNum must be assigned before this call: the delay depends on it
            this.retryTimeUTC = System.currentTimeMillis() + calculateRetryDelay();
        }

        /**
         * Create a MessageRetryRecord for the next retry that should occur after this one.
         *
         * @return a new record for the same offset with the retry number increased by one
         *         and the retry time recomputed from the current time; never {@code null}
         */
        public MessageRetryRecord createNextRetryRecord() {
            return new MessageRetryRecord(this.offset, this.retryNum + 1);
        }

        /**
         * Computes retryInitialDelayMs * retryDelayMultiplier^(retryNum - 1), capped at
         * retryDelayMaxMs.
         *
         * @return delay in milliseconds before this retry becomes due
         */
        private long calculateRetryDelay() {
            double delayMultiplier = Math.pow(retryDelayMultiplier, this.retryNum - 1);
            double delay = retryInitialDelayMs * delayMultiplier;
            // A double-to-long cast saturates at Long.MAX_VALUE, so no explicit
            // overflow branch (or boxed Long) is needed here.
            long delayThisRetryMs = (long) delay;
            return Math.min(delayThisRetryMs, retryDelayMaxMs);
        }

        /** Records are equal when they refer to the same offset, whatever their retry state. */
        @Override
        public boolean equals(Object other) {
            return (other instanceof MessageRetryRecord
                    && this.offset == ((MessageRetryRecord) other).offset);
        }

        @Override
        public int hashCode() {
            return Long.valueOf(this.offset).hashCode();
        }
    }

    /**
     * Orders retry records by ascending retry time.
     *
     * <p>{@code equals} is deliberately not overridden: the previous override returned
     * {@code false} for every argument, including the comparator itself, which violates
     * the reflexivity requirement of {@link Object#equals(Object)}.
     */
    private static class RetryTimeComparator implements Comparator<MessageRetryRecord> {

        @Override
        public int compare(MessageRetryRecord record1, MessageRetryRecord record2) {
            final long first = record1.retryTimeUTC;
            final long second = record2.retryTimeUTC;
            // same -1 / 0 / 1 result as Long.compareTo, without boxing
            return first < second ? -1 : (first == second ? 0 : 1);
        }
    }
}
/**
 * Unchecked exception signalling that a fetch from Kafka did not succeed.
 * It carries either a descriptive message or the exception that caused the failure.
 */
public class FailedFetchException extends RuntimeException {

    /**
     * @param message description of the failed fetch
     */
    public FailedFetchException(final String message) {
        super(message);
    }

    /**
     * @param e underlying failure; it becomes the cause of this exception
     */
    public FailedFetchException(final Exception e) {
        super(e);
    }
}
/**
 * Bookkeeping contract for Kafka offsets whose tuples failed and may have to be
 * emitted again. Offsets are passed as boxed {@link Long} values.
 */
public interface FailedMsgRetryManager {

    /**
     * Reports that the message at {@code offset} failed.
     *
     * @param offset offset of the failed message
     */
    void failed(Long offset);

    /**
     * Reports that the message at {@code offset} was acknowledged.
     *
     * @param offset offset of the acknowledged message
     */
    void acked(Long offset);

    /**
     * Reports that a retry of the message at {@code offset} is starting.
     *
     * @param offset offset being retried
     */
    void retryStarted(Long offset);

    /**
     * @return the offset of the next failed message to retry, or {@code null}
     *         when there is none to retry right now
     */
    Long nextFailedMessageToRetry();

    /**
     * @param offset offset to check
     * @return whether the message at {@code offset} should be retried now
     */
    boolean shouldRetryMsg(Long offset);

    /**
     * Discards tracked offsets that are no longer valid relative to {@code kafkaOffset}.
     *
     * @param kafkaOffset reference offset
     * @return the offsets that were discarded
     */
    Set<Long> clearInvalidMessages(Long kafkaOffset);
}
+ */ +package org.apache.storm.kafka; + +import org.apache.kafka.common.serialization.Serializer; + +import java.nio.ByteBuffer; +import java.nio.IntBuffer; +import java.util.Map; + +public class IntSerializer implements Serializer<Integer> { + @Override + public void configure(Map<String, ?> map, boolean b) { + } + + @Override + public byte[] serialize(String topic, Integer val) { + byte[] r = new byte[4]; + IntBuffer b = ByteBuffer.wrap(r).asIntBuffer(); + b.put(val); + return r; + } + + @Override + public void close() { + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java new file mode 100644 index 0000000..e1e1d24 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka; + +import org.apache.storm.spout.MultiScheme; +import org.apache.storm.spout.RawMultiScheme; + +import java.io.Serializable; + +public class KafkaConfig implements Serializable { + private static final long serialVersionUID = 5276718734571623855L; + + public final BrokerHosts hosts; + public final String topic; + public final String clientId; + + public int fetchSizeBytes = 1024 * 1024; + public int socketTimeoutMs = 10000; + public int fetchMaxWait = 10000; + public int bufferSizeBytes = 1024 * 1024; + public MultiScheme scheme = new RawMultiScheme(); + public boolean ignoreZkOffsets = false; + public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); + public long maxOffsetBehind = Long.MAX_VALUE; + public boolean useStartOffsetTimeIfOffsetOutOfRange = true; + public int metricsTimeBucketSizeInSecs = 60; + + public KafkaConfig(BrokerHosts hosts, String topic) { + this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId()); + } + + public KafkaConfig(BrokerHosts hosts, String topic, String clientId) { + this.hosts = hosts; + this.topic = topic; + this.clientId = clientId; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaError.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaError.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaError.java new file mode 100644 index 0000000..1d866e7 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaError.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * Kafka error codes, declared so that each constant's ordinal equals the numeric
 * code it stands for. {@link #UNKNOWN} must stay last: it is the catch-all for
 * every code outside the known range.
 */
public enum KafkaError {
    NO_ERROR,
    OFFSET_OUT_OF_RANGE,
    INVALID_MESSAGE,
    UNKNOWN_TOPIC_OR_PARTITION,
    INVALID_FETCH_SIZE,
    LEADER_NOT_AVAILABLE,
    NOT_LEADER_FOR_PARTITION,
    REQUEST_TIMED_OUT,
    BROKER_NOT_AVAILABLE,
    REPLICA_NOT_AVAILABLE,
    MESSAGE_SIZE_TOO_LARGE,
    STALE_CONTROLLER_EPOCH,
    OFFSET_METADATA_TOO_LARGE,
    UNKNOWN;

    /**
     * Maps a numeric error code to its constant.
     *
     * @param errorCode code reported by Kafka
     * @return the constant whose ordinal is {@code errorCode}, or {@link #UNKNOWN}
     *         for negative codes and codes at or beyond {@code UNKNOWN}'s ordinal
     */
    public static KafkaError getError(int errorCode) {
        final boolean known = errorCode >= 0 && errorCode < UNKNOWN.ordinal();
        return known ? values()[errorCode] : UNKNOWN;
    }
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.apache.storm.Config; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import com.google.common.base.Strings; +import kafka.message.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.storm.kafka.PartitionManager.KafkaMessageId; + +import java.util.*; + +// TODO: need to add blacklisting +// TODO: need to make a best effort to not re-emit messages if don't have to +public class KafkaSpout extends BaseRichSpout { + static enum EmitState { + EMITTED_MORE_LEFT, + EMITTED_END, + NO_EMITTED + } + + public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); + + SpoutConfig _spoutConfig; + SpoutOutputCollector _collector; + PartitionCoordinator _coordinator; + DynamicPartitionConnections _connections; + ZkState _state; + + long _lastUpdateMs = 0; + + int _currPartitionIndex = 0; + + public KafkaSpout(SpoutConfig spoutConf) { + _spoutConfig = spoutConf; + } + + @Override + public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) { + _collector = collector; + String topologyInstanceId = context.getStormId(); + Map stateConf = new HashMap(conf); + 
List<String> zkServers = _spoutConfig.zkServers; + if (zkServers == null) { + zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS); + } + Integer zkPort = _spoutConfig.zkPort; + if (zkPort == null) { + zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue(); + } + stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers); + stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort); + stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot); + _state = new ZkState(stateConf); + + _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig)); + + // using TransactionalState like this is a hack + int totalTasks = context.getComponentTasks(context.getThisComponentId()).size(); + if (_spoutConfig.hosts instanceof StaticHosts) { + _coordinator = new StaticCoordinator(_connections, conf, + _spoutConfig, _state, context.getThisTaskIndex(), + totalTasks, topologyInstanceId); + } else { + _coordinator = new ZkCoordinator(_connections, conf, + _spoutConfig, _state, context.getThisTaskIndex(), + totalTasks, topologyInstanceId); + } + + context.registerMetric("kafkaOffset", new IMetric() { + KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_connections); + + @Override + public Object getValueAndReset() { + List<PartitionManager> pms = _coordinator.getMyManagedPartitions(); + Set<Partition> latestPartitions = new HashSet(); + for (PartitionManager pm : pms) { + latestPartitions.add(pm.getPartition()); + } + _kafkaOffsetMetric.refreshPartitions(latestPartitions); + for (PartitionManager pm : pms) { + _kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset()); + } + return _kafkaOffsetMetric.getValueAndReset(); + } + }, _spoutConfig.metricsTimeBucketSizeInSecs); + + context.registerMetric("kafkaPartition", new IMetric() { + @Override + public Object getValueAndReset() { + List<PartitionManager> pms = 
_coordinator.getMyManagedPartitions(); + Map concatMetricsDataMaps = new HashMap(); + for (PartitionManager pm : pms) { + concatMetricsDataMaps.putAll(pm.getMetricsDataMap()); + } + return concatMetricsDataMaps; + } + }, _spoutConfig.metricsTimeBucketSizeInSecs); + } + + @Override + public void close() { + _state.close(); + } + + @Override + public void nextTuple() { + List<PartitionManager> managers = _coordinator.getMyManagedPartitions(); + for (int i = 0; i < managers.size(); i++) { + + try { + // in case the number of managers decreased + _currPartitionIndex = _currPartitionIndex % managers.size(); + EmitState state = managers.get(_currPartitionIndex).next(_collector); + if (state != EmitState.EMITTED_MORE_LEFT) { + _currPartitionIndex = (_currPartitionIndex + 1) % managers.size(); + } + if (state != EmitState.NO_EMITTED) { + break; + } + } catch (FailedFetchException e) { + LOG.warn("Fetch failed", e); + _coordinator.refresh(); + } + } + + long diffWithNow = System.currentTimeMillis() - _lastUpdateMs; + + /* + As far as the System.currentTimeMillis() is dependent on System clock, + additional check on negative value of diffWithNow in case of external changes. 
+ */ + if (diffWithNow > _spoutConfig.stateUpdateIntervalMs || diffWithNow < 0) { + commit(); + } + } + + @Override + public void ack(Object msgId) { + KafkaMessageId id = (KafkaMessageId) msgId; + PartitionManager m = _coordinator.getManager(id.partition); + if (m != null) { + m.ack(id.offset); + } + } + + @Override + public void fail(Object msgId) { + KafkaMessageId id = (KafkaMessageId) msgId; + PartitionManager m = _coordinator.getManager(id.partition); + if (m != null) { + m.fail(id.offset); + } + } + + @Override + public void deactivate() { + commit(); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) { + declarer.declareStream(_spoutConfig.outputStreamId, _spoutConfig.scheme.getOutputFields()); + } else { + declarer.declare(_spoutConfig.scheme.getOutputFields()); + } + } + + private void commit() { + _lastUpdateMs = System.currentTimeMillis(); + for (PartitionManager manager : _coordinator.getMyManagedPartitions()) { + manager.commit(); + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java new file mode 100644 index 0000000..8cd0fd0 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java @@ -0,0 +1,275 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.utils.Utils; +import com.google.common.base.Preconditions; +import kafka.api.FetchRequest; +import kafka.api.FetchRequestBuilder; +import kafka.api.PartitionOffsetRequestInfo; +import kafka.common.TopicAndPartition; +import kafka.javaapi.FetchResponse; +import kafka.javaapi.OffsetRequest; +import kafka.javaapi.consumer.SimpleConsumer; +import kafka.javaapi.message.ByteBufferMessageSet; +import kafka.message.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.storm.kafka.trident.GlobalPartitionInformation; +import org.apache.storm.kafka.trident.IBrokerReader; +import org.apache.storm.kafka.trident.StaticBrokerReader; +import org.apache.storm.kafka.trident.ZkBrokerReader; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.nio.channels.UnresolvedAddressException; +import java.util.*; + + +public class KafkaUtils { + + public static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class); + private static final int NO_OFFSET = -5; + + + public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) { + if (conf.hosts instanceof StaticHosts) { + return new StaticBrokerReader(conf.topic, ((StaticHosts) conf.hosts).getPartitionInformation()); + } else { + return new ZkBrokerReader(stormConf, conf.topic, (ZkHosts) conf.hosts); + } + } + + + public static long getOffset(SimpleConsumer consumer, String 
topic, int partition, KafkaConfig config) { + long startOffsetTime = config.startOffsetTime; + return getOffset(consumer, topic, partition, startOffsetTime); + } + + public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime) { + TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); + Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); + requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1)); + OffsetRequest request = new OffsetRequest( + requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId()); + + long[] offsets = consumer.getOffsetsBefore(request).offsets(topic, partition); + if (offsets.length > 0) { + return offsets[0]; + } else { + return NO_OFFSET; + } + } + + public static class KafkaOffsetMetric implements IMetric { + Map<Partition, Long> _partitionToOffset = new HashMap<Partition, Long>(); + Set<Partition> _partitions; + DynamicPartitionConnections _connections; + + public KafkaOffsetMetric(DynamicPartitionConnections connections) { + _connections = connections; + } + + public void setLatestEmittedOffset(Partition partition, long offset) { + _partitionToOffset.put(partition, offset); + } + + private class TopicMetrics { + long totalSpoutLag = 0; + long totalEarliestTimeOffset = 0; + long totalLatestTimeOffset = 0; + long totalLatestEmittedOffset = 0; + } + + @Override + public Object getValueAndReset() { + try { + HashMap ret = new HashMap(); + if (_partitions != null && _partitions.size() == _partitionToOffset.size()) { + Map<String,TopicMetrics> topicMetricsMap = new TreeMap<String, TopicMetrics>(); + for (Map.Entry<Partition, Long> e : _partitionToOffset.entrySet()) { + Partition partition = e.getKey(); + SimpleConsumer consumer = _connections.getConnection(partition); + if (consumer == null) { + LOG.warn("partitionToOffset contains partition not found in 
_connections. Stale partition data?"); + return null; + } + long latestTimeOffset = getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.LatestTime()); + long earliestTimeOffset = getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime()); + if (latestTimeOffset == KafkaUtils.NO_OFFSET) { + LOG.warn("No data found in Kafka Partition " + partition.getId()); + return null; + } + long latestEmittedOffset = e.getValue(); + long spoutLag = latestTimeOffset - latestEmittedOffset; + String topic = partition.topic; + String metricPath = partition.getId(); + //Handle the case where Partition Path Id does not contain topic name Partition.getId() == "partition_" + partition + if (!metricPath.startsWith(topic + "/")) { + metricPath = topic + "/" + metricPath; + } + ret.put(metricPath + "/" + "spoutLag", spoutLag); + ret.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset); + ret.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset); + ret.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset); + + if (!topicMetricsMap.containsKey(partition.topic)) { + topicMetricsMap.put(partition.topic,new TopicMetrics()); + } + + TopicMetrics topicMetrics = topicMetricsMap.get(partition.topic); + topicMetrics.totalSpoutLag += spoutLag; + topicMetrics.totalEarliestTimeOffset += earliestTimeOffset; + topicMetrics.totalLatestTimeOffset += latestTimeOffset; + topicMetrics.totalLatestEmittedOffset += latestEmittedOffset; + } + + for(Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) { + String topic = e.getKey(); + TopicMetrics topicMetrics = e.getValue(); + ret.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag); + ret.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset); + ret.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset); + ret.put(topic + "/" + "totalLatestEmittedOffset", 
topicMetrics.totalLatestEmittedOffset); + } + + return ret; + } else { + LOG.info("Metrics Tick: Not enough data to calculate spout lag."); + } + } catch (Throwable t) { + LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t); + } + return null; + } + + public void refreshPartitions(Set<Partition> partitions) { + _partitions = partitions; + Iterator<Partition> it = _partitionToOffset.keySet().iterator(); + while (it.hasNext()) { + if (!partitions.contains(it.next())) { + it.remove(); + } + } + } + } + + public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) + throws TopicOffsetOutOfRangeException, FailedFetchException,RuntimeException { + ByteBufferMessageSet msgs = null; + String topic = partition.topic; + int partitionId = partition.partition; + FetchRequestBuilder builder = new FetchRequestBuilder(); + FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes). 
+ clientId(config.clientId).maxWait(config.fetchMaxWait).build(); + FetchResponse fetchResponse; + try { + fetchResponse = consumer.fetch(fetchRequest); + } catch (Exception e) { + if (e instanceof ConnectException || + e instanceof SocketTimeoutException || + e instanceof IOException || + e instanceof UnresolvedAddressException + ) { + LOG.warn("Network error when fetching messages:", e); + throw new FailedFetchException(e); + } else { + throw new RuntimeException(e); + } + } + if (fetchResponse.hasError()) { + KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId)); + if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) { + String msg = partition + " Got fetch request with offset out of range: [" + offset + "]"; + LOG.warn(msg); + throw new TopicOffsetOutOfRangeException(msg); + } else { + String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]"; + LOG.error(message); + throw new FailedFetchException(message); + } + } else { + msgs = fetchResponse.messageSet(topic, partitionId); + } + return msgs; + } + + + public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg, String topic) { + Iterable<List<Object>> tups; + ByteBuffer payload = msg.payload(); + if (payload == null) { + return null; + } + ByteBuffer key = msg.key(); + if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) { + tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(key, payload); + } else { + if (kafkaConfig.scheme instanceof StringMultiSchemeWithTopic) { + tups = ((StringMultiSchemeWithTopic)kafkaConfig.scheme).deserializeWithTopic(topic, payload); + } else { + tups = kafkaConfig.scheme.deserialize(payload); + } + } + return tups; + } + + public static Iterable<List<Object>> generateTuples(MessageMetadataSchemeAsMultiScheme scheme, Message msg, Partition partition, long offset) { + ByteBuffer 
payload = msg.payload(); + if (payload == null) { + return null; + } + return scheme.deserializeMessageWithMetadata(payload, partition, offset); + } + + + public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitons, int totalTasks, int taskIndex) { + Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks"); + List<Partition> taskPartitions = new ArrayList<Partition>(); + List<Partition> partitions = new ArrayList<Partition>(); + for(GlobalPartitionInformation partitionInformation : partitons) { + partitions.addAll(partitionInformation.getOrderedPartitions()); + } + int numPartitions = partitions.size(); + if (numPartitions < totalTasks) { + LOG.warn("there are more tasks than partitions (tasks: " + totalTasks + "; partitions: " + numPartitions + "), some tasks will be idle"); + } + for (int i = taskIndex; i < numPartitions; i += totalTasks) { + Partition taskPartition = partitions.get(i); + taskPartitions.add(taskPartition); + } + logPartitionMapping(totalTasks, taskIndex, taskPartitions); + return taskPartitions; + } + + private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions) { + String taskPrefix = taskId(taskIndex, totalTasks); + if (taskPartitions.isEmpty()) { + LOG.warn(taskPrefix + "no partitions assigned"); + } else { + LOG.info(taskPrefix + "assigned " + taskPartitions); + } + } + + public static String taskId(int taskIndex, int totalTasks) { + return "Task [" + (taskIndex + 1) + "/" + totalTasks + "] "; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java new file mode 100644 index 0000000..3f9acc2 --- /dev/null +++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.apache.storm.spout.Scheme; + +import java.nio.ByteBuffer; +import java.util.List; + +public interface KeyValueScheme extends Scheme { + List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value); +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java new file mode 100644 index 0000000..25053dd --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.apache.storm.spout.SchemeAsMultiScheme; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; + +public class KeyValueSchemeAsMultiScheme extends SchemeAsMultiScheme { + + public KeyValueSchemeAsMultiScheme(KeyValueScheme scheme) { + super(scheme); + } + + public Iterable<List<Object>> deserializeKeyAndValue(final ByteBuffer key, final ByteBuffer value) { + List<Object> o = ((KeyValueScheme)scheme).deserializeKeyAndValue(key, value); + if(o == null) return null; + else return Arrays.asList(o); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java new file mode 100644 index 0000000..d0fc08e --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.apache.storm.spout.Scheme; + +import java.nio.ByteBuffer; +import java.util.List; +
+/**
+ * A {@link Scheme} that can additionally deserialize a message together with the
+ * {@link Partition} and offset handed over alongside it.
+ */
+public interface MessageMetadataScheme extends Scheme {
+    /**
+     * Deserializes a message along with its metadata.
+     *
+     * @param message   buffer holding the message
+     * @param partition partition passed with the message (presumably the one it was read from -- confirm against callers)
+     * @param offset    offset passed with the message (presumably its position in that partition -- confirm against callers)
+     * @return the deserialized objects. NOTE(review): MessageMetadataSchemeAsMultiScheme checks this
+     *         result for null, so implementations apparently may return null.
+     */
+    List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java new file mode 100644 index 0000000..a53fa88 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; + +import org.apache.storm.spout.SchemeAsMultiScheme; + +public class MessageMetadataSchemeAsMultiScheme extends SchemeAsMultiScheme { + private static final long serialVersionUID = -7172403703813625116L; + + public MessageMetadataSchemeAsMultiScheme(MessageMetadataScheme scheme) { + super(scheme); + } + + public Iterable<List<Object>> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset) { + List<Object> o = ((MessageMetadataScheme) scheme).deserializeMessageWithMetadata(message, partition, offset); + if (o == null) { + return null; + } else { + return Arrays.asList(o); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java new file mode 100644 index 0000000..afdf8af --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import com.google.common.base.Objects; +import org.apache.storm.trident.spout.ISpoutPartition; + + +public class Partition implements ISpoutPartition { + + public Broker host; + public int partition; + public String topic; + + //Flag to keep the Partition Path Id backward compatible with Old implementation of Partition.getId() == "partition_" + partition + private Boolean bUseTopicNameForPartitionPathId; + + // for kryo compatibility + private Partition() { + + } + public Partition(Broker host, String topic, int partition) { + this.topic = topic; + this.host = host; + this.partition = partition; + this.bUseTopicNameForPartitionPathId = false; + } + + public Partition(Broker host, String topic, int partition,Boolean bUseTopicNameForPartitionPathId) { + this.topic = topic; + this.host = host; + this.partition = partition; + this.bUseTopicNameForPartitionPathId = bUseTopicNameForPartitionPathId; + } + + @Override + public int hashCode() { + return Objects.hashCode(host, topic, partition); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + final Partition other = (Partition) obj; + return Objects.equal(this.host, other.host) && Objects.equal(this.topic, other.topic) && Objects.equal(this.partition, other.partition); + } + + @Override + 
public String toString() { + return "Partition{" + + "host=" + host + + ", topic=" + topic + + ", partition=" + partition + + '}'; + } + + @Override + public String getId() { + if (bUseTopicNameForPartitionPathId) { + return topic + "/partition_" + partition; + } else { + //Keep the Partition Id backward compatible with Old implementation of Partition.getId() == "partition_" + partition + return "partition_" + partition; + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java new file mode 100644 index 0000000..c9004fa --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka; + +import java.util.List; +
+/**
+ * Hands out {@link PartitionManager}s, either all at once or one per {@link Partition},
+ * and can be asked to refresh.
+ */
+public interface PartitionCoordinator {
+    /**
+     * @return the partition managers this coordinator currently manages
+     *         (presumably those assigned to the calling task -- confirm against implementations)
+     */
+    List<PartitionManager> getMyManagedPartitions();
+
+    /**
+     * @param partition the partition to look up
+     * @return the manager for {@code partition}. NOTE(review): the result for a partition that
+     *         is not managed here is not visible from this interface -- confirm against implementations.
+     */
+    PartitionManager getManager(Partition partition);
+
+    /**
+     * Refreshes the coordinator. NOTE(review): presumably re-reads which partitions are managed --
+     * confirm against implementations.
+     */
+    void refresh();
+}
