http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java index a9a18fd..5805c21 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java @@ -1,26 +1,30 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import java.io.Serializable; - +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import kafka.javaapi.consumer.SimpleConsumer; +import kafka.javaapi.message.ByteBufferMessageSet; +import kafka.message.MessageAndOffset; import org.apache.storm.Config; import org.apache.storm.kafka.KafkaSpout.EmitState; import org.apache.storm.kafka.trident.MaxMetric; @@ -32,12 +36,6 @@ import org.apache.storm.spout.SpoutOutputCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; - -import kafka.javaapi.consumer.SimpleConsumer; -import kafka.javaapi.message.ByteBufferMessageSet; -import kafka.message.MessageAndOffset; - public class PartitionManager { private static final Logger LOG = LoggerFactory.getLogger(PartitionManager.class); @@ -50,11 +48,8 @@ public class PartitionManager { // Count of messages which were not retried because failedMsgRetryManager didn't consider offset eligible for // retry private final CountMetric _messageIneligibleForRetryCount; - Long _emittedToOffset; - // _pending key = Kafka offset, value = time at which the message was first submitted to the topology - private SortedMap<Long,Long> _pending = new TreeMap<Long,Long>(); private final FailedMsgRetryManager _failedMsgRetryManager; - + Long _emittedToOffset; // retryRecords key = Kafka offset, value = retry info for the given message Long _committedTo; LinkedList<MessageAndOffset> _waitingToEmit = new LinkedList<MessageAndOffset>(); @@ -66,15 +61,16 @@ public class PartitionManager { ZkState _state; Map _topoConf; long numberFailed, numberAcked; + // _pending key = Kafka offset, value = time at which the message was first submitted to the topology + private SortedMap<Long, Long> _pending = new TreeMap<Long, Long>(); public PartitionManager( - DynamicPartitionConnections connections, - String topologyInstanceId, - ZkState state, - Map<String, Object> topoConf, - SpoutConfig spoutConfig, - Partition id) - { + DynamicPartitionConnections connections, + String topologyInstanceId, + ZkState state, + Map<String, Object> topoConf, + SpoutConfig spoutConfig, + Partition id) { this(connections, topologyInstanceId, state, topoConf, spoutConfig, id, null); } @@ -82,13 +78,13 @@ public class PartitionManager { * @param previousManager previous partition manager if manager for partition is being recreated */ public PartitionManager( - DynamicPartitionConnections connections, - String topologyInstanceId, - ZkState state, - Map<String, Object> topoConf, - SpoutConfig spoutConfig, - Partition id, - PartitionManager previousManager) { + DynamicPartitionConnections connections, + String topologyInstanceId, + ZkState state, + Map<String, Object> topoConf, + SpoutConfig spoutConfig, + Partition id, + PartitionManager previousManager) { _partition = id; _connections = connections; _spoutConfig = spoutConfig; @@ -105,16 +101,16 @@ public class PartitionManager { _waitingToEmit = previousManager._waitingToEmit; _pending = previousManager._pending; LOG.info("Recreating PartitionManager based on previous manager, _waitingToEmit size: {}, _pending size: {}", - _waitingToEmit.size(), - _pending.size()); + _waitingToEmit.size(), + _pending.size()); } else { try { _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance(); _failedMsgRetryManager.prepare(spoutConfig, _topoConf); } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>", - FailedMsgRetryManager.class, - spoutConfig.failedMsgRetryManagerClass), e); + FailedMsgRetryManager.class, + spoutConfig.failedMsgRetryManagerClass), e); } String jsonTopologyId = null; @@ -142,7 +138,8 @@ public class PartitionManager { LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset"); } else { _committedTo = jsonOffset; - LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId); + LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + + " - new topology_id: " + topologyInstanceId); } if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) { @@ -150,7 +147,8 @@ public class PartitionManager { Long lastCommittedOffset = _committedTo; _committedTo = currentOffset; LOG.info("Commit offset " + lastCommittedOffset + " is more than " + - spoutConfig.maxOffsetBehind + " behind latest offset " + currentOffset + ", resetting to startOffsetTime=" + spoutConfig.startOffsetTime); + spoutConfig.maxOffsetBehind + " behind latest offset " + currentOffset + ", resetting to startOffsetTime=" + + spoutConfig.startOffsetTime); } LOG.info("Starting Kafka " + _consumer.host() + " " + id + " from offset " + _committedTo); @@ -191,13 +189,14 @@ public class PartitionManager { Iterable<List<Object>> tups; if (_spoutConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) { - tups = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _spoutConfig.scheme, toEmit.message(), _partition, toEmit.offset()); + tups = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _spoutConfig.scheme, toEmit.message(), _partition, + toEmit.offset()); } else { tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.message(), _partition.topic); } if ((tups != null) && tups.iterator().hasNext()) { - if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) { + if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) { for (List<Object> tup : tups) { collector.emit(_spoutConfig.outputStreamId, tup, new KafkaMessageId(_partition, toEmit.offset())); } @@ -305,16 +304,17 @@ public class PartitionManager { public void fail(Long offset) { if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) { LOG.info( - "Skipping failed tuple at offset={}" + - " because it's more than maxOffsetBehind={}" + - " behind _emittedToOffset={} for {}", + "Skipping failed tuple at offset={}" + + " because it's more than maxOffsetBehind={}" + + " behind _emittedToOffset={} for {}", offset, _spoutConfig.maxOffsetBehind, _emittedToOffset, _partition ); } else { - LOG.debug("Failing at offset={} with _pending.size()={} pending and _emittedToOffset={} for {}", offset, _pending.size(), _emittedToOffset, _partition); + LOG.debug("Failing at offset={} with _pending.size()={} pending and _emittedToOffset={} for {}", offset, _pending.size(), + _emittedToOffset, _partition); numberFailed++; if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) { throw new RuntimeException("Too many tuple failures"); @@ -337,19 +337,22 @@ public class PartitionManager { public void commit() { long lastCompletedOffset = lastCompletedOffset(); if (_committedTo != lastCompletedOffset) { - LOG.debug("Writing last completed offset ({}) to ZK for {} for topology: {}", lastCompletedOffset, _partition, _topologyInstanceId); + LOG.debug("Writing last completed offset ({}) to ZK for {} for topology: {}", lastCompletedOffset, _partition, + _topologyInstanceId); Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder() - .put("topology", ImmutableMap.of("id", _topologyInstanceId, - "name", _topoConf.get(Config.TOPOLOGY_NAME))) - .put("offset", lastCompletedOffset) - .put("partition", _partition.partition) - .put("broker", ImmutableMap.of("host", _partition.host.host, - "port", _partition.host.port)) - .put("topic", _partition.topic).build(); + .put("topology", ImmutableMap.of("id", _topologyInstanceId, + "name", _topoConf + .get(Config.TOPOLOGY_NAME))) + .put("offset", lastCompletedOffset) + .put("partition", _partition.partition) + .put("broker", ImmutableMap.of("host", _partition.host.host, + "port", _partition.host.port)) + .put("topic", _partition.topic).build(); _state.writeJSON(committedPath(), data); _committedTo = lastCompletedOffset; - LOG.debug("Wrote last completed offset ({}) to ZK for {} for topology: {}", lastCompletedOffset, _partition, _topologyInstanceId); + LOG.debug("Wrote last completed offset ({}) to ZK for {} for topology: {}", lastCompletedOffset, _partition, + _topologyInstanceId); } else { LOG.debug("No new offset for {} for topology: {}", _partition, _topologyInstanceId); } @@ -377,7 +380,7 @@ public class PartitionManager { public void close() { commit(); - _connections.unregister(_partition.host, _partition.topic , _partition.partition); + _connections.unregister(_partition.host, _partition.topic, _partition.partition); } static class KafkaMessageId implements Serializable {
http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java index 0ad19d7..74a4a3b 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java index cd23ca6..8d12ee1 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java @@ -1,33 +1,30 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.storm.kafka.trident.GlobalPartitionInformation; -import java.util.*; - public class StaticCoordinator implements PartitionCoordinator { Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>(); List<PartitionManager> _allManagers = new ArrayList<>(); public StaticCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig config, ZkState state, - int taskIndex, int totalTasks, int taskId, String topologyInstanceId) { + int taskIndex, int totalTasks, int taskId, String topologyInstanceId) { StaticHosts hosts = (StaticHosts) config.hosts; List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>(); partitions.add(hosts.getPartitionInformation()); http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java index 33d5c16..1f8f903 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; import org.apache.storm.kafka.trident.GlobalPartitionInformation; http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java index 77a7211..2c9d4f2 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java @@ -1,26 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.kafka; -import kafka.javaapi.consumer.SimpleConsumer; +package org.apache.storm.kafka; import java.util.HashMap; import java.util.Map; +import kafka.javaapi.consumer.SimpleConsumer; public class StaticPartitionConnections { Map<Integer, SimpleConsumer> _kafka = new HashMap<Integer, SimpleConsumer>(); http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java index 9ef7f74..3d62961 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java @@ -1,33 +1,27 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; -import org.apache.storm.tuple.Values; import com.google.common.collect.ImmutableMap; - import java.nio.ByteBuffer; import java.util.List; +import org.apache.storm.tuple.Values; public class StringKeyValueScheme extends StringScheme implements KeyValueScheme { @Override public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) { - if ( key == null ) { + if (key == null) { return deserialize(value); } String keyString = StringScheme.deserializeString(key); http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java index e57738d..ab6e500 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java @@ -1,33 +1,26 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.kafka; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; +package org.apache.storm.kafka; import java.nio.ByteBuffer; import java.util.List; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; public class StringMessageAndMetadataScheme extends StringScheme implements MessageMetadataScheme { - private static final long serialVersionUID = -5441841920447947374L; - public static final String STRING_SCHEME_PARTITION_KEY = "partition"; public static final String STRING_SCHEME_OFFSET = "offset"; + private static final long serialVersionUID = -5441841920447947374L; @Override public List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset) { http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMultiSchemeWithTopic.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMultiSchemeWithTopic.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMultiSchemeWithTopic.java index e197318..061b30a 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMultiSchemeWithTopic.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMultiSchemeWithTopic.java @@ -1,32 +1,26 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.kafka; -import org.apache.storm.spout.MultiScheme; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; +package org.apache.storm.kafka; import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; +import org.apache.storm.spout.MultiScheme; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; public class StringMultiSchemeWithTopic - implements MultiScheme { + implements MultiScheme { public static final String STRING_SCHEME_KEY = "str"; public static final String TOPIC_KEY = "topic"; http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringScheme.java index b0b8d27..bcbc058 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringScheme.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringScheme.java @@ -1,39 +1,29 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.kafka; -import org.apache.storm.spout.Scheme; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; -import org.apache.storm.utils.Utils; +package org.apache.storm.kafka; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.List; +import org.apache.storm.spout.Scheme; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; public class StringScheme implements Scheme { - private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8; public static final String STRING_SCHEME_KEY = "str"; - - public List<Object> deserialize(ByteBuffer bytes) { - return new Values(deserializeString(bytes)); - } + private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8; public static String deserializeString(ByteBuffer string) { if (string.hasArray()) { @@ -44,6 +34,10 @@ public class StringScheme implements Scheme { } } + public List<Object> deserialize(ByteBuffer bytes) { + return new Values(deserializeString(bytes)); + } + public Fields getOutputFields() { return new Fields(STRING_SCHEME_KEY); } http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java index 8e1c98f..613a62e 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.kafka; public class TopicOffsetOutOfRangeException extends RuntimeException { http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java index 136dc51..bc9ebd5 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java @@ -1,27 +1,26 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.storm.kafka.trident.GlobalPartitionInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.storm.kafka.trident.GlobalPartitionInformation; - -import java.util.*; import static org.apache.storm.kafka.KafkaUtils.taskPrefix; @@ -43,12 +42,13 @@ public class ZkCoordinator implements PartitionCoordinator { Map _topoConf; public ZkCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig spoutConfig, ZkState state, - int taskIndex, int totalTasks, int taskId, String topologyInstanceId) { - this(connections, topoConf, spoutConfig, state, taskIndex, totalTasks, taskId, topologyInstanceId, buildReader(topoConf, spoutConfig)); + int taskIndex, int totalTasks, int taskId, String topologyInstanceId) { + this(connections, topoConf, spoutConfig, state, taskIndex, totalTasks, taskId, topologyInstanceId, + buildReader(topoConf, spoutConfig)); } public ZkCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig spoutConfig, ZkState state, - int taskIndex, int totalTasks, int taskId, String topologyInstanceId, DynamicBrokersReader reader) { + int taskIndex, int totalTasks, int taskId, String topologyInstanceId, DynamicBrokersReader reader) { _spoutConfig = spoutConfig; _connections = connections; _taskIndex = taskIndex; @@ -103,13 +103,13 @@ public class ZkCoordinator implements PartitionCoordinator { for (Partition id : newPartitions) { PartitionManager man = new PartitionManager( - _connections, - _topologyInstanceId, - _state, - _topoConf, - _spoutConfig, - id, - deletedManagers.get(id.partition)); + _connections, + _topologyInstanceId, + _state, + _topoConf, + _spoutConfig, + id, + deletedManagers.get(id.partition)); _managers.put(id, man); } http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java index 2c2a26f..9c6b29d 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java index 7241c60..3d27173 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java @@ -1,53 +1,59 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; -import org.apache.storm.Config; -import org.apache.storm.utils.ObjectReader; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; +import org.apache.storm.Config; +import org.apache.storm.utils.ObjectReader; import org.apache.zookeeper.CreateMode; import org.json.simple.JSONValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.charset.Charset; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - public class ZkState { private static final Logger LOG = LoggerFactory.getLogger(ZkState.class); CuratorFramework _curator; + public ZkState(Map<String, Object> stateConf) { + stateConf = new HashMap<>(stateConf); + + try { + _curator = newCurator(stateConf); + _curator.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + private CuratorFramework newCurator(final Map<String, Object> stateConf) - throws Exception { + throws Exception { Integer port = (Integer) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT); String serverPorts = ""; for (String server : (List<String>) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS)) { serverPorts = serverPorts + server + ":" + port + ","; } return CuratorFrameworkFactory.newClient(serverPorts, - ObjectReader.getInt(stateConf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)), - ObjectReader.getInt(stateConf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)), - new RetryNTimes(ObjectReader.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)), - ObjectReader.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)))); + ObjectReader.getInt(stateConf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)), + ObjectReader.getInt(stateConf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)), + new RetryNTimes(ObjectReader.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)), + ObjectReader + .getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)))); } public CuratorFramework getCurator() { @@ -55,17 +61,6 @@ public class ZkState { return _curator; } - public ZkState(Map<String, Object> stateConf) { - stateConf = new HashMap<>(stateConf); - - try { - _curator = newCurator(stateConf); - _curator.start(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - public void writeJSON(String path, Map<Object, Object> data) { LOG.debug("Writing {} the data {}", path, data.toString()); writeBytes(path, JSONValue.toJSONString(data).getBytes(Charset.forName("UTF-8"))); http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java index a4c255a..e0b94f3 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java @@ -1,43 +1,36 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka.bolt; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichBolt; -import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.TupleUtils; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.clients.producer.Callback; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper; import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper; import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector; import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector; -import java.util.concurrent.Future; -import java.util.concurrent.ExecutionException; -import java.util.Map; -import java.util.Properties; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt; +import org.apache.storm.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -57,13 +50,11 @@ import java.util.Properties; @Deprecated public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt { - private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class); - public static final String TOPIC = "topic"; - + private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class); private KafkaProducer<K, V> producer; private OutputCollector collector; - private TupleToKafkaMapper<K,V> mapper; + private TupleToKafkaMapper<K, V> mapper; private KafkaTopicSelector topicSelector; private Properties boltSpecfiedProperties = new Properties(); /** @@ -76,18 +67,18 @@ public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt { private boolean async = true; public KafkaBolt() {} - - public KafkaBolt<K,V> withTupleToKafkaMapper(TupleToKafkaMapper<K,V> mapper) { + + public KafkaBolt<K, V> withTupleToKafkaMapper(TupleToKafkaMapper<K, V> mapper) { this.mapper = mapper; return this; } - public KafkaBolt<K,V> withTopicSelector(KafkaTopicSelector selector) { + public KafkaBolt<K, V> withTopicSelector(KafkaTopicSelector selector) { this.topicSelector = selector; return this; } - public KafkaBolt<K,V> withProducerProperties(Properties producerProperties) { + public KafkaBolt<K, V> withProducerProperties(Properties producerProperties) { this.boltSpecfiedProperties = producerProperties; return this; } @@ -95,13 +86,13 @@ public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt { @Override public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) { //for backward compatibility. - if(mapper == null) { - this.mapper = new FieldNameBasedTupleToKafkaMapper<K,V>(); + if (mapper == null) { + this.mapper = new FieldNameBasedTupleToKafkaMapper<K, V>(); } //for backward compatibility. - if(topicSelector == null) { - if(topoConf.containsKey(TOPIC)) { + if (topicSelector == null) { + if (topoConf.containsKey(TOPIC)) { this.topicSelector = new DefaultTopicSelector((String) topoConf.get(TOPIC)); } else { throw new IllegalArgumentException("topic should be specified in bolt's configuration"); @@ -121,7 +112,7 @@ public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt { key = mapper.getKeyFromTuple(input); message = mapper.getMessageFromTuple(input); topic = topicSelector.getTopic(input); - if (topic != null ) { + if (topic != null) { Callback callback = null; if (!fireAndForget && async) { http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java index 672da8e..7276ef6 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java @@ -1,25 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka.bolt.mapper; import org.apache.storm.tuple.Tuple; -public class FieldNameBasedTupleToKafkaMapper<K,V> implements TupleToKafkaMapper<K, V> { +public class FieldNameBasedTupleToKafkaMapper<K, V> implements TupleToKafkaMapper<K, V> { public static final String BOLT_KEY = "key"; public static final String BOLT_MESSAGE = "message"; http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java index 3890413..7012e6b 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java @@ -1,32 +1,27 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.kafka.bolt.mapper; -import org.apache.storm.tuple.Tuple; +package org.apache.storm.kafka.bolt.mapper; import java.io.Serializable; +import org.apache.storm.tuple.Tuple; /** * as the really verbose name suggests this interface mapps a storm tuple to kafka key and message. * @param <K> type of key. * @param <V> type of value. */ -public interface TupleToKafkaMapper<K,V> extends Serializable { +public interface TupleToKafkaMapper<K, V> extends Serializable { K getKeyFromTuple(Tuple tuple); + V getMessageFromTuple(Tuple tuple); } http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java index 2aafc78..d1784b0 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka.bolt.selector; import org.apache.storm.tuple.Tuple; http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java index 7b52403..50c5c1f 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka.bolt.selector; import org.apache.storm.tuple.Tuple; http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java index a622e8f..d3c304a 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka.bolt.selector; import org.apache.storm.tuple.Tuple; http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java index cb7fb44..4045df7 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java @@ -1,25 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.kafka.bolt.selector; -import org.apache.storm.tuple.Tuple; +package org.apache.storm.kafka.bolt.selector; import java.io.Serializable; +import org.apache.storm.tuple.Tuple; public interface KafkaTopicSelector extends Serializable { String getTopic(Tuple tuple); http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java index 76baf62..0e8dba1 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java @@ -1,30 +1,25 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka.trident; +import java.util.List; +import java.util.Map; import org.apache.storm.kafka.KafkaUtils; import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout; import org.apache.storm.trident.spout.IPartitionedTridentSpout; -import java.util.List; -import java.util.Map; - -class Coordinator implements IPartitionedTridentSpout.Coordinator<List<GlobalPartitionInformation>>, IOpaquePartitionedTridentSpout.Coordinator<List<GlobalPartitionInformation>> { +class Coordinator implements IPartitionedTridentSpout.Coordinator<List<GlobalPartitionInformation>>, + IOpaquePartitionedTridentSpout.Coordinator<List<GlobalPartitionInformation>> { private IBrokerReader reader; private TridentKafkaConfig config; http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java index 7a7e32c..575e235 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka.trident; public class DefaultCoordinator implements IBatchCoordinator { http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java index e420cb3..b26dc7f 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java @@ -1,35 +1,32 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka.trident; import com.google.common.base.Objects; +import java.io.Serializable; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; import org.apache.storm.kafka.Broker; import org.apache.storm.kafka.Partition; -import java.io.Serializable; -import java.util.*; - public class GlobalPartitionInformation implements Iterable<Partition>, Serializable { - private Map<Integer, Broker> partitionMap; public String topic; - + private Map<Integer, Broker> partitionMap; //Flag to keep the Partition Path Id backward compatible with Old implementation of Partition.getId() == "partition_" + partition private Boolean bUseTopicNameForPartitionPathId; @@ -49,16 +46,16 @@ public class GlobalPartitionInformation implements Iterable<Partition>, Serializ partitionMap.put(partitionId, broker); } - public Boolean getbUseTopicNameForPartitionPathId () { + public Boolean getbUseTopicNameForPartitionPathId() { return bUseTopicNameForPartitionPathId; } @Override public String toString() { return "GlobalPartitionInformation{" + - "topic=" + topic + - ", partitionMap=" + partitionMap + - '}'; + "topic=" + topic + + ", partitionMap=" + partitionMap + + '}'; } public Broker getBrokerFor(Integer partitionId) { @@ -87,7 +84,7 @@ public class GlobalPartitionInformation implements Iterable<Partition>, Serializ @Override public Partition next() { Map.Entry<Integer, Broker> next = iterator.next(); - return new Partition(next.getValue(), topic , next.getKey(), bUseTopicNameForPartitionPathId); + return new Partition(next.getValue(), topic, next.getKey(), bUseTopicNameForPartitionPathId); } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java index 41369ba..4c6c404 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka.trident; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBrokerReader.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBrokerReader.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBrokerReader.java index 904d8c9..c5cf8b2 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBrokerReader.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBrokerReader.java @@ -1,24 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka.trident; import java.util.List; -import java.util.Map; public interface IBrokerReader { http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/MaxMetric.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/MaxMetric.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/MaxMetric.java index 2332205..14324ed 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/MaxMetric.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/MaxMetric.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka.trident;
