Fixing stylecheck problems with storm-kafka
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4fe4f04b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4fe4f04b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4fe4f04b Branch: refs/heads/master Commit: 4fe4f04bb9750301b96f5c20142acb9a9a6a6000 Parents: 1a2d131 Author: Kishor Patil <[email protected]> Authored: Sun Apr 22 22:59:46 2018 -0400 Committer: Kishor Patil <[email protected]> Committed: Mon Apr 23 00:22:37 2018 -0400 ---------------------------------------------------------------------- external/storm-kafka/pom.xml | 2 +- .../src/jvm/org/apache/storm/kafka/Broker.java | 51 ++-- .../jvm/org/apache/storm/kafka/BrokerHosts.java | 19 +- .../storm/kafka/ByteBufferSerializer.java | 26 +- .../storm/kafka/DynamicBrokersReader.java | 112 ++++---- .../kafka/DynamicPartitionConnections.java | 55 ++-- .../ExponentialBackoffMsgRetryManager.java | 75 +++--- .../storm/kafka/FailedFetchException.java | 19 +- .../storm/kafka/FailedMsgRetryManager.java | 19 +- .../org/apache/storm/kafka/IntSerializer.java | 24 +- .../jvm/org/apache/storm/kafka/KafkaConfig.java | 25 +- .../jvm/org/apache/storm/kafka/KafkaError.java | 19 +- .../jvm/org/apache/storm/kafka/KafkaSpout.java | 58 ++-- .../jvm/org/apache/storm/kafka/KafkaUtils.java | 269 +++++++++---------- .../org/apache/storm/kafka/KeyValueScheme.java | 22 +- .../kafka/KeyValueSchemeAsMultiScheme.java | 31 +-- .../storm/kafka/MessageMetadataScheme.java | 22 +- .../MessageMetadataSchemeAsMultiScheme.java | 20 +- .../jvm/org/apache/storm/kafka/Partition.java | 39 ++- .../storm/kafka/PartitionCoordinator.java | 19 +- .../apache/storm/kafka/PartitionManager.java | 121 +++++---- .../jvm/org/apache/storm/kafka/SpoutConfig.java | 19 +- .../apache/storm/kafka/StaticCoordinator.java | 27 +- .../jvm/org/apache/storm/kafka/StaticHosts.java | 19 +- .../storm/kafka/StaticPartitionConnections.java | 22 +- .../storm/kafka/StringKeyValueScheme.java | 24 +- .../kafka/StringMessageAndMetadataScheme.java | 27 +- .../storm/kafka/StringMultiSchemeWithTopic.java | 28 +- .../org/apache/storm/kafka/StringScheme.java | 38 ++- .../kafka/TopicOffsetOutOfRangeException.java | 1 + .../org/apache/storm/kafka/ZkCoordinator.java | 50 ++-- .../src/jvm/org/apache/storm/kafka/ZkHosts.java | 19 +- .../src/jvm/org/apache/storm/kafka/ZkState.java | 65 +++-- .../org/apache/storm/kafka/bolt/KafkaBolt.java | 69 +++-- .../FieldNameBasedTupleToKafkaMapper.java | 21 +- .../kafka/bolt/mapper/TupleToKafkaMapper.java | 25 +- .../bolt/selector/DefaultTopicSelector.java | 19 +- .../bolt/selector/FieldIndexTopicSelector.java | 19 +- .../bolt/selector/FieldNameTopicSelector.java | 19 +- .../kafka/bolt/selector/KafkaTopicSelector.java | 22 +- .../apache/storm/kafka/trident/Coordinator.java | 27 +- .../storm/kafka/trident/DefaultCoordinator.java | 19 +- .../trident/GlobalPartitionInformation.java | 41 ++- .../storm/kafka/trident/IBatchCoordinator.java | 19 +- .../storm/kafka/trident/IBrokerReader.java | 20 +- .../apache/storm/kafka/trident/MaxMetric.java | 19 +- .../kafka/trident/OpaqueTridentKafkaSpout.java | 44 ++- .../storm/kafka/trident/StaticBrokerReader.java | 23 +- .../trident/TransactionalTridentKafkaSpout.java | 22 +- .../storm/kafka/trident/TridentKafkaConfig.java | 19 +- .../kafka/trident/TridentKafkaEmitter.java | 73 +++-- .../storm/kafka/trident/TridentKafkaState.java | 49 ++-- .../kafka/trident/TridentKafkaStateFactory.java | 34 +-- .../kafka/trident/TridentKafkaUpdater.java | 22 +- .../storm/kafka/trident/ZkBrokerReader.java | 113 ++++---- .../FieldNameBasedTupleToKafkaMapper.java | 19 +- .../mapper/TridentTupleToKafkaMapper.java | 26 +- .../trident/selector/DefaultTopicSelector.java | 19 +- .../trident/selector/KafkaTopicSelector.java | 22 +- .../storm/kafka/DynamicBrokersReaderTest.java | 39 ++- .../ExponentialBackoffMsgRetryManagerTest.java | 49 ++-- .../org/apache/storm/kafka/KafkaErrorTest.java | 19 +- .../org/apache/storm/kafka/KafkaTestBroker.java | 29 +- .../org/apache/storm/kafka/KafkaUtilsTest.java | 73 +++-- .../storm/kafka/PartitionManagerTest.java | 34 +-- .../storm/kafka/StringKeyValueSchemeTest.java | 36 ++- .../apache/storm/kafka/TestStringScheme.java | 28 +- .../test/org/apache/storm/kafka/TestUtils.java | 32 ++- .../apache/storm/kafka/TridentKafkaTest.java | 40 ++- .../apache/storm/kafka/ZkCoordinatorTest.java | 49 ++-- .../apache/storm/kafka/bolt/KafkaBoltTest.java | 159 ++++++----- 71 files changed, 1246 insertions(+), 1551 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml index d3b38db..2e0f35b 100644 --- a/external/storm-kafka/pom.xml +++ b/external/storm-kafka/pom.xml @@ -57,7 +57,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - <maxAllowedViolations>557</maxAllowedViolations> + <maxAllowedViolations>180</maxAllowedViolations> </configuration> </plugin> </plugins> http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Broker.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/Broker.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Broker.java index 0d95e8d..b33af99 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/Broker.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Broker.java @@ -1,24 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; import com.google.common.base.Objects; - import java.io.Serializable; public class Broker implements Serializable, Comparable<Broker> { @@ -27,9 +21,9 @@ public class Broker implements Serializable, Comparable<Broker> { // for kryo compatibility private Broker() { - + } - + public Broker(String host, int port) { this.host = host; this.port = port; @@ -39,6 +33,19 @@ public class Broker implements Serializable, Comparable<Broker> { this(host, 9092); } + public static Broker fromString(String host) { + Broker hp; + String[] spec = host.split(":"); + if (spec.length == 1) { + hp = new Broker(spec[0]); + } else if (spec.length == 2) { + hp = new Broker(spec[0], Integer.parseInt(spec[1])); + } else { + throw new IllegalArgumentException("Invalid host specification: " + host); + } + return hp; + } + @Override public int hashCode() { return Objects.hashCode(host, port); @@ -61,20 +68,6 @@ public class Broker implements Serializable, Comparable<Broker> { return host + ":" + port; } - public static Broker fromString(String host) { - Broker hp; - String[] spec = host.split(":"); - if (spec.length == 1) { - hp = new Broker(spec[0]); - } else if (spec.length == 2) { - hp = new Broker(spec[0], Integer.parseInt(spec[1])); - } else { - throw new IllegalArgumentException("Invalid host specification: " + host); - } - return hp; - } - - @Override public int compareTo(Broker o) { if (this.host.equals(o.host)) { http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/BrokerHosts.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/BrokerHosts.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/BrokerHosts.java index 13ba0a1..dbd6a10 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/BrokerHosts.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/BrokerHosts.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ByteBufferSerializer.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ByteBufferSerializer.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ByteBufferSerializer.java index 2a18a7f..37986a5 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ByteBufferSerializer.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ByteBufferSerializer.java @@ -15,27 +15,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.kafka; -import org.apache.storm.utils.Utils; -import org.apache.kafka.common.serialization.Serializer; +package org.apache.storm.kafka; import java.nio.ByteBuffer; import java.util.Map; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.storm.utils.Utils; public class ByteBufferSerializer implements Serializer<ByteBuffer> { - @Override - public void configure(Map<String, ?> map, boolean b) { + @Override + public void configure(Map<String, ?> map, boolean b) { - } + } - @Override - public void close() { + @Override + public void close() { - } + } - @Override - public byte[] serialize(String s, ByteBuffer b) { - return Utils.toByteArray(b); - } + @Override + public byte[] serialize(String s, ByteBuffer b) { + return Utils.toByteArray(b); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java index c203359..49ad530 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java @@ -1,37 +1,31 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; -import org.apache.storm.Config; -import org.apache.storm.utils.ObjectReader; import com.google.common.base.Preconditions; +import java.net.SocketTimeoutException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; +import org.apache.storm.Config; +import org.apache.storm.kafka.trident.GlobalPartitionInformation; +import org.apache.storm.utils.ObjectReader; import org.json.simple.JSONValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.storm.kafka.trident.GlobalPartitionInformation; - -import java.net.SocketTimeoutException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; public class DynamicBrokersReader { @@ -49,7 +43,7 @@ public class DynamicBrokersReader { validateConfig(conf); - Preconditions.checkNotNull(zkStr,"zkString cannot be null"); + Preconditions.checkNotNull(zkStr, "zkString cannot be null"); Preconditions.checkNotNull(zkPath, "zkPath cannot be null"); Preconditions.checkNotNull(topic, "topic cannot be null"); @@ -58,11 +52,11 @@ public class DynamicBrokersReader { _isWildcardTopic = ObjectReader.getBoolean(conf.get("kafka.topic.wildcard.match"), false); try { _curator = CuratorFrameworkFactory.newClient( - zkStr, - ObjectReader.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)), - ObjectReader.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)), - new RetryNTimes(ObjectReader.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)), - ObjectReader.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)))); + zkStr, + ObjectReader.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)), + ObjectReader.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)), + new RetryNTimes(ObjectReader.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)), + ObjectReader.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)))); _curator.start(); } catch (Exception ex) { LOG.error("Couldn't connect to zookeeper", ex); @@ -74,33 +68,33 @@ public class DynamicBrokersReader { * Get all partitions with their current leaders */ public List<GlobalPartitionInformation> getBrokerInfo() throws SocketTimeoutException { - List<String> topics = getTopics(); - List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>(); - - for (String topic : topics) { - GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(topic, this._isWildcardTopic); - try { - int numPartitionsForTopic = getNumPartitions(topic); - String brokerInfoPath = brokerPath(); - for (int partition = 0; partition < numPartitionsForTopic; partition++) { - int leader = getLeaderFor(topic,partition); - String path = brokerInfoPath + "/" + leader; - try { - byte[] brokerData = _curator.getData().forPath(path); - Broker hp = getBrokerHost(brokerData); - globalPartitionInformation.addPartition(partition, hp); - } catch (org.apache.zookeeper.KeeperException.NoNodeException e) { - LOG.error("Node {} does not exist ", path); - } - } - } catch (SocketTimeoutException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } - LOG.info("Read partition info from zookeeper: " + globalPartitionInformation); - partitions.add(globalPartitionInformation); - } + List<String> topics = getTopics(); + List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>(); + + for (String topic : topics) { + GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(topic, this._isWildcardTopic); + try { + int numPartitionsForTopic = getNumPartitions(topic); + String brokerInfoPath = brokerPath(); + for (int partition = 0; partition < numPartitionsForTopic; partition++) { + int leader = getLeaderFor(topic, partition); + String path = brokerInfoPath + "/" + leader; + try { + byte[] brokerData = _curator.getData().forPath(path); + Broker hp = getBrokerHost(brokerData); + globalPartitionInformation.addPartition(partition, hp); + } catch (org.apache.zookeeper.KeeperException.NoNodeException e) { + LOG.error("Node {} does not exist ", path); + } + } + } catch (SocketTimeoutException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + LOG.info("Read partition info from zookeeper: " + globalPartitionInformation); + partitions.add(globalPartitionInformation); + } return partitions; } @@ -135,9 +129,10 @@ public class DynamicBrokersReader { } } - public String topicsPath () { + public String topicsPath() { return _zkPath + "/topics"; } + public String partitionPath(String topic) { return topicsPath() + "/" + topic + "/partitions"; } @@ -147,7 +142,6 @@ public class DynamicBrokersReader { } - /** * get /brokers/topics/distributedTopic/partitions/1/state * { "controller_epoch":4, "isr":[ 1, 0 ], "leader":1, "leader_epoch":1, "version":1 } @@ -202,13 +196,13 @@ public class DynamicBrokersReader { */ private void validateConfig(final Map<String, Object> conf) { Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT), - "%s cannot be null", Config.STORM_ZOOKEEPER_SESSION_TIMEOUT); + "%s cannot be null", Config.STORM_ZOOKEEPER_SESSION_TIMEOUT); Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT), - "%s cannot be null", Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT); + "%s cannot be null", Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT); Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES), - "%s cannot be null", Config.STORM_ZOOKEEPER_RETRY_TIMES); + "%s cannot be null", Config.STORM_ZOOKEEPER_RETRY_TIMES); Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL), - "%s cannot be null", Config.STORM_ZOOKEEPER_RETRY_INTERVAL); + "%s cannot be null", Config.STORM_ZOOKEEPER_RETRY_INTERVAL); } } http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java index 4c5dba5..1ca7144 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java @@ -1,50 +1,33 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.kafka; -import kafka.javaapi.consumer.SimpleConsumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.storm.kafka.trident.IBrokerReader; +package org.apache.storm.kafka; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import kafka.javaapi.consumer.SimpleConsumer; +import org.apache.storm.kafka.trident.IBrokerReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DynamicPartitionConnections { private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionConnections.class); - - static class ConnectionInfo { - SimpleConsumer consumer; - Set<String> partitions = new HashSet<String>(); - - public ConnectionInfo(SimpleConsumer consumer) { - this.consumer = consumer; - } - } - Map<Broker, ConnectionInfo> _connections = new HashMap<>(); KafkaConfig _config; IBrokerReader _reader; - public DynamicPartitionConnections(KafkaConfig config, IBrokerReader brokerReader) { _config = config; _reader = brokerReader; @@ -57,10 +40,11 @@ public class DynamicPartitionConnections { public SimpleConsumer register(Broker host, String topic, int partition) { if (!_connections.containsKey(host)) { - _connections.put(host, new ConnectionInfo(new SimpleConsumer(host.host, host.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId))); + _connections.put(host, new ConnectionInfo( + new SimpleConsumer(host.host, host.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId))); } ConnectionInfo info = _connections.get(host); - info.partitions.add(getHashKey(topic,partition)); + info.partitions.add(getHashKey(topic, partition)); return info.consumer; } @@ -74,7 +58,7 @@ public class DynamicPartitionConnections { public void unregister(Broker port, String topic, int partition) { ConnectionInfo info = _connections.get(port); - info.partitions.remove(getHashKey(topic,partition)); + info.partitions.remove(getHashKey(topic, partition)); if (info.partitions.isEmpty()) { info.consumer.close(); _connections.remove(port); @@ -95,4 +79,13 @@ public class DynamicPartitionConnections { private String getHashKey(String topic, int partition) { return topic + "_" + partition; } + + static class ConnectionInfo { + SimpleConsumer consumer; + Set<String> partitions = new HashSet<String>(); + + public ConnectionInfo(SimpleConsumer consumer) { + this.consumer = consumer; + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java index 60654a5..2651c30 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java @@ -1,23 +1,17 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; -import org.apache.storm.utils.Time; import java.util.Comparator; import java.util.HashSet; import java.util.Map; @@ -25,6 +19,7 @@ import java.util.PriorityQueue; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.storm.utils.Time; public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager { @@ -34,7 +29,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager private int retryLimit; private Queue<MessageRetryRecord> waiting; - private Map<Long,MessageRetryRecord> records; + private Map<Long, MessageRetryRecord> records; public ExponentialBackoffMsgRetryManager() { @@ -46,15 +41,15 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager this.retryDelayMaxMs = spoutConfig.retryDelayMaxMs; this.retryLimit = spoutConfig.retryLimit; this.waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator()); - this.records = new ConcurrentHashMap<Long,MessageRetryRecord>(); + this.records = new ConcurrentHashMap<Long, MessageRetryRecord>(); } @Override public void failed(Long offset) { MessageRetryRecord oldRecord = this.records.get(offset); MessageRetryRecord newRecord = oldRecord == null ? - new MessageRetryRecord(offset) : - oldRecord.createNextRetryRecord(); + new MessageRetryRecord(offset) : + oldRecord.createNextRetryRecord(); this.records.put(offset, newRecord); this.waiting.add(newRecord); } @@ -98,16 +93,16 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager public boolean shouldReEmitMsg(Long offset) { MessageRetryRecord record = this.records.get(offset); return record != null && - this.waiting.contains(record) && - Time.currentTimeMillis() >= record.retryTimeUTC; + this.waiting.contains(record) && + Time.currentTimeMillis() >= record.retryTimeUTC; } @Override public boolean retryFurther(Long offset) { MessageRetryRecord record = this.records.get(offset); - return ! (record != null && - this.retryLimit > 0 && - this.retryLimit <= record.retryNum); + return !(record != null && + this.retryLimit > 0 && + this.retryLimit <= record.retryNum); } @Override @@ -117,9 +112,9 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager @Override public Set<Long> clearOffsetsBefore(Long kafkaOffset) { - Set<Long> invalidOffsets = new HashSet<Long>(); - for(Long offset : records.keySet()){ - if(offset < kafkaOffset){ + Set<Long> invalidOffsets = new HashSet<Long>(); + for (Long offset : records.keySet()) { + if (offset < kafkaOffset) { MessageRetryRecord record = this.records.remove(offset); if (record != null) { this.waiting.remove(record); @@ -130,6 +125,19 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager return invalidOffsets; } + private static class RetryTimeComparator implements Comparator<MessageRetryRecord> { + + @Override + public int compare(MessageRetryRecord record1, MessageRetryRecord record2) { + return Long.valueOf(record1.retryTimeUTC).compareTo(Long.valueOf(record2.retryTimeUTC)); + } + + @Override + public boolean equals(Object obj) { + return false; + } + } + /** * A MessageRetryRecord holds the data of how many times a message has * failed and been retried, and when the last failure occurred. It can @@ -174,8 +182,8 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager double delay = retryInitialDelayMs * delayMultiplier; Long maxLong = Long.MAX_VALUE; long delayThisRetryMs = delay >= maxLong.doubleValue() - ? maxLong - : (long) delay; + ? maxLong + : (long) delay; return Math.min(delayThisRetryMs, retryDelayMaxMs); } @@ -190,17 +198,4 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager return Long.valueOf(this.offset).hashCode(); } } - - private static class RetryTimeComparator implements Comparator<MessageRetryRecord> { - - @Override - public int compare(MessageRetryRecord record1, MessageRetryRecord record2) { - return Long.valueOf(record1.retryTimeUTC).compareTo(Long.valueOf(record2.retryTimeUTC)); - } - - @Override - public boolean equals(Object obj) { - return false; - } - } } http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedFetchException.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedFetchException.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedFetchException.java index 448d0c3..a1f3fe5 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedFetchException.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedFetchException.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; public class FailedFetchException extends RuntimeException { http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java index c7a7a04..b7fafdc 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/IntSerializer.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/IntSerializer.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/IntSerializer.java index 7cdfc87..fef8625 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/IntSerializer.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/IntSerializer.java @@ -15,28 +15,28 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.kafka; -import org.apache.kafka.common.serialization.Serializer; +package org.apache.storm.kafka; import java.util.Map; +import org.apache.kafka.common.serialization.Serializer; public class IntSerializer implements Serializer<Integer> { - @Override - public void configure(Map<String, ?> map, boolean b) { - } + @Override + public void configure(Map<String, ?> map, boolean b) { + } - @Override - public byte[] serialize(String topic, Integer val) { - return new byte[] { + @Override + public byte[] serialize(String topic, Integer val) { + return new byte[]{ (byte) (val >>> 24), (byte) (val >>> 16), (byte) (val >>> 8), val.byteValue() }; - } + } - @Override - public void close() { - } + @Override + public void close() { + } } http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java index 1c9ada8..a93f426 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java @@ -1,32 +1,25 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; import java.io.Serializable; - +import kafka.api.FetchRequest; import org.apache.storm.spout.MultiScheme; import org.apache.storm.spout.RawMultiScheme; -import kafka.api.FetchRequest; - public class KafkaConfig implements Serializable { private static final long serialVersionUID = 5276718734571623855L; - + public final BrokerHosts hosts; public final String topic; public final String clientId; http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaError.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaError.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaError.java index 1d866e7..4bf2ed2 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaError.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaError.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; public enum KafkaError { http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java index 7b1243c..aed5986 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java @@ -1,24 +1,23 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; import com.google.common.base.Strings; - +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.storm.Config; import org.apache.storm.kafka.PartitionManager.KafkaMessageId; import org.apache.storm.kafka.trident.GlobalPartitionInformation; @@ -30,27 +29,16 @@ import org.apache.storm.topology.base.BaseRichSpout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; - // TODO: need to add blacklisting // TODO: need to make a best effort to not re-emit messages if don't have to public class KafkaSpout extends BaseRichSpout { - static enum EmitState { - EMITTED_MORE_LEFT, - EMITTED_END, - NO_EMITTED - } - private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); - SpoutConfig _spoutConfig; SpoutOutputCollector _collector; PartitionCoordinator _coordinator; DynamicPartitionConnections _connections; ZkState _state; - long _lastUpdateMs = 0; - int _currPartitionIndex = 0; public KafkaSpout(SpoutConfig spoutConf) { @@ -81,12 +69,12 @@ public class KafkaSpout extends BaseRichSpout { int totalTasks = context.getComponentTasks(context.getThisComponentId()).size(); if (_spoutConfig.hosts instanceof StaticHosts) { _coordinator = new StaticCoordinator(_connections, conf, - _spoutConfig, _state, context.getThisTaskIndex(), - totalTasks, context.getThisTaskId(), topologyInstanceId); + _spoutConfig, _state, context.getThisTaskIndex(), + totalTasks, context.getThisTaskId(), topologyInstanceId); } else { _coordinator = new ZkCoordinator(_connections, conf, - _spoutConfig, _state, context.getThisTaskIndex(), - totalTasks, context.getThisTaskId(), topologyInstanceId); + _spoutConfig, _state, context.getThisTaskIndex(), + totalTasks, context.getThisTaskId(), topologyInstanceId); } context.registerMetric("kafkaOffset", new IMetric() { @@ -158,7 +146,7 @@ public class KafkaSpout extends BaseRichSpout { } private PartitionManager getManagerForPartition(int partition) { - for (PartitionManager partitionManager: _coordinator.getMyManagedPartitions()) { + for (PartitionManager partitionManager : _coordinator.getMyManagedPartitions()) { if (partitionManager.getPartition().partition == partition) { return partitionManager; } @@ -203,7 +191,7 @@ public class KafkaSpout extends BaseRichSpout { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) { + if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) { declarer.declareStream(_spoutConfig.outputStreamId, _spoutConfig.scheme.getOutputFields()); } else { declarer.declare(_spoutConfig.scheme.getOutputFields()); @@ -211,7 +199,7 @@ public class KafkaSpout extends BaseRichSpout { } @Override - public Map<String, Object> getComponentConfiguration () { + public Map<String, Object> getComponentConfiguration() { Map<String, Object> configuration = super.getComponentConfiguration(); if (configuration == null) { configuration = new HashMap<>(); @@ -240,7 +228,7 @@ public class KafkaSpout extends BaseRichSpout { List<Partition> partitions = globalPartitionInformation.getOrderedPartitions(); StringBuilder staticPartitions = new StringBuilder(); StringBuilder leaderHosts = new StringBuilder(); - for (Partition partition: partitions) { + for (Partition partition : partitions) { staticPartitions.append(partition.partition + ","); leaderHosts.append(partition.host.host + ":" + partition.host.port).append(","); } @@ -258,4 +246,10 @@ public class KafkaSpout extends BaseRichSpout { } } + static enum EmitState { + EMITTED_MORE_LEFT, + EMITTED_END, + NO_EMITTED + } + } http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java index 76bb896..38958b2 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java @@ -1,32 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; import com.google.common.base.Preconditions; - -import org.apache.storm.kafka.trident.GlobalPartitionInformation; -import org.apache.storm.kafka.trident.IBrokerReader; -import org.apache.storm.kafka.trident.StaticBrokerReader; -import org.apache.storm.kafka.trident.ZkBrokerReader; -import org.apache.storm.metric.api.IMetric; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.net.ConnectException; import java.net.SocketTimeoutException; @@ -39,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; - import kafka.api.FetchRequest; import kafka.api.FetchRequestBuilder; import kafka.api.PartitionOffsetRequestInfo; @@ -49,6 +34,13 @@ import kafka.javaapi.OffsetRequest; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; import kafka.message.Message; +import org.apache.storm.kafka.trident.GlobalPartitionInformation; +import org.apache.storm.kafka.trident.IBrokerReader; +import org.apache.storm.kafka.trident.StaticBrokerReader; +import org.apache.storm.kafka.trident.ZkBrokerReader; +import org.apache.storm.metric.api.IMetric; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class KafkaUtils { @@ -57,7 +49,7 @@ public class KafkaUtils { private static final int NO_OFFSET = -5; //suppress default constructor for noninstantiablility - private KafkaUtils(){ + private KafkaUtils() { throw new AssertionError(); } @@ -80,7 +72,7 @@ public class KafkaUtils { Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1)); OffsetRequest request = new OffsetRequest( - requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId()); + requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId()); long[] offsets = consumer.getOffsetsBefore(request).offsets(topic, partition); if (offsets.length > 0) { @@ -90,121 +82,23 @@ public class KafkaUtils { } } - public static class KafkaOffsetMetric implements IMetric { - Map<Partition, PartitionManager.OffsetData> _partitionToOffset = new HashMap<Partition, PartitionManager.OffsetData>(); - Set<Partition> _partitions; - DynamicPartitionConnections _connections; - - public KafkaOffsetMetric(DynamicPartitionConnections connections) { - _connections = connections; - } - - public void setOffsetData(Partition partition, PartitionManager.OffsetData offsetData) { - _partitionToOffset.put(partition, offsetData); - } - - private class TopicMetrics { - long totalSpoutLag = 0; - long totalEarliestTimeOffset = 0; - long totalLatestTimeOffset = 0; - long totalLatestEmittedOffset = 0; - long totalLatestCompletedOffset = 0; - } - - @Override - public Object getValueAndReset() { - try { - HashMap<String, Long> ret = new HashMap<>(); - if (_partitions != null && _partitions.size() == _partitionToOffset.size()) { - Map<String,TopicMetrics> topicMetricsMap = new TreeMap<String, TopicMetrics>(); - for (Map.Entry<Partition, PartitionManager.OffsetData> e : _partitionToOffset.entrySet()) { - Partition partition = e.getKey(); - SimpleConsumer consumer = _connections.getConnection(partition); - if (consumer == null) { - LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?"); - return null; - } - long latestTimeOffset = getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.LatestTime()); - long earliestTimeOffset = getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime()); - if (latestTimeOffset == KafkaUtils.NO_OFFSET) { - LOG.warn("No data found in Kafka Partition " + partition.getId()); - return null; - } - long latestEmittedOffset = e.getValue().latestEmittedOffset; - long latestCompletedOffset = e.getValue().latestCompletedOffset; - long spoutLag = latestTimeOffset - latestCompletedOffset; - String topic = partition.topic; - String metricPath = partition.getId(); - //Handle the case where Partition Path Id does not contain topic name Partition.getId() == "partition_" + partition - if (!metricPath.startsWith(topic + "/")) { - metricPath = topic + "/" + metricPath; - } - ret.put(metricPath + "/" + "spoutLag", spoutLag); - ret.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset); - ret.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset); - ret.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset); - ret.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset); - - if (!topicMetricsMap.containsKey(partition.topic)) { - topicMetricsMap.put(partition.topic,new TopicMetrics()); - } - - TopicMetrics topicMetrics = topicMetricsMap.get(partition.topic); - topicMetrics.totalSpoutLag += spoutLag; - topicMetrics.totalEarliestTimeOffset += earliestTimeOffset; - topicMetrics.totalLatestTimeOffset += latestTimeOffset; - topicMetrics.totalLatestEmittedOffset += latestEmittedOffset; - topicMetrics.totalLatestCompletedOffset += latestCompletedOffset; - } - - for(Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) { - String topic = e.getKey(); - TopicMetrics topicMetrics = e.getValue(); - ret.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag); - ret.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset); - ret.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset); - ret.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset); - ret.put(topic + "/" + "totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset); - } - - return ret; - } else { - LOG.info("Metrics Tick: Not enough data to calculate spout lag."); - } - } catch (Throwable t) { - LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t); - } - return null; - } - - public void refreshPartitions(Set<Partition> partitions) { - _partitions = partitions; - Iterator<Partition> it = _partitionToOffset.keySet().iterator(); - while (it.hasNext()) { - if (!partitions.contains(it.next())) { - it.remove(); - } - } - } - } - public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) - throws TopicOffsetOutOfRangeException, FailedFetchException,RuntimeException { + throws TopicOffsetOutOfRangeException, FailedFetchException, RuntimeException { ByteBufferMessageSet msgs = null; String topic = partition.topic; int partitionId = partition.partition; FetchRequestBuilder builder = new FetchRequestBuilder(); FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes). - clientId(config.clientId).maxWait(config.fetchMaxWait).minBytes(config.minFetchByte).build(); + clientId(config.clientId).maxWait(config.fetchMaxWait).minBytes(config.minFetchByte).build(); FetchResponse fetchResponse; try { fetchResponse = consumer.fetch(fetchRequest); } catch (Exception e) { if (e instanceof ConnectException || - e instanceof SocketTimeoutException || - e instanceof IOException || - e instanceof UnresolvedAddressException - ) { + e instanceof SocketTimeoutException || + e instanceof IOException || + e instanceof UnresolvedAddressException + ) { LOG.warn("Network error when fetching messages:", e); throw new FailedFetchException(e); } else { @@ -225,11 +119,11 @@ public class KafkaUtils { } else { msgs = fetchResponse.messageSet(topic, partitionId); } - LOG.debug("Messages fetched. [config = {}], [consumer = {}], [partition = {}], [offset = {}], [msgs = {}]", config, consumer, partition, offset, msgs); + LOG.debug("Messages fetched. [config = {}], [consumer = {}], [partition = {}], [offset = {}], [msgs = {}]", config, consumer, + partition, offset, msgs); return msgs; } - public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg, String topic) { Iterable<List<Object>> tups; ByteBuffer payload = msg.payload(); @@ -241,15 +135,16 @@ public class KafkaUtils { tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(key, payload); } else { if (kafkaConfig.scheme instanceof StringMultiSchemeWithTopic) { - tups = ((StringMultiSchemeWithTopic)kafkaConfig.scheme).deserializeWithTopic(topic, payload); + tups = ((StringMultiSchemeWithTopic) kafkaConfig.scheme).deserializeWithTopic(topic, payload); } else { tups = kafkaConfig.scheme.deserialize(payload); } } return tups; } - - public static Iterable<List<Object>> generateTuples(MessageMetadataSchemeAsMultiScheme scheme, Message msg, Partition partition, long offset) { + + public static Iterable<List<Object>> generateTuples(MessageMetadataSchemeAsMultiScheme scheme, Message msg, Partition partition, + long offset) { ByteBuffer payload = msg.payload(); if (payload == null) { return null; @@ -257,18 +152,18 @@ public class KafkaUtils { return scheme.deserializeMessageWithMetadata(payload, partition, offset); } - public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitons, - int totalTasks, int taskIndex, int taskId) { + int totalTasks, int taskIndex, int taskId) { Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks"); List<Partition> taskPartitions = new ArrayList<Partition>(); List<Partition> partitions = new ArrayList<Partition>(); - for(GlobalPartitionInformation partitionInformation : partitons) { + for (GlobalPartitionInformation partitionInformation : partitons) { partitions.addAll(partitionInformation.getOrderedPartitions()); } int numPartitions = partitions.size(); if (numPartitions < totalTasks) { - LOG.warn("there are more tasks than partitions (tasks: " + totalTasks + "; partitions: " + numPartitions + "), some tasks will be idle"); + LOG.warn("there are more tasks than partitions (tasks: " + totalTasks + "; partitions: " + numPartitions + + "), some tasks will be idle"); } for (int i = taskIndex; i < numPartitions; i += totalTasks) { Partition taskPartition = partitions.get(i); @@ -290,4 +185,104 @@ public class KafkaUtils { public static String taskPrefix(int taskIndex, int totalTasks, int taskId) { return "Task [" + (taskIndex + 1) + "/" + totalTasks + "], Task-ID: " + taskId; } + + public static class KafkaOffsetMetric implements IMetric { + Map<Partition, PartitionManager.OffsetData> _partitionToOffset = new HashMap<Partition, PartitionManager.OffsetData>(); + Set<Partition> _partitions; + DynamicPartitionConnections _connections; + + public KafkaOffsetMetric(DynamicPartitionConnections connections) { + _connections = connections; + } + + public void setOffsetData(Partition partition, PartitionManager.OffsetData offsetData) { + _partitionToOffset.put(partition, offsetData); + } + + @Override + public Object getValueAndReset() { + try { + HashMap<String, Long> ret = new HashMap<>(); + if (_partitions != null && _partitions.size() == _partitionToOffset.size()) { + Map<String, TopicMetrics> topicMetricsMap = new TreeMap<String, TopicMetrics>(); + for (Map.Entry<Partition, PartitionManager.OffsetData> e : _partitionToOffset.entrySet()) { + Partition partition = e.getKey(); + SimpleConsumer consumer = _connections.getConnection(partition); + if (consumer == null) { + LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?"); + return null; + } + long latestTimeOffset = + getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.LatestTime()); + long earliestTimeOffset = + getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime()); + if (latestTimeOffset == KafkaUtils.NO_OFFSET) { + LOG.warn("No data found in Kafka Partition " + partition.getId()); + return null; + } + long latestEmittedOffset = e.getValue().latestEmittedOffset; + long latestCompletedOffset = e.getValue().latestCompletedOffset; + long spoutLag = latestTimeOffset - latestCompletedOffset; + String topic = partition.topic; + String metricPath = partition.getId(); + //Handle the case where Partition Path Id does not contain topic name Partition.getId() == "partition_" + partition + if (!metricPath.startsWith(topic + "/")) { + metricPath = topic + "/" + metricPath; + } + ret.put(metricPath + "/" + "spoutLag", spoutLag); + ret.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset); + ret.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset); + ret.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset); + ret.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset); + + if (!topicMetricsMap.containsKey(partition.topic)) { + topicMetricsMap.put(partition.topic, new TopicMetrics()); + } + + TopicMetrics topicMetrics = topicMetricsMap.get(partition.topic); + topicMetrics.totalSpoutLag += spoutLag; + topicMetrics.totalEarliestTimeOffset += earliestTimeOffset; + topicMetrics.totalLatestTimeOffset += latestTimeOffset; + topicMetrics.totalLatestEmittedOffset += latestEmittedOffset; + topicMetrics.totalLatestCompletedOffset += latestCompletedOffset; + } + + for (Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) { + String topic = e.getKey(); + TopicMetrics topicMetrics = e.getValue(); + ret.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag); + ret.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset); + ret.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset); + ret.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset); + ret.put(topic + "/" + "totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset); + } + + return ret; + } else { + LOG.info("Metrics Tick: Not enough data to calculate spout lag."); + } + } catch (Throwable t) { + LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t); + } + return null; + } + + public void refreshPartitions(Set<Partition> partitions) { + _partitions = partitions; + Iterator<Partition> it = _partitionToOffset.keySet().iterator(); + while (it.hasNext()) { + if (!partitions.contains(it.next())) { + it.remove(); + } + } + } + + private class TopicMetrics { + long totalSpoutLag = 0; + long totalEarliestTimeOffset = 0; + long totalLatestTimeOffset = 0; + long totalLatestEmittedOffset = 0; + long totalLatestCompletedOffset = 0; + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java index 3f9acc2..6bb1dc5 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java @@ -1,26 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.kafka; -import org.apache.storm.spout.Scheme; +package org.apache.storm.kafka; import java.nio.ByteBuffer; import java.util.List; +import org.apache.storm.spout.Scheme; public interface KeyValueScheme extends Scheme { List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value); http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java index 25053dd..00983cc 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java @@ -1,27 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.kafka; -import org.apache.storm.spout.SchemeAsMultiScheme; +package org.apache.storm.kafka; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; +import org.apache.storm.spout.SchemeAsMultiScheme; public class KeyValueSchemeAsMultiScheme extends SchemeAsMultiScheme { @@ -30,9 +24,12 @@ public class KeyValueSchemeAsMultiScheme extends SchemeAsMultiScheme { } public Iterable<List<Object>> deserializeKeyAndValue(final ByteBuffer key, final ByteBuffer value) { - List<Object> o = ((KeyValueScheme)scheme).deserializeKeyAndValue(key, value); - if(o == null) return null; - else return Arrays.asList(o); + List<Object> o = ((KeyValueScheme) scheme).deserializeKeyAndValue(key, value); + if (o == null) { + return null; + } else { + return Arrays.asList(o); + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java index d0fc08e..f77f419 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java @@ -1,26 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.kafka; -import org.apache.storm.spout.Scheme; +package org.apache.storm.kafka; import java.nio.ByteBuffer; import java.util.List; +import org.apache.storm.spout.Scheme; public interface MessageMetadataScheme extends Scheme { List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset); http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java index a53fa88..f52a772 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java @@ -1,26 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; - import org.apache.storm.spout.SchemeAsMultiScheme; public class MessageMetadataSchemeAsMultiScheme extends SchemeAsMultiScheme { http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java index 99bb9d3..9edf28b 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; import com.google.common.base.Objects; @@ -33,16 +28,17 @@ public class Partition implements ISpoutPartition, Serializable { // for kryo compatibility private Partition() { - + } + public Partition(Broker host, String topic, int partition) { this.topic = topic; this.host = host; this.partition = partition; this.bUseTopicNameForPartitionPathId = false; } - - public Partition(Broker host, String topic, int partition,Boolean bUseTopicNameForPartitionPathId) { + + public Partition(Broker host, String topic, int partition, Boolean bUseTopicNameForPartitionPathId) { this.topic = topic; this.host = host; this.partition = partition; @@ -63,22 +59,23 @@ public class Partition implements ISpoutPartition, Serializable { return false; } final Partition other = (Partition) obj; - return Objects.equal(this.host, other.host) && Objects.equal(this.topic, other.topic) && Objects.equal(this.partition, other.partition); + return Objects.equal(this.host, other.host) && Objects.equal(this.topic, other.topic) && + Objects.equal(this.partition, other.partition); } @Override public String toString() { return "Partition{" + - "host=" + host + - ", topic=" + topic + - ", partition=" + partition + - '}'; + "host=" + host + + ", topic=" + topic + + ", partition=" + partition + + '}'; } @Override public String getId() { if (bUseTopicNameForPartitionPathId) { - return topic + "/partition_" + partition; + return topic + "/partition_" + partition; } else { //Keep the Partition Id backward compatible with Old implementation of Partition.getId() == "partition_" + partition return "partition_" + partition; http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java index c9004fa..4dba709 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; import java.util.List;
