http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java index c17c912..c98be42 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java @@ -1,36 +1,30 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka.trident; +import java.util.List; +import java.util.Map; import org.apache.storm.kafka.Partition; import org.apache.storm.task.TopologyContext; import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout; import org.apache.storm.tuple.Fields; -import java.util.List; -import java.util.Map; - public class OpaqueTridentKafkaSpout - implements IOpaquePartitionedTridentSpout< - List<GlobalPartitionInformation>, - Partition, - Map<String, Object>> { + implements IOpaquePartitionedTridentSpout< + List<GlobalPartitionInformation>, + Partition, + Map<String, Object>> { TridentKafkaConfig _config; @@ -41,17 +35,17 @@ public class OpaqueTridentKafkaSpout @Override public Emitter<List<GlobalPartitionInformation>, - Partition, - Map<String, Object>> getEmitter(Map<String, Object> conf, - TopologyContext context) { + Partition, + Map<String, Object>> getEmitter(Map<String, Object> conf, + TopologyContext context) { return new TridentKafkaEmitter(conf, context, _config, context - .getStormId()).asOpaqueEmitter(); + .getStormId()).asOpaqueEmitter(); } @Override public IOpaquePartitionedTridentSpout.Coordinator getCoordinator( - Map<String, Object> conf, - TopologyContext tc) { + Map<String, Object> conf, + TopologyContext tc) { return new org.apache.storm.kafka.trident.Coordinator(conf, _config); }
http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/StaticBrokerReader.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/StaticBrokerReader.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/StaticBrokerReader.java index ba27651..3c5cc09 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/StaticBrokerReader.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/StaticBrokerReader.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka.trident; import java.util.ArrayList; @@ -24,7 +19,7 @@ import java.util.TreeMap; public class StaticBrokerReader implements IBrokerReader { - private Map<String,GlobalPartitionInformation> brokers = new TreeMap<String,GlobalPartitionInformation>(); + private Map<String, GlobalPartitionInformation> brokers = new TreeMap<String, GlobalPartitionInformation>(); public StaticBrokerReader(String topic, GlobalPartitionInformation partitionInformation) { this.brokers.put(topic, partitionInformation); @@ -37,7 +32,7 @@ public class StaticBrokerReader implements IBrokerReader { } @Override - public List<GlobalPartitionInformation> getAllBrokers () { + public List<GlobalPartitionInformation> getAllBrokers() { List<GlobalPartitionInformation> list = new ArrayList<GlobalPartitionInformation>(); list.addAll(brokers.values()); return list; http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java index 1042098..7b1d4dd 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java @@ -1,29 +1,23 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka.trident; +import java.util.Map; import org.apache.storm.kafka.Partition; import org.apache.storm.task.TopologyContext; import org.apache.storm.trident.spout.IPartitionedTridentSpout; import org.apache.storm.tuple.Fields; -import java.util.Map; - public class TransactionalTridentKafkaSpout implements IPartitionedTridentSpout<GlobalPartitionInformation, Partition, Map> { http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaConfig.java index b225e9a..3dac221 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaConfig.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaConfig.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka.trident; import org.apache.storm.kafka.BrokerHosts; http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java index 3333c2c..cb00579 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java @@ -1,24 +1,27 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka.trident; import com.google.common.collect.ImmutableMap; - +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import kafka.javaapi.consumer.SimpleConsumer; +import kafka.javaapi.message.ByteBufferMessageSet; +import kafka.message.Message; +import kafka.message.MessageAndOffset; import org.apache.storm.Config; import org.apache.storm.kafka.DynamicPartitionConnections; import org.apache.storm.kafka.FailedFetchException; @@ -38,17 +41,6 @@ import org.apache.storm.trident.topology.TransactionAttempt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; - -import kafka.javaapi.consumer.SimpleConsumer; -import kafka.javaapi.message.ByteBufferMessageSet; -import kafka.message.Message; -import kafka.message.MessageAndOffset; - public class TridentKafkaEmitter { private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaEmitter.class); @@ -74,10 +66,10 @@ public class TridentKafkaEmitter { private Map<String, Object> failFastEmitNewPartitionBatch( - final TransactionAttempt attempt, - TridentCollector collector, - Partition partition, - Map<String, Object> lastMeta) { + final TransactionAttempt attempt, + TridentCollector collector, + Partition partition, + Map<String, Object> lastMeta) { SimpleConsumer consumer = _connections.register(partition); Map<String, Object> ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta, attempt); Long offset = (Long) ret.get("offset"); @@ -86,7 +78,8 @@ public class TridentKafkaEmitter { return ret; } - private Map<String, Object> emitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map<String, Object> lastMeta) { + private Map<String, Object> emitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, + Map<String, Object> lastMeta) { try { return failFastEmitNewPartitionBatch(attempt, collector, partition, lastMeta); } catch (FailedFetchException e) { @@ -107,16 +100,16 @@ public class TridentKafkaEmitter { } private Map<String, Object> doEmitNewPartitionBatch(SimpleConsumer consumer, - Partition partition, - TridentCollector collector, - Map<String, Object> lastMeta, - TransactionAttempt attempt) { + Partition partition, + TridentCollector collector, + Map<String, Object> lastMeta, + TransactionAttempt attempt) { LOG.debug("Emitting new partition batch - [transaction = {}], [lastMeta = {}]", attempt, lastMeta); long offset; if (lastMeta != null) { String lastInstanceId = null; Map<String, Object> lastTopoMeta = (Map<String, Object>) - lastMeta.get("topology"); + lastMeta.get("topology"); if (lastTopoMeta != null) { lastInstanceId = (String) lastTopoMeta.get("id"); } @@ -170,7 +163,8 @@ public class TridentKafkaEmitter { /** * re-emit the batch described by the meta data provided */ - private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map<String, Object> meta) { + private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, + Map<String, Object> meta) { LOG.info("re-emitting batch, attempt " + attempt); String instanceId = (String) meta.get("instanceId"); if (!_config.ignoreZkOffsets || instanceId.equals(_topologyInstanceId)) { @@ -241,7 +235,8 @@ public class TridentKafkaEmitter { * for defining the parameters of the next batch. */ @Override - public Map<String, Object> emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map<String, Object> map) { + public Map<String, Object> emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, + Partition partition, Map<String, Object> map) { return emitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map); } @@ -270,7 +265,8 @@ public class TridentKafkaEmitter { * Return the metadata that can be used to reconstruct this partition/batch in the future. */ @Override - public Map<String, Object> emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map<String, Object> map) { + public Map<String, Object> emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, + Partition partition, Map<String, Object> map) { return failFastEmitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map); } @@ -279,7 +275,8 @@ public class TridentKafkaEmitter { * the metadata created when it was first emitted. */ @Override - public void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map<String, Object> map) { + public void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, + Map<String, Object> map) { reEmitPartitionBatch(transactionAttempt, tridentCollector, partition, map); } http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java index eb6737a..71b2cb1 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java @@ -1,41 +1,35 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka.trident; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.topology.FailedException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.apache.commons.lang.Validate; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper; import org.apache.storm.kafka.trident.selector.KafkaTopicSelector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.topology.FailedException; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.state.State; import org.apache.storm.trident.tuple.TridentTuple; - -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TridentKafkaState implements State { private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class); @@ -79,9 +73,10 @@ public class TridentKafkaState implements State { for (TridentTuple tuple : tuples) { topic = topicSelector.getTopic(tuple); - if(topic != null) { + if (topic != null) { Future<RecordMetadata> result = producer.send(new ProducerRecord(topic, - mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple))); + mapper.getKeyFromTuple(tuple), + mapper.getMessageFromTuple(tuple))); futures.add(result); } else { LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null."); @@ -97,9 +92,9 @@ public class TridentKafkaState implements State { } } - if(exceptions.size() > 0){ - String errorMsg = "Could not retrieve result for messages " + tuples + " from topic = " + topic - + " because of the following exceptions: \n"; + if (exceptions.size() > 0) { + String errorMsg = "Could not retrieve result for messages " + tuples + " from topic = " + topic + + " because of the following exceptions: \n"; for (ExecutionException exception : exceptions) { errorMsg = errorMsg + exception.getMessage() + "\n"; } http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java index 0bf21ab..5b66fd8 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java @@ -1,32 +1,26 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka.trident; -import org.apache.storm.task.IMetricsContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.Map; +import java.util.Properties; import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper; import org.apache.storm.kafka.trident.selector.KafkaTopicSelector; +import org.apache.storm.task.IMetricsContext; import org.apache.storm.trident.state.State; import org.apache.storm.trident.state.StateFactory; - -import java.util.Map; -import java.util.Properties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TridentKafkaStateFactory implements StateFactory { @@ -55,8 +49,8 @@ public class TridentKafkaStateFactory implements StateFactory { public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { LOG.info("makeState(partitonIndex={}, numpartitions={}", partitionIndex, numPartitions); TridentKafkaState state = new TridentKafkaState() - .withKafkaTopicSelector(this.topicSelector) - .withTridentTupleToKafkaMapper(this.mapper); + .withKafkaTopicSelector(this.topicSelector) + .withTridentTupleToKafkaMapper(this.mapper); state.prepare(producerProperties); return state; } http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java index 7a905ab..1100b66 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java @@ -1,28 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka.trident; +import java.util.List; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.state.BaseStateUpdater; import org.apache.storm.trident.tuple.TridentTuple; -import java.util.List; - public class TridentKafkaUpdater extends BaseStateUpdater<TridentKafkaState> { @Override public void updateState(TridentKafkaState state, List<TridentTuple> tuples, TridentCollector collector) { http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java index 00758a6..d40256e 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java @@ -1,84 +1,79 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.kafka.trident; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.storm.kafka.DynamicBrokersReader; -import org.apache.storm.kafka.ZkHosts; +package org.apache.storm.kafka.trident; import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.storm.kafka.DynamicBrokersReader; +import org.apache.storm.kafka.ZkHosts; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ZkBrokerReader implements IBrokerReader { - private static final Logger LOG = LoggerFactory.getLogger(ZkBrokerReader.class); + private static final Logger LOG = LoggerFactory.getLogger(ZkBrokerReader.class); - List<GlobalPartitionInformation> cachedBrokers = new ArrayList<GlobalPartitionInformation>(); - DynamicBrokersReader reader; - long lastRefreshTimeMs; + List<GlobalPartitionInformation> cachedBrokers = new ArrayList<GlobalPartitionInformation>(); + DynamicBrokersReader reader; + long lastRefreshTimeMs; - long refreshMillis; + long refreshMillis; - public ZkBrokerReader(Map<String, Object> conf, String topic, ZkHosts hosts) { - try { - reader = new DynamicBrokersReader(conf, hosts.brokerZkStr, hosts.brokerZkPath, topic); - cachedBrokers = reader.getBrokerInfo(); - lastRefreshTimeMs = System.currentTimeMillis(); - refreshMillis = hosts.refreshFreqSecs * 1000L; - } catch (java.net.SocketTimeoutException e) { - LOG.warn("Failed to update brokers", e); - } + public ZkBrokerReader(Map<String, Object> conf, String topic, ZkHosts hosts) { + try { + reader = new DynamicBrokersReader(conf, hosts.brokerZkStr, hosts.brokerZkPath, topic); + cachedBrokers = reader.getBrokerInfo(); + lastRefreshTimeMs = System.currentTimeMillis(); + refreshMillis = hosts.refreshFreqSecs * 1000L; + } catch (java.net.SocketTimeoutException e) { + LOG.warn("Failed to update brokers", e); + } + + } - } + private void refresh() { + long currTime = System.currentTimeMillis(); + if (currTime > lastRefreshTimeMs + refreshMillis) { + try { + LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired"); + cachedBrokers = reader.getBrokerInfo(); + lastRefreshTimeMs = currTime; + } catch (java.net.SocketTimeoutException e) { + LOG.warn("Failed to update brokers", e); + } + } + } - private void refresh() { - long currTime = System.currentTimeMillis(); - if (currTime > lastRefreshTimeMs + refreshMillis) { - try { - LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired"); - cachedBrokers = reader.getBrokerInfo(); - lastRefreshTimeMs = currTime; - } catch (java.net.SocketTimeoutException e) { - LOG.warn("Failed to update brokers", e); - } - } - } - @Override - public GlobalPartitionInformation getBrokerForTopic(String topic) { - refresh(); - for(GlobalPartitionInformation partitionInformation : cachedBrokers) { + @Override + public GlobalPartitionInformation getBrokerForTopic(String topic) { + refresh(); + for (GlobalPartitionInformation partitionInformation : cachedBrokers) { if (partitionInformation.topic.equals(topic)) return partitionInformation; } - return null; - } + return null; + } - @Override - public List<GlobalPartitionInformation> getAllBrokers() { - refresh(); - return cachedBrokers; - } + @Override + public List<GlobalPartitionInformation> getAllBrokers() { + refresh(); + return cachedBrokers; + } - @Override - public void close() { - reader.close(); - } + @Override + public void close() { + reader.close(); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java index 2d04971..01e3eca 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka.trident.mapper; import org.apache.storm.trident.tuple.TridentTuple; http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java index 28c6c89..4a522d6 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java @@ -1,28 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.kafka.trident.mapper; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.trident.tuple.TridentTuple; +package org.apache.storm.kafka.trident.mapper; import java.io.Serializable; +import org.apache.storm.trident.tuple.TridentTuple; -public interface TridentTupleToKafkaMapper<K,V> extends Serializable { +public interface TridentTupleToKafkaMapper<K, V> extends Serializable { K getKeyFromTuple(TridentTuple tuple); + V getMessageFromTuple(TridentTuple tuple); } http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java index 7ae49a3..93b5566 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka.trident.selector; import org.apache.storm.trident.tuple.TridentTuple; http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java index 012a6c7..6de3921 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java @@ -1,25 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.kafka.trident.selector; -import org.apache.storm.trident.tuple.TridentTuple; +package org.apache.storm.kafka.trident.selector; import java.io.Serializable; +import org.apache.storm.trident.tuple.TridentTuple; public interface KafkaTopicSelector extends Serializable { String getTopic(TridentTuple tuple); http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java index 46cc60d..a6bb61c 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java @@ -1,40 +1,33 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; -import org.apache.storm.Config; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingServer; import org.apache.curator.utils.ZKPaths; +import org.apache.storm.Config; +import org.apache.storm.kafka.trident.GlobalPartitionInformation; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.apache.storm.kafka.trident.GlobalPartitionInformation; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; /** * Date: 16/05/2013 @@ -66,7 +59,7 @@ public class DynamicBrokersReaderTest { Map<String, Object> conf2 = new HashMap<>(); conf2.putAll(conf); - conf2.put("kafka.topic.wildcard.match",true); + conf2.put("kafka.topic.wildcard.match", true); wildCardBrokerReader = new DynamicBrokersReader(conf2, connectionString, masterPath, "^test.*$"); zookeeper.start(); @@ -114,8 +107,8 @@ public class DynamicBrokersReaderTest { } - private GlobalPartitionInformation getByTopic(List<GlobalPartitionInformation> partitions, String topic){ - for(GlobalPartitionInformation partitionInformation : partitions) { + private GlobalPartitionInformation getByTopic(List<GlobalPartitionInformation> partitions, String topic) { + for (GlobalPartitionInformation partitionInformation : partitions) { if (partitionInformation.topic.equals(topic)) return partitionInformation; } return null; @@ -242,7 +235,7 @@ public class DynamicBrokersReaderTest { String connectionString = server.getConnectString(); Map<String, Object> conf = new HashMap<>(); conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000); -// conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000); + // conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000); conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4); conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5); http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java index 049fce7..5fcee28 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java @@ -1,28 +1,23 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.kafka; -import org.junit.Test; +package org.apache.storm.kafka; import org.apache.storm.utils.Time; import org.apache.storm.utils.Time.SimulatedTime; import org.junit.After; import org.junit.Before; +import org.junit.Test; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -52,8 +47,8 @@ public class ExponentialBackoffMsgRetryManagerTest { @Test public void testImmediateRetry() throws Exception { - - + + ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE); manager.failed(TEST_OFFSET); Long next = manager.nextFailedMessageToRetry(); @@ -124,22 +119,22 @@ public class ExponentialBackoffMsgRetryManagerTest { // so TEST_OFFSET2 should come first Time.advanceTime(initial * 2); - assertTrue("message "+TEST_OFFSET+"should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET)); - assertTrue("message "+TEST_OFFSET2+"should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET2)); + assertTrue("message " + TEST_OFFSET + "should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET)); + assertTrue("message " + TEST_OFFSET2 + "should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET2)); Long next = manager.nextFailedMessageToRetry(); - assertEquals("expect first message to retry is "+TEST_OFFSET2, TEST_OFFSET2, next); + assertEquals("expect first message to retry is " + TEST_OFFSET2, TEST_OFFSET2, next); Time.advanceTime(initial); // haven't retried yet, so first should still be TEST_OFFSET2 next = manager.nextFailedMessageToRetry(); - assertEquals("expect first message to retry is "+TEST_OFFSET2, TEST_OFFSET2, next); + assertEquals("expect first message to retry is " + TEST_OFFSET2, TEST_OFFSET2, next); manager.retryStarted(next); // now it should be TEST_OFFSET next = manager.nextFailedMessageToRetry(); - assertEquals("expect message to retry is now "+TEST_OFFSET, TEST_OFFSET, next); + assertEquals("expect message to retry is now " + TEST_OFFSET, TEST_OFFSET, next); manager.retryStarted(next); // now none left @@ -230,14 +225,14 @@ public class ExponentialBackoffMsgRetryManagerTest { assertEquals("expect test offset next available for retry", TEST_OFFSET, next); assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET)); } - + @Test public void testClearInvalidMessages() throws Exception { ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE); manager.failed(TEST_OFFSET); manager.failed(TEST_OFFSET2); manager.failed(TEST_OFFSET3); - + assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET)); assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET2)); assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET3)); @@ -246,7 +241,7 @@ public class ExponentialBackoffMsgRetryManagerTest { Long next = manager.nextFailedMessageToRetry(); assertEquals("expect test offset next available for retry", TEST_OFFSET3, next); - + manager.acked(TEST_OFFSET3); next = manager.nextFailedMessageToRetry(); assertNull("expect no message ready after acked", next); @@ -267,8 +262,8 @@ public class ExponentialBackoffMsgRetryManagerTest { assertFalse(manager.retryFurther(TEST_OFFSET)); } - - private ExponentialBackoffMsgRetryManager buildExponentialBackoffMsgRetryManager(long retryInitialDelayMs, + + private ExponentialBackoffMsgRetryManager buildExponentialBackoffMsgRetryManager(long retryInitialDelayMs, double retryDelayMultiplier, long retryDelayMaxMs, int retryLimit) { @@ -276,7 +271,7 @@ public class ExponentialBackoffMsgRetryManagerTest { spoutConfig.retryInitialDelayMs = retryInitialDelayMs; spoutConfig.retryDelayMultiplier = retryDelayMultiplier; spoutConfig.retryDelayMaxMs = retryDelayMaxMs; - spoutConfig.retryLimit = retryLimit; + spoutConfig.retryLimit = retryLimit; ExponentialBackoffMsgRetryManager exponentialBackoffMsgRetryManager = new ExponentialBackoffMsgRetryManager(); exponentialBackoffMsgRetryManager.prepare(spoutConfig, null); return exponentialBackoffMsgRetryManager; http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaErrorTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaErrorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaErrorTest.java index e38bc1e..ad793ed 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaErrorTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaErrorTest.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java index 0952764..f31386a 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java @@ -1,22 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; +import java.io.File; +import java.io.IOException; +import java.util.Properties; +import java.util.concurrent.TimeUnit; import kafka.admin.AdminUtils; import kafka.api.PartitionMetadata; import kafka.api.TopicMetadata; @@ -34,11 +33,6 @@ import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingServer; import scala.collection.JavaConversions; -import java.io.File; -import java.io.IOException; -import java.util.Properties; -import java.util.concurrent.TimeUnit; - /** * Date: 11/01/2014 * Time: 13:15 @@ -161,6 +155,7 @@ public class KafkaTestBroker { public int getPort() { return port; } + public void shutdown() { if (kafka != null) { kafka.shutdown(); http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java index 1bd989f..3c2c0d9 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java @@ -1,38 +1,29 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.kafka; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; +package org.apache.storm.kafka; +import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.List; import java.util.Properties; - import kafka.api.OffsetRequest; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; import kafka.message.MessageAndOffset; - import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.storm.kafka.trident.GlobalPartitionInformation; +import org.apache.storm.spout.SchemeAsMultiScheme; import org.apache.storm.utils.Utils; import org.junit.After; import org.junit.Assert; @@ -42,13 +33,14 @@ import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.storm.kafka.trident.GlobalPartitionInformation; -import org.apache.storm.spout.SchemeAsMultiScheme; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; -import com.google.common.collect.ImmutableMap; public class KafkaUtilsTest { - private String TEST_TOPIC = "testTopic"; private static final Logger LOG = LoggerFactory.getLogger(KafkaUtilsTest.class); + private String TEST_TOPIC = "testTopic"; private KafkaTestBroker broker; private SimpleConsumer simpleConsumer; private KafkaConfig config; @@ -73,7 +65,8 @@ public class KafkaUtilsTest { @Test(expected = FailedFetchException.class) public void topicDoesNotExist() throws Exception { - KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), 0); + KafkaUtils + .fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), 0); } @Test(expected = FailedFetchException.class) @@ -82,7 +75,9 @@ public class KafkaUtilsTest { broker.shutdown(); SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", port, 100, 1024, "testClient"); try { - KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), OffsetRequest.LatestTime()); + KafkaUtils + .fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), + OffsetRequest.LatestTime()); } finally { simpleConsumer.close(); } @@ -94,7 +89,9 @@ public class KafkaUtilsTest { createTopicAndSendMessage(value); long offset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime()) - 1; ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(config, simpleConsumer, - new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), offset); + new Partition( + Broker.fromString(broker.getBrokerConnectionString()), + TEST_TOPIC, 0), offset); String message = new String(Utils.toByteArray(messageAndOffsets.iterator().next().message().payload())); assertThat(message, is(equalTo(value))); } @@ -103,7 +100,7 @@ public class KafkaUtilsTest { public void fetchMessagesWithInvalidOffsetAndDefaultHandlingDisabled() throws Exception { config.useStartOffsetTimeIfOffsetOutOfRange = false; KafkaUtils.fetchMessages(config, simpleConsumer, - new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), -99); + new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), -99); } @Test(expected = TopicOffsetOutOfRangeException.class) @@ -112,7 +109,7 @@ public class KafkaUtilsTest { String value = "test"; createTopicAndSendMessage(value); KafkaUtils.fetchMessages(config, simpleConsumer, - new Partition(Broker.fromString(broker.getBrokerConnectionString()), "newTopic", 0), -99); + new Partition(Broker.fromString(broker.getBrokerConnectionString()), "newTopic", 0), -99); } @Test @@ -187,21 +184,21 @@ public class KafkaUtilsTest { assertEquals(value, lists.iterator().next().get(0)); } } - + @Test public void generateTuplesWithMessageAndMetadataScheme() { String value = "value"; Partition mockPartition = Mockito.mock(Partition.class); mockPartition.partition = 0; long offset = 0L; - + MessageMetadataSchemeAsMultiScheme scheme = new MessageMetadataSchemeAsMultiScheme(new StringMessageAndMetadataScheme()); - + createTopicAndSendMessage(null, value); ByteBufferMessageSet messageAndOffsets = getLastMessage(); for (MessageAndOffset msg : messageAndOffsets) { Iterable<List<Object>> lists = KafkaUtils.generateTuples(scheme, msg.message(), mockPartition, offset); - List<Object> values = lists.iterator().next(); + List<Object> values = lists.iterator().next(); assertEquals("Message is incorrect", value, values.get(0)); assertEquals("Partition is incorrect", mockPartition.partition, values.get(1)); assertEquals("Offset is incorrect", offset, values.get(2)); @@ -210,12 +207,14 @@ public class KafkaUtilsTest { private ByteBufferMessageSet getLastMessage() { long offsetOfLastMessage = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime()) - 1; - return KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), offsetOfLastMessage); + return KafkaUtils + .fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), + offsetOfLastMessage); } private void runGetValueOnlyTuplesTest() { String value = "value"; - + createTopicAndSendMessage(null, value); ByteBufferMessageSet messageAndOffsets = getLastMessage(); for (MessageAndOffset msg : messageAndOffsets) { @@ -264,13 +263,13 @@ public class KafkaUtilsTest { public void assignAllPartitionsToOneTask() { runPartitionToTaskMappingTest(32, 32); } - + public void runPartitionToTaskMappingTest(int numPartitions, int partitionsPerTask) { GlobalPartitionInformation globalPartitionInformation = TestUtils.buildPartitionInfo(numPartitions); List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>(); partitions.add(globalPartitionInformation); int numTasks = numPartitions / partitionsPerTask; - for (int i = 0 ; i < numTasks ; i++) { + for (int i = 0; i < numTasks; i++) { assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, i, i).size()); } } @@ -285,7 +284,7 @@ public class KafkaUtilsTest { assertEquals(0, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 1, 1).size()); } - @Test (expected = IllegalArgumentException.class ) + @Test(expected = IllegalArgumentException.class) public void assignInvalidTask() { GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TEST_TOPIC); List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>(); http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java index 805913d..a2824fb 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java @@ -1,22 +1,23 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.storm.Config; @@ -30,13 +31,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.TimeUnit; - public class PartitionManagerTest { private static final String TOPIC_NAME = "testTopic"; @@ -128,7 +122,7 @@ public class PartitionManagerTest { PartitionManager partitionManager = partitionManagers.get(0); - for (int i=0; i < 5; i++) { + for (int i = 0; i < 5; i++) { sendMessage("message-" + i); } http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java index 7e5ff00..9c749fe 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java @@ -1,29 +1,23 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; -import org.apache.storm.tuple.Fields; import com.google.common.collect.ImmutableMap; -import org.junit.Test; - import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.Collections; +import org.apache.storm.tuple.Fields; +import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -32,6 +26,10 @@ public class StringKeyValueSchemeTest { private StringKeyValueScheme scheme = new StringKeyValueScheme(); + private static ByteBuffer wrapString(String s) { + return ByteBuffer.wrap(s.getBytes(Charset.defaultCharset())); + } + @Test public void testDeserialize() throws Exception { assertEquals(Collections.singletonList("test"), scheme.deserialize(wrapString("test"))); @@ -47,16 +45,12 @@ public class StringKeyValueSchemeTest { @Test public void testDeserializeWithNullKeyAndValue() throws Exception { assertEquals(Collections.singletonList("test"), - scheme.deserializeKeyAndValue(null, wrapString("test"))); + scheme.deserializeKeyAndValue(null, wrapString("test"))); } @Test public void testDeserializeWithKeyAndValue() throws Exception { assertEquals(Collections.singletonList(ImmutableMap.of("key", "test")), - scheme.deserializeKeyAndValue(wrapString("key"), wrapString("test"))); - } - - private static ByteBuffer wrapString(String s) { - return ByteBuffer.wrap(s.getBytes(Charset.defaultCharset())); + scheme.deserializeKeyAndValue(wrapString("key"), wrapString("test"))); } } http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/TestStringScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/TestStringScheme.java b/external/storm-kafka/src/test/org/apache/storm/kafka/TestStringScheme.java index 23944ab..ac94e9d 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/TestStringScheme.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/TestStringScheme.java @@ -15,26 +15,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.kafka; -import org.junit.Test; +package org.apache.storm.kafka; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import org.junit.Test; import static org.junit.Assert.assertEquals; public class TestStringScheme { - @Test - public void testDeserializeString() { - String s = "foo"; - byte[] bytes = s.getBytes(StandardCharsets.UTF_8); - ByteBuffer direct = ByteBuffer.allocateDirect(bytes.length); - direct.put(bytes); - direct.flip(); - String s1 = StringScheme.deserializeString(ByteBuffer.wrap(bytes)); - String s2 = StringScheme.deserializeString(direct); - assertEquals(s, s1); - assertEquals(s, s2); - } + @Test + public void testDeserializeString() { + String s = "foo"; + byte[] bytes = s.getBytes(StandardCharsets.UTF_8); + ByteBuffer direct = ByteBuffer.allocateDirect(bytes.length); + direct.put(bytes); + direct.flip(); + String s1 = StringScheme.deserializeString(ByteBuffer.wrap(bytes)); + String s2 = StringScheme.deserializeString(direct); + assertEquals(s, s1); + assertEquals(s, s2); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java b/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java index c7ba674..921df3c 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java @@ -1,32 +1,28 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; -import org.apache.storm.utils.Utils; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; import kafka.api.OffsetRequest; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; import kafka.message.Message; import kafka.message.MessageAndOffset; import org.apache.storm.kafka.trident.GlobalPartitionInformation; - -import java.nio.ByteBuffer; -import java.util.*; +import org.apache.storm.utils.Utils; import static org.junit.Assert.assertEquals; @@ -83,7 +79,9 @@ public class TestUtils { public static boolean verifyMessage(String key, String message, KafkaTestBroker broker, SimpleConsumer simpleConsumer) { long lastMessageOffset = KafkaUtils.getOffset(simpleConsumer, TestUtils.TOPIC, 0, OffsetRequest.LatestTime()) - 1; ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(TestUtils.getKafkaConfig(broker), simpleConsumer, - new Partition(Broker.fromString(broker.getBrokerConnectionString()),TestUtils.TOPIC, 0), lastMessageOffset); + new Partition( + Broker.fromString(broker.getBrokerConnectionString()), + TestUtils.TOPIC, 0), lastMessageOffset); MessageAndOffset messageAndOffset = messageAndOffsets.iterator().next(); Message kafkaMessage = messageAndOffset.message(); ByteBuffer messageKeyBuffer = kafkaMessage.key();
