http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/tools/RankableObjectWithFields.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/tools/RankableObjectWithFields.java b/examples/storm-starter/src/jvm/storm/starter/tools/RankableObjectWithFields.java deleted file mode 100644 index 9a0ecae..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/tools/RankableObjectWithFields.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter.tools; - -import backtype.storm.tuple.Tuple; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - -import java.io.Serializable; -import java.util.List; - -/** - * This class wraps an objects and its associated count, including any additional data fields. - * <p/> - * This class can be used, for instance, to track the number of occurrences of an object in a Storm topology. - */ -public class RankableObjectWithFields implements Rankable, Serializable { - - private static final long serialVersionUID = -9102878650001058090L; - private static final String toStringSeparator = "|"; - - private final Object obj; - private final long count; - private final ImmutableList<Object> fields; - - public RankableObjectWithFields(Object obj, long count, Object... otherFields) { - if (obj == null) { - throw new IllegalArgumentException("The object must not be null"); - } - if (count < 0) { - throw new IllegalArgumentException("The count must be >= 0"); - } - this.obj = obj; - this.count = count; - fields = ImmutableList.copyOf(otherFields); - - } - - /** - * Construct a new instance based on the provided {@link Tuple}. - * <p/> - * This method expects the object to be ranked in the first field (index 0) of the provided tuple, and the number of - * occurrences of the object (its count) in the second field (index 1). Any further fields in the tuple will be - * extracted and tracked, too. These fields can be accessed via {@link RankableObjectWithFields#getFields()}. - * - * @param tuple - * - * @return new instance based on the provided tuple - */ - public static RankableObjectWithFields from(Tuple tuple) { - List<Object> otherFields = Lists.newArrayList(tuple.getValues()); - Object obj = otherFields.remove(0); - Long count = (Long) otherFields.remove(0); - return new RankableObjectWithFields(obj, count, otherFields.toArray()); - } - - public Object getObject() { - return obj; - } - - public long getCount() { - return count; - } - - /** - * @return an immutable list of any additional data fields of the object (may be empty but will never be null) - */ - public List<Object> getFields() { - return fields; - } - - @Override - public int compareTo(Rankable other) { - long delta = this.getCount() - other.getCount(); - if (delta > 0) { - return 1; - } - else if (delta < 0) { - return -1; - } - else { - return 0; - } - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof RankableObjectWithFields)) { - return false; - } - RankableObjectWithFields other = (RankableObjectWithFields) o; - return obj.equals(other.obj) && count == other.count; - } - - @Override - public int hashCode() { - int result = 17; - int countHash = (int) (count ^ (count >>> 32)); - result = 31 * result + countHash; - result = 31 * result + obj.hashCode(); - return result; - } - - public String toString() { - StringBuffer buf = new StringBuffer(); - buf.append("["); - buf.append(obj); - buf.append(toStringSeparator); - buf.append(count); - for (Object field : fields) { - buf.append(toStringSeparator); - buf.append(field); - } - buf.append("]"); - return buf.toString(); - } - - /** - * Note: We do not defensively copy the wrapped object and any accompanying fields. We do guarantee, however, - * do return a defensive (shallow) copy of the List object that is wrapping any accompanying fields. - * - * @return - */ - @Override - public Rankable copy() { - List<Object> shallowCopyOfFields = ImmutableList.copyOf(getFields()); - return new RankableObjectWithFields(getObject(), getCount(), shallowCopyOfFields); - } - -}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/tools/Rankings.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/tools/Rankings.java b/examples/storm-starter/src/jvm/storm/starter/tools/Rankings.java deleted file mode 100644 index 551ebfb..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/tools/Rankings.java +++ /dev/null @@ -1,156 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter.tools; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - -import java.io.Serializable; -import java.util.Collections; -import java.util.List; - -public class Rankings implements Serializable { - - private static final long serialVersionUID = -1549827195410578903L; - private static final int DEFAULT_COUNT = 10; - - private final int maxSize; - private final List<Rankable> rankedItems = Lists.newArrayList(); - - public Rankings() { - this(DEFAULT_COUNT); - } - - public Rankings(int topN) { - if (topN < 1) { - throw new IllegalArgumentException("topN must be >= 1"); - } - maxSize = topN; - } - - /** - * Copy constructor. - * @param other - */ - public Rankings(Rankings other) { - this(other.maxSize()); - updateWith(other); - } - - /** - * @return the maximum possible number (size) of ranked objects this instance can hold - */ - public int maxSize() { - return maxSize; - } - - /** - * @return the number (size) of ranked objects this instance is currently holding - */ - public int size() { - return rankedItems.size(); - } - - /** - * The returned defensive copy is only "somewhat" defensive. We do, for instance, return a defensive copy of the - * enclosing List instance, and we do try to defensively copy any contained Rankable objects, too. However, the - * contract of {@link storm.starter.tools.Rankable#copy()} does not guarantee that any Object's embedded within - * a Rankable will be defensively copied, too. - * - * @return a somewhat defensive copy of ranked items - */ - public List<Rankable> getRankings() { - List<Rankable> copy = Lists.newLinkedList(); - for (Rankable r: rankedItems) { - copy.add(r.copy()); - } - return ImmutableList.copyOf(copy); - } - - public void updateWith(Rankings other) { - for (Rankable r : other.getRankings()) { - updateWith(r); - } - } - - public void updateWith(Rankable r) { - synchronized(rankedItems) { - addOrReplace(r); - rerank(); - shrinkRankingsIfNeeded(); - } - } - - private void addOrReplace(Rankable r) { - Integer rank = findRankOf(r); - if (rank != null) { - rankedItems.set(rank, r); - } - else { - rankedItems.add(r); - } - } - - private Integer findRankOf(Rankable r) { - Object tag = r.getObject(); - for (int rank = 0; rank < rankedItems.size(); rank++) { - Object cur = rankedItems.get(rank).getObject(); - if (cur.equals(tag)) { - return rank; - } - } - return null; - } - - private void rerank() { - Collections.sort(rankedItems); - Collections.reverse(rankedItems); - } - - private void shrinkRankingsIfNeeded() { - if (rankedItems.size() > maxSize) { - rankedItems.remove(maxSize); - } - } - - /** - * Removes ranking entries that have a count of zero. - */ - public void pruneZeroCounts() { - int i = 0; - while (i < rankedItems.size()) { - if (rankedItems.get(i).getCount() == 0) { - rankedItems.remove(i); - } - else { - i++; - } - } - } - - public String toString() { - return rankedItems.toString(); - } - - /** - * Creates a (defensive) copy of itself. - */ - public Rankings copy() { - return new Rankings(this); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/tools/SlidingWindowCounter.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/tools/SlidingWindowCounter.java b/examples/storm-starter/src/jvm/storm/starter/tools/SlidingWindowCounter.java deleted file mode 100644 index 1199c40..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/tools/SlidingWindowCounter.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter.tools; - -import java.io.Serializable; -import java.util.Map; - -/** - * This class counts objects in a sliding window fashion. - * <p/> - * It is designed 1) to give multiple "producer" threads write access to the counter, i.e. being able to increment - * counts of objects, and 2) to give a single "consumer" thread (e.g. {@link PeriodicSlidingWindowCounter}) read access - * to the counter. Whenever the consumer thread performs a read operation, this class will advance the head slot of the - * sliding window counter. This means that the consumer thread indirectly controls where writes of the producer threads - * will go to. Also, by itself this class will not advance the head slot. - * <p/> - * A note for analyzing data based on a sliding window count: During the initial <code>windowLengthInSlots</code> - * iterations, this sliding window counter will always return object counts that are equal or greater than in the - * previous iteration. This is the effect of the counter "loading up" at the very start of its existence. Conceptually, - * this is the desired behavior. - * <p/> - * To give an example, using a counter with 5 slots which for the sake of this example represent 1 minute of time each: - * <p/> - * <pre> - * {@code - * Sliding window counts of an object X over time - * - * Minute (timeline): - * 1 2 3 4 5 6 7 8 - * - * Observed counts per minute: - * 1 1 1 1 0 0 0 0 - * - * Counts returned by counter: - * 1 2 3 4 4 3 2 1 - * } - * </pre> - * <p/> - * As you can see in this example, for the first <code>windowLengthInSlots</code> (here: the first five minutes) the - * counter will always return counts equal or greater than in the previous iteration (1, 2, 3, 4, 4). This initial load - * effect needs to be accounted for whenever you want to perform analyses such as trending topics; otherwise your - * analysis algorithm might falsely identify the object to be trending as the counter seems to observe continuously - * increasing counts. Also, note that during the initial load phase <em>every object</em> will exhibit increasing - * counts. - * <p/> - * On a high-level, the counter exhibits the following behavior: If you asked the example counter after two minutes, - * "how often did you count the object during the past five minutes?", then it should reply "I have counted it 2 times - * in the past five minutes", implying that it can only account for the last two of those five minutes because the - * counter was not running before that time. - * - * @param <T> The type of those objects we want to count. - */ -public final class SlidingWindowCounter<T> implements Serializable { - - private static final long serialVersionUID = -2645063988768785810L; - - private SlotBasedCounter<T> objCounter; - private int headSlot; - private int tailSlot; - private int windowLengthInSlots; - - public SlidingWindowCounter(int windowLengthInSlots) { - if (windowLengthInSlots < 2) { - throw new IllegalArgumentException( - "Window length in slots must be at least two (you requested " + windowLengthInSlots + ")"); - } - this.windowLengthInSlots = windowLengthInSlots; - this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots); - - this.headSlot = 0; - this.tailSlot = slotAfter(headSlot); - } - - public void incrementCount(T obj) { - objCounter.incrementCount(obj, headSlot); - } - - /** - * Return the current (total) counts of all tracked objects, then advance the window. - * <p/> - * Whenever this method is called, we consider the counts of the current sliding window to be available to and - * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent - * objects within the next "chunk" of the sliding window. - * - * @return The current (total) counts of all tracked objects. - */ - public Map<T, Long> getCountsThenAdvanceWindow() { - Map<T, Long> counts = objCounter.getCounts(); - objCounter.wipeZeros(); - objCounter.wipeSlot(tailSlot); - advanceHead(); - return counts; - } - - private void advanceHead() { - headSlot = tailSlot; - tailSlot = slotAfter(tailSlot); - } - - private int slotAfter(int slot) { - return (slot + 1) % windowLengthInSlots; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/tools/SlotBasedCounter.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/tools/SlotBasedCounter.java b/examples/storm-starter/src/jvm/storm/starter/tools/SlotBasedCounter.java deleted file mode 100644 index 4b2d472..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/tools/SlotBasedCounter.java +++ /dev/null @@ -1,118 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter.tools; - -import java.io.Serializable; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -/** - * This class provides per-slot counts of the occurrences of objects. - * <p/> - * It can be used, for instance, as a building block for implementing sliding window counting of objects. - * - * @param <T> The type of those objects we want to count. - */ -public final class SlotBasedCounter<T> implements Serializable { - - private static final long serialVersionUID = 4858185737378394432L; - - private final Map<T, long[]> objToCounts = new HashMap<T, long[]>(); - private final int numSlots; - - public SlotBasedCounter(int numSlots) { - if (numSlots <= 0) { - throw new IllegalArgumentException("Number of slots must be greater than zero (you requested " + numSlots + ")"); - } - this.numSlots = numSlots; - } - - public void incrementCount(T obj, int slot) { - long[] counts = objToCounts.get(obj); - if (counts == null) { - counts = new long[this.numSlots]; - objToCounts.put(obj, counts); - } - counts[slot]++; - } - - public long getCount(T obj, int slot) { - long[] counts = objToCounts.get(obj); - if (counts == null) { - return 0; - } - else { - return counts[slot]; - } - } - - public Map<T, Long> getCounts() { - Map<T, Long> result = new HashMap<T, Long>(); - for (T obj : objToCounts.keySet()) { - result.put(obj, computeTotalCount(obj)); - } - return result; - } - - private long computeTotalCount(T obj) { - long[] curr = objToCounts.get(obj); - long total = 0; - for (long l : curr) { - total += l; - } - return total; - } - - /** - * Reset the slot count of any tracked objects to zero for the given slot. - * - * @param slot - */ - public void wipeSlot(int slot) { - for (T obj : objToCounts.keySet()) { - resetSlotCountToZero(obj, slot); - } - } - - private void resetSlotCountToZero(T obj, int slot) { - long[] counts = objToCounts.get(obj); - counts[slot] = 0; - } - - private boolean shouldBeRemovedFromCounter(T obj) { - return computeTotalCount(obj) == 0; - } - - /** - * Remove any object from the counter whose total count is zero (to free up memory). - */ - public void wipeZeros() { - Set<T> objToBeRemoved = new HashSet<T>(); - for (T obj : objToCounts.keySet()) { - if (shouldBeRemovedFromCounter(obj)) { - objToBeRemoved.add(obj); - } - } - for (T obj : objToBeRemoved) { - objToCounts.remove(obj); - } - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java b/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java deleted file mode 100644 index bd8ecba..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * Contains some contributions under the Thrift Software License. - * Please see doc/old-thrift-license.txt in the Thrift distribution for - * details. - */ -package storm.starter.trident; - - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.LocalDRPC; -import backtype.storm.generated.StormTopology; -import backtype.storm.spout.SchemeAsMultiScheme; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; -import org.apache.kafka.clients.producer.ProducerConfig; -import storm.kafka.StringScheme; -import storm.kafka.ZkHosts; -import storm.kafka.bolt.KafkaBolt; -import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper; -import storm.kafka.bolt.selector.DefaultTopicSelector; -import storm.kafka.trident.TransactionalTridentKafkaSpout; -import storm.kafka.trident.TridentKafkaConfig; -import storm.starter.spout.RandomSentenceSpout; -import storm.trident.Stream; -import storm.trident.TridentState; -import storm.trident.TridentTopology; -import storm.trident.operation.builtin.Count; -import storm.trident.operation.builtin.FilterNull; -import storm.trident.operation.builtin.MapGet; -import storm.trident.testing.MemoryMapState; -import storm.trident.testing.Split; - -import java.util.Properties; - -/** - * A sample word count trident topology using transactional kafka spout that has the following components. - * <ol> - * <li> {@link KafkaBolt} - * that receives random sentences from {@link RandomSentenceSpout} and - * publishes the sentences to a kafka "test" topic. - * </li> - * <li> {@link TransactionalTridentKafkaSpout} - * that consumes sentences from the "test" topic, splits it into words, aggregates - * and stores the word count in a {@link MemoryMapState}. - * </li> - * <li> DRPC query - * that returns the word counts by querying the trident state (MemoryMapState). - * </li> - * </ol> - * <p> - * For more background read the <a href="https://storm.apache.org/documentation/Trident-tutorial.html">trident tutorial</a>, - * <a href="https://storm.apache.org/documentation/Trident-state">trident state</a> and - * <a href="https://github.com/apache/storm/tree/master/external/storm-kafka"> Storm Kafka </a>. - * </p> - */ -public class TridentKafkaWordCount { - - private String zkUrl; - private String brokerUrl; - - TridentKafkaWordCount(String zkUrl, String brokerUrl) { - this.zkUrl = zkUrl; - this.brokerUrl = brokerUrl; - } - - /** - * Creates a transactional kafka spout that consumes any new data published to "test" topic. - * <p/> - * For more info on transactional spouts - * see "Transactional spouts" section in - * <a href="https://storm.apache.org/documentation/Trident-state"> Trident state</a> doc. - * - * @return a transactional trident kafka spout. - */ - private TransactionalTridentKafkaSpout createKafkaSpout() { - ZkHosts hosts = new ZkHosts(zkUrl); - TridentKafkaConfig config = new TridentKafkaConfig(hosts, "test"); - config.scheme = new SchemeAsMultiScheme(new StringScheme()); - - // Consume new data from the topic - config.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); - return new TransactionalTridentKafkaSpout(config); - } - - - private Stream addDRPCStream(TridentTopology tridentTopology, TridentState state, LocalDRPC drpc) { - return tridentTopology.newDRPCStream("words", drpc) - .each(new Fields("args"), new Split(), new Fields("word")) - .groupBy(new Fields("word")) - .stateQuery(state, new Fields("word"), new MapGet(), new Fields("count")) - .each(new Fields("count"), new FilterNull()) - .project(new Fields("word", "count")); - } - - private TridentState addTridentState(TridentTopology tridentTopology) { - return tridentTopology.newStream("spout1", createKafkaSpout()).parallelismHint(1) - .each(new Fields("str"), new Split(), new Fields("word")) - .groupBy(new Fields("word")) - .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) - .parallelismHint(1); - } - - /** - * Creates a trident topology that consumes sentences from the kafka "test" topic using a - * {@link TransactionalTridentKafkaSpout} computes the word count and stores it in a {@link MemoryMapState}. - * A DRPC stream is then created to query the word counts. - * @param drpc - * @return - */ - public StormTopology buildConsumerTopology(LocalDRPC drpc) { - TridentTopology tridentTopology = new TridentTopology(); - addDRPCStream(tridentTopology, addTridentState(tridentTopology), drpc); - return tridentTopology.build(); - } - - /** - * Return the consumer topology config. - * - * @return the topology config - */ - public Config getConsumerConfig() { - Config conf = new Config(); - conf.setMaxSpoutPending(20); - // conf.setDebug(true); - return conf; - } - - /** - * A topology that produces random sentences using {@link RandomSentenceSpout} and - * publishes the sentences using a KafkaBolt to kafka "test" topic. - * - * @return the storm topology - */ - public StormTopology buildProducerTopology(Properties prop) { - TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout("spout", new RandomSentenceSpout(), 2); - /** - * The output field of the RandomSentenceSpout ("word") is provided as the boltMessageField - * so that this gets written out as the message in the kafka topic. - */ - KafkaBolt bolt = new KafkaBolt().withProducerProperties(prop) - .withTopicSelector(new DefaultTopicSelector("test")) - .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "word")); - builder.setBolt("forwardToKafka", bolt, 1).shuffleGrouping("spout"); - return builder.createTopology(); - } - - /** - * Returns the storm config for the topology that publishes sentences to kafka "test" topic using a kafka bolt. - * The KAFKA_BROKER_PROPERTIES is needed for the KafkaBolt. - * - * @return the topology config - */ - public Properties getProducerConfig() { - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - props.put(ProducerConfig.CLIENT_ID_CONFIG, "storm-kafka-producer"); - return props; - } - - /** - * <p> - * To run this topology ensure you have a kafka broker running. - * </p> - * Create a topic test with command line, - * kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic test - */ - public static void main(String[] args) throws Exception { - - String zkUrl = "localhost:2181"; // the defaults. - String brokerUrl = "localhost:9092"; - - if (args.length > 2 || (args.length == 1 && args[0].matches("^-h|--help$"))) { - System.out.println("Usage: TridentKafkaWordCount [kafka zookeeper url] [kafka broker url]"); - System.out.println(" E.g TridentKafkaWordCount [" + zkUrl + "]" + " [" + brokerUrl + "]"); - System.exit(1); - } else if (args.length == 1) { - zkUrl = args[0]; - } else if (args.length == 2) { - zkUrl = args[0]; - brokerUrl = args[1]; - } - - System.out.println("Using Kafka zookeeper url: " + zkUrl + " broker url: " + brokerUrl); - - TridentKafkaWordCount wordCount = new TridentKafkaWordCount(zkUrl, brokerUrl); - - LocalDRPC drpc = new LocalDRPC(); - LocalCluster cluster = new LocalCluster(); - - // submit the consumer topology. - cluster.submitTopology("wordCounter", wordCount.getConsumerConfig(), wordCount.buildConsumerTopology(drpc)); - - Config conf = new Config(); - conf.setMaxSpoutPending(20); - // submit the producer topology. - cluster.submitTopology("kafkaBolt", conf, wordCount.buildProducerTopology(wordCount.getProducerConfig())); - - // keep querying the word counts for a minute. - for (int i = 0; i < 60; i++) { - System.out.println("DRPC RESULT: " + drpc.execute("words", "the and apple snow jumped")); - Thread.sleep(1000); - } - - cluster.killTopology("kafkaBolt"); - cluster.killTopology("wordCounter"); - cluster.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/trident/TridentReach.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/trident/TridentReach.java b/examples/storm-starter/src/jvm/storm/starter/trident/TridentReach.java deleted file mode 100644 index 2d87c47..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/trident/TridentReach.java +++ /dev/null @@ -1,156 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter.trident; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.LocalDRPC; -import backtype.storm.generated.StormTopology; -import backtype.storm.task.IMetricsContext; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; -import storm.trident.TridentState; -import storm.trident.TridentTopology; -import storm.trident.operation.BaseFunction; -import storm.trident.operation.CombinerAggregator; -import storm.trident.operation.TridentCollector; -import storm.trident.operation.builtin.MapGet; -import storm.trident.operation.builtin.Sum; -import storm.trident.state.ReadOnlyState; -import storm.trident.state.State; -import storm.trident.state.StateFactory; -import storm.trident.state.map.ReadOnlyMapState; -import storm.trident.tuple.TridentTuple; - -import java.util.*; - -public class TridentReach { - public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{ - put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan")); - put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan")); - put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john")); - }}; - - public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{ - put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai")); - put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian")); - put("tim", Arrays.asList("alex")); - put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan")); - put("adam", Arrays.asList("david", "carissa")); - put("mike", Arrays.asList("john", "bob")); - put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob")); - }}; - - public static class StaticSingleKeyMapState extends ReadOnlyState implements ReadOnlyMapState<Object> { - public static class Factory implements StateFactory { - Map _map; - - public Factory(Map map) { - _map = map; - } - - @Override - public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { - return new StaticSingleKeyMapState(_map); - } - - } - - Map _map; - - public StaticSingleKeyMapState(Map map) { - _map = map; - } - - - @Override - public List<Object> multiGet(List<List<Object>> keys) { - List<Object> ret = new ArrayList(); - for (List<Object> key : keys) { - Object singleKey = key.get(0); - ret.add(_map.get(singleKey)); - } - return ret; - } - - } - - public static class One implements CombinerAggregator<Integer> { - @Override - public Integer init(TridentTuple tuple) { - return 1; - } - - @Override - public Integer combine(Integer val1, Integer val2) { - return 1; - } - - @Override - public Integer zero() { - return 1; - } - } - - public static class ExpandList extends BaseFunction { - - @Override - public void execute(TridentTuple tuple, TridentCollector collector) { - List l = (List) tuple.getValue(0); - if (l != null) { - for (Object o : l) { - collector.emit(new Values(o)); - } - } - } - - } - - public static StormTopology buildTopology(LocalDRPC drpc) { - TridentTopology topology = new TridentTopology(); - TridentState urlToTweeters = topology.newStaticState(new StaticSingleKeyMapState.Factory(TWEETERS_DB)); - TridentState tweetersToFollowers = topology.newStaticState(new StaticSingleKeyMapState.Factory(FOLLOWERS_DB)); - - - topology.newDRPCStream("reach", drpc).stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields( - "tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")).shuffle().stateQuery( - tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")).each(new Fields("followers"), - new ExpandList(), new Fields("follower")).groupBy(new Fields("follower")).aggregate(new One(), new Fields( - "one")).aggregate(new Fields("one"), new Sum(), new Fields("reach")); - return topology.build(); - } - - public static void main(String[] args) throws Exception { - LocalDRPC drpc = new LocalDRPC(); - - Config conf = new Config(); - LocalCluster cluster = new LocalCluster(); - - cluster.submitTopology("reach", conf, buildTopology(drpc)); - - Thread.sleep(2000); - - System.out.println("REACH: " + drpc.execute("reach", "aaa")); - System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1")); - System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5")); - - - cluster.shutdown(); - drpc.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java b/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java deleted file mode 100644 index e4a2d2e..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/trident/TridentWordCount.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter.trident; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.LocalDRPC; -import backtype.storm.StormSubmitter; -import backtype.storm.generated.StormTopology; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; -import storm.trident.TridentState; -import storm.trident.TridentTopology; -import storm.trident.operation.BaseFunction; -import storm.trident.operation.TridentCollector; -import storm.trident.operation.builtin.Count; -import storm.trident.operation.builtin.FilterNull; -import storm.trident.operation.builtin.MapGet; -import storm.trident.operation.builtin.Sum; -import storm.trident.testing.FixedBatchSpout; -import storm.trident.testing.MemoryMapState; -import storm.trident.tuple.TridentTuple; - - -public class TridentWordCount { - public static class Split extends BaseFunction { - @Override - public void execute(TridentTuple tuple, TridentCollector collector) { - String sentence = tuple.getString(0); - for (String word : sentence.split(" ")) { - collector.emit(new Values(word)); - } - } - } - - public static StormTopology buildTopology(LocalDRPC drpc) { - FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), - new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), - new Values("how many apples can you eat"), new Values("to be or not to be the person")); - spout.setCycle(true); - - TridentTopology topology = new TridentTopology(); - TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), - new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), - new Count(), new Fields("count")).parallelismHint(16); - - topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields( - "word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"), - new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum")); - return topology.build(); - } - - public static void main(String[] args) throws Exception { - Config conf = new Config(); - conf.setMaxSpoutPending(20); - if (args.length == 0) { - LocalDRPC drpc = new LocalDRPC(); - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("wordCounter", conf, buildTopology(drpc)); - for (int i = 0; i < 100; i++) { - System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped")); - Thread.sleep(1000); - } - } - else { - conf.setNumWorkers(3); - StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null)); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/util/StormRunner.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/util/StormRunner.java b/examples/storm-starter/src/jvm/storm/starter/util/StormRunner.java deleted file mode 100644 index eb25a86..0000000 --- a/examples/storm-starter/src/jvm/storm/starter/util/StormRunner.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter.util; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.generated.AlreadyAliveException; -import backtype.storm.generated.AuthorizationException; -import backtype.storm.generated.InvalidTopologyException; -import backtype.storm.generated.StormTopology; - -public final class StormRunner { - - private static final int MILLIS_IN_SEC = 1000; - - private StormRunner() { - } - - public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf, int runtimeInSeconds) - throws InterruptedException { - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology(topologyName, conf, topology); - Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC); - cluster.killTopology(topologyName); - cluster.shutdown(); - } - - public static void runTopologyRemotely(StormTopology topology, String topologyName, Config conf) - throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { - StormSubmitter.submitTopology(topologyName, conf, topology); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBoltTest.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBoltTest.java b/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBoltTest.java new file mode 100644 index 0000000..23326c5 --- /dev/null +++ b/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/IntermediateRankingsBoltTest.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.starter.bolt; + +import org.apache.storm.Config; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.MockTupleHelpers; +import com.google.common.collect.Lists; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.Map; + +import static org.fest.assertions.api.Assertions.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +public class IntermediateRankingsBoltTest { + + private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id"; + private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id"; + private static final Object ANY_OBJECT = new Object(); + private static final int ANY_TOPN = 10; + private static final long ANY_COUNT = 42; + + private Tuple mockRankableTuple(Object obj, long count) { + Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID); + when(tuple.getValues()).thenReturn(Lists.newArrayList(ANY_OBJECT, ANY_COUNT)); + return tuple; + } + + @DataProvider + public Object[][] illegalTopN() { + return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } }; + } + + @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopN") + public void negativeOrZeroTopNShouldThrowIAE(int topN) { + new IntermediateRankingsBolt(topN); + } + + @DataProvider + public Object[][] illegalEmitFrequency() { + return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } }; + } + + @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalEmitFrequency") + public void negativeOrZeroEmitFrequencyShouldThrowIAE(int emitFrequencyInSeconds) { + new IntermediateRankingsBolt(ANY_TOPN, emitFrequencyInSeconds); + } + + @DataProvider + public Object[][] legalTopN() { + return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } }; + } + + @Test(dataProvider = "legalTopN") + public void positiveTopNShouldBeOk(int topN) { + new IntermediateRankingsBolt(topN); + } + + @DataProvider + public Object[][] legalEmitFrequency() { + return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } }; + } + + @Test(dataProvider = "legalEmitFrequency") + public void positiveEmitFrequencyShouldBeOk(int emitFrequencyInSeconds) { + new IntermediateRankingsBolt(ANY_TOPN, emitFrequencyInSeconds); + } + + @Test + public void shouldEmitSomethingIfTickTupleIsReceived() { + // given + Tuple tickTuple = MockTupleHelpers.mockTickTuple(); + BasicOutputCollector collector = mock(BasicOutputCollector.class); + IntermediateRankingsBolt bolt = new IntermediateRankingsBolt(); + + // when + bolt.execute(tickTuple, collector); + + // then + // verifyZeroInteractions(collector); + verify(collector).emit(any(Values.class)); + } + + @Test + public void shouldEmitNothingIfNormalTupleIsReceived() { + // given + Tuple normalTuple = mockRankableTuple(ANY_OBJECT, ANY_COUNT); + BasicOutputCollector collector = mock(BasicOutputCollector.class); + IntermediateRankingsBolt bolt = new IntermediateRankingsBolt(); + + // when + bolt.execute(normalTuple, collector); + + // then + verifyZeroInteractions(collector); + } + + @Test + public void shouldDeclareOutputFields() { + // given + OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class); + IntermediateRankingsBolt bolt = new IntermediateRankingsBolt(); + + // when + bolt.declareOutputFields(declarer); + + // then + verify(declarer, times(1)).declare(any(Fields.class)); + } + + @Test + public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() { + // given + IntermediateRankingsBolt bolt = new IntermediateRankingsBolt(); + + // when + Map<String, Object> componentConfig = bolt.getComponentConfiguration(); + + // then + assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS); + Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS); + assertThat(emitFrequencyInSeconds).isGreaterThan(0); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/RollingCountBoltTest.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/RollingCountBoltTest.java b/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/RollingCountBoltTest.java new file mode 100644 index 0000000..d068e59 --- /dev/null +++ b/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/RollingCountBoltTest.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.starter.bolt; + +import org.apache.storm.Config; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.MockTupleHelpers; +import org.testng.annotations.Test; + +import java.util.Map; + +import static org.fest.assertions.api.Assertions.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +public class RollingCountBoltTest { + + private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id"; + private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id"; + + private Tuple mockNormalTuple(Object obj) { + Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID); + when(tuple.getValue(0)).thenReturn(obj); + return tuple; + } + + @SuppressWarnings("rawtypes") + @Test + public void shouldEmitNothingIfNoObjectHasBeenCountedYetAndTickTupleIsReceived() { + // given + Tuple tickTuple = MockTupleHelpers.mockTickTuple(); + RollingCountBolt bolt = new RollingCountBolt(); + Map conf = mock(Map.class); + TopologyContext context = mock(TopologyContext.class); + OutputCollector collector = mock(OutputCollector.class); + bolt.prepare(conf, context, collector); + + // when + bolt.execute(tickTuple); + + // then + verifyZeroInteractions(collector); + } + + @SuppressWarnings("rawtypes") + @Test + public void shouldEmitSomethingIfAtLeastOneObjectWasCountedAndTickTupleIsReceived() { + // given + Tuple normalTuple = mockNormalTuple(new Object()); + Tuple tickTuple = MockTupleHelpers.mockTickTuple(); + + RollingCountBolt bolt = new RollingCountBolt(); + Map conf = mock(Map.class); + TopologyContext context = mock(TopologyContext.class); + OutputCollector collector = mock(OutputCollector.class); + bolt.prepare(conf, context, collector); + + // when + bolt.execute(normalTuple); + bolt.execute(tickTuple); + + // then + verify(collector).emit(any(Values.class)); + } + + @Test + public void shouldDeclareOutputFields() { + // given + OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class); + RollingCountBolt bolt = new RollingCountBolt(); + + // when + bolt.declareOutputFields(declarer); + + // then + verify(declarer, times(1)).declare(any(Fields.class)); + + } + + @Test + public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() { + // given + RollingCountBolt bolt = new RollingCountBolt(); + + // when + Map<String, Object> componentConfig = bolt.getComponentConfiguration(); + + // then + assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS); + Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS); + assertThat(emitFrequencyInSeconds).isGreaterThan(0); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/TotalRankingsBoltTest.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/TotalRankingsBoltTest.java b/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/TotalRankingsBoltTest.java new file mode 100644 index 0000000..c3582d5 --- /dev/null +++ b/examples/storm-starter/test/jvm/org/apache/storm/starter/bolt/TotalRankingsBoltTest.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.starter.bolt; + +import org.apache.storm.Config; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.MockTupleHelpers; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; +import org.apache.storm.starter.tools.Rankings; + +import java.util.Map; + +import static org.fest.assertions.api.Assertions.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +public class TotalRankingsBoltTest { + + private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id"; + private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id"; + private static final Object ANY_OBJECT = new Object(); + private static final int ANY_TOPN = 10; + private static final long ANY_COUNT = 42; + + private Tuple mockRankingsTuple(Object obj, long count) { + Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID); + Rankings rankings = mock(Rankings.class); + when(tuple.getValue(0)).thenReturn(rankings); + return tuple; + } + + @DataProvider + public Object[][] illegalTopN() { + return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } }; + } + + @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopN") + public void negativeOrZeroTopNShouldThrowIAE(int topN) { + new TotalRankingsBolt(topN); + } + + @DataProvider + public Object[][] illegalEmitFrequency() { + return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } }; + } + + @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalEmitFrequency") + public void negativeOrZeroEmitFrequencyShouldThrowIAE(int emitFrequencyInSeconds) { + new TotalRankingsBolt(ANY_TOPN, emitFrequencyInSeconds); + } + + @DataProvider + public Object[][] legalTopN() { + return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } }; + } + + @Test(dataProvider = "legalTopN") + public void positiveTopNShouldBeOk(int topN) { + new TotalRankingsBolt(topN); + } + + @DataProvider + public Object[][] legalEmitFrequency() { + return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } }; + } + + @Test(dataProvider = "legalEmitFrequency") + public void positiveEmitFrequencyShouldBeOk(int emitFrequencyInSeconds) { + new TotalRankingsBolt(ANY_TOPN, emitFrequencyInSeconds); + } + + @Test + public void shouldEmitSomethingIfTickTupleIsReceived() { + // given + Tuple tickTuple = MockTupleHelpers.mockTickTuple(); + BasicOutputCollector collector = mock(BasicOutputCollector.class); + TotalRankingsBolt bolt = new TotalRankingsBolt(); + + // when + bolt.execute(tickTuple, collector); + + // then + // verifyZeroInteractions(collector); + verify(collector).emit(any(Values.class)); + } + + @Test + public void shouldEmitNothingIfNormalTupleIsReceived() { + // given + Tuple normalTuple = mockRankingsTuple(ANY_OBJECT, ANY_COUNT); + BasicOutputCollector collector = mock(BasicOutputCollector.class); + TotalRankingsBolt bolt = new TotalRankingsBolt(); + + // when + bolt.execute(normalTuple, collector); + + // then + verifyZeroInteractions(collector); + } + + @Test + public void shouldDeclareOutputFields() { + // given + OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class); + TotalRankingsBolt bolt = new TotalRankingsBolt(); + + // when + bolt.declareOutputFields(declarer); + + // then + verify(declarer, times(1)).declare(any(Fields.class)); + } + + @Test + public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() { + // given + TotalRankingsBolt bolt = new TotalRankingsBolt(); + + // when + Map<String, Object> componentConfig = bolt.getComponentConfiguration(); + + // then + assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS); + Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS); + assertThat(emitFrequencyInSeconds).isGreaterThan(0); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTrackerTest.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTrackerTest.java b/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTrackerTest.java new file mode 100644 index 0000000..a28ea38 --- /dev/null +++ b/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTrackerTest.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.starter.tools; + +import org.apache.storm.utils.Time; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static org.fest.assertions.api.Assertions.assertThat; + +public class NthLastModifiedTimeTrackerTest { + + private static final int ANY_NUM_TIMES_TO_TRACK = 3; + private static final int MILLIS_IN_SEC = 1000; + + @DataProvider + public Object[][] illegalNumTimesData() { + return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } }; + } + + @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalNumTimesData") + public void negativeOrZeroNumTimesToTrackShouldThrowIAE(int numTimesToTrack) { + new NthLastModifiedTimeTracker(numTimesToTrack); + } + + @DataProvider + public Object[][] legalNumTimesData() { + return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } }; + } + + @Test(dataProvider = "legalNumTimesData") + public void positiveNumTimesToTrackShouldBeOk(int numTimesToTrack) { + new NthLastModifiedTimeTracker(numTimesToTrack); + } + + @DataProvider + public Object[][] whenNotYetMarkedAsModifiedData() { + return new Object[][]{ { 0 }, { 1 }, { 2 }, { 3 }, { 4 }, { 5 }, { 8 }, { 10 } }; + } + + @Test(dataProvider = "whenNotYetMarkedAsModifiedData") + public void shouldReturnCorrectModifiedTimeEvenWhenNotYetMarkedAsModified(int secondsToAdvance) { + // given + Time.startSimulating(); + NthLastModifiedTimeTracker tracker = new NthLastModifiedTimeTracker(ANY_NUM_TIMES_TO_TRACK); + + // when + advanceSimulatedTimeBy(secondsToAdvance); + int seconds = tracker.secondsSinceOldestModification(); + + // then + assertThat(seconds).isEqualTo(secondsToAdvance); + + // cleanup + Time.stopSimulating(); + } + + @DataProvider + public Object[][] simulatedTrackerIterations() { + return new Object[][]{ { 1, new int[]{ 0, 1 }, new int[]{ 0, 0 } }, { 1, new int[]{ 0, 2 }, new int[]{ 0, 0 } }, + { 2, new int[]{ 2, 2 }, new int[]{ 2, 2 } }, { 2, new int[]{ 0, 4 }, new int[]{ 0, 4 } }, + { 1, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 0, 0, 0, 0, 0, 0, 0 } }, + { 1, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 0, 0, 0, 0, 0, 0, 0 } }, + { 2, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 1, 1, 1, 1, 1, 1 } }, + { 2, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 2, 2, 2, 2, 2, 2 } }, + { 2, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 2, 3, 4, 5, 6, 7 } }, + { 3, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 2, 2, 2, 2, 2, 2 } }, + { 3, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 3, 5, 7, 9, 11, 13 } }, + { 3, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 4, 4, 4, 4, 4, 4 } }, + { 4, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 2, 3, 3, 3, 3, 3 } }, + { 4, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 3, 6, 9, 12, 15, 18 } }, + { 4, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 4, 6, 6, 6, 6, 6 } }, + { 5, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 2, 3, 4, 4, 4, 4 } }, + { 5, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 3, 6, 10, 14, 18, 22 } }, + { 5, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 4, 6, 8, 8, 8, 8 } }, + { 6, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 2, 3, 4, 5, 5, 5 } }, + { 6, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 3, 6, 10, 15, 20, 25 } }, + { 6, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 4, 6, 8, 10, 10, 10 } }, + { 3, new int[]{ 1, 2, 3 }, new int[]{ 1, 3, 5 } } }; + } + + @Test(dataProvider = "simulatedTrackerIterations") + public void shouldReturnCorrectModifiedTimeWhenMarkedAsModified(int numTimesToTrack, + int[] secondsToAdvancePerIteration, int[] expLastModifiedTimes) { + // given + Time.startSimulating(); + NthLastModifiedTimeTracker tracker = new NthLastModifiedTimeTracker(numTimesToTrack); + + int[] modifiedTimes = new int[expLastModifiedTimes.length]; + + // when + int i = 0; + for (int secondsToAdvance : secondsToAdvancePerIteration) { + advanceSimulatedTimeBy(secondsToAdvance); + tracker.markAsModified(); + modifiedTimes[i] = tracker.secondsSinceOldestModification(); + i++; + } + + // then + assertThat(modifiedTimes).isEqualTo(expLastModifiedTimes); + + // cleanup + Time.stopSimulating(); + } + + private void advanceSimulatedTimeBy(int seconds) { + Time.advanceTime(seconds * MILLIS_IN_SEC); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/RankableObjectWithFieldsTest.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/RankableObjectWithFieldsTest.java b/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/RankableObjectWithFieldsTest.java new file mode 100644 index 0000000..9837569 --- /dev/null +++ b/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/RankableObjectWithFieldsTest.java @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.starter.tools; + +import org.apache.storm.tuple.Tuple; +import com.google.common.collect.Lists; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.fest.assertions.api.Assertions.assertThat; +import static org.mockito.Mockito.*; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +public class RankableObjectWithFieldsTest { + + private static final Object ANY_OBJECT = new Object(); + private static final long ANY_COUNT = 271; + private static final String ANY_FIELD = "someAdditionalField"; + private static final int GREATER_THAN = 1; + private static final int EQUAL_TO = 0; + private static final int SMALLER_THAN = -1; + + @Test(expectedExceptions = IllegalArgumentException.class) + public void constructorWithNullObjectAndNoFieldsShouldThrowIAE() { + new RankableObjectWithFields(null, ANY_COUNT); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void constructorWithNullObjectAndFieldsShouldThrowIAE() { + Object someAdditionalField = new Object(); + new RankableObjectWithFields(null, ANY_COUNT, someAdditionalField); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void constructorWithNegativeCountAndNoFieldsShouldThrowIAE() { + new RankableObjectWithFields(ANY_OBJECT, -1); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void constructorWithNegativeCountAndFieldsShouldThrowIAE() { + Object someAdditionalField = new Object(); + new RankableObjectWithFields(ANY_OBJECT, -1, someAdditionalField); + } + + @Test + public void shouldBeEqualToItself() { + RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT); + assertThat(r).isEqualTo(r); + } + + @DataProvider + public Object[][] otherClassesData() { + return new Object[][]{ { new String("foo") }, { new Object() }, { Integer.valueOf(4) }, { Lists.newArrayList(7, 8, + 9) } }; + } + + @Test(dataProvider = "otherClassesData") + public void shouldNotBeEqualToInstancesOfOtherClasses(Object notARankable) { + RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT); + assertFalse(r.equals(notARankable), r + " is equal to " + notARankable + " but it should not be"); + } + + @DataProvider + public Object[][] falseDuplicatesData() { + return new Object[][]{ { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 1) }, + { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("Foo", 1) }, + { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("FOO", 1) }, + { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("bar", 1) }, + { new RankableObjectWithFields("", 0), new RankableObjectWithFields("", 1) }, { new RankableObjectWithFields("", + 1), new RankableObjectWithFields("bar", 1) } }; + } + + @Test(dataProvider = "falseDuplicatesData") + public void shouldNotBeEqualToFalseDuplicates(RankableObjectWithFields r, RankableObjectWithFields falseDuplicate) { + assertFalse(r.equals(falseDuplicate), r + " is equal to " + falseDuplicate + " but it should not be"); + } + + @Test(dataProvider = "falseDuplicatesData") + public void shouldHaveDifferentHashCodeThanFalseDuplicates(RankableObjectWithFields r, + RankableObjectWithFields falseDuplicate) { + assertThat(r.hashCode()).isNotEqualTo(falseDuplicate.hashCode()); + } + + @DataProvider + public Object[][] trueDuplicatesData() { + return new Object[][]{ { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 0) }, + { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 0, "someOtherField") }, + { new RankableObjectWithFields("foo", 0, "someField"), new RankableObjectWithFields("foo", 0, + "someOtherField") } }; + } + + @Test(dataProvider = "trueDuplicatesData") + public void shouldBeEqualToTrueDuplicates(RankableObjectWithFields r, RankableObjectWithFields trueDuplicate) { + assertTrue(r.equals(trueDuplicate), r + " is not equal to " + trueDuplicate + " but it should be"); + } + + @Test(dataProvider = "trueDuplicatesData") + public void shouldHaveSameHashCodeAsTrueDuplicates(RankableObjectWithFields r, + RankableObjectWithFields trueDuplicate) { + assertThat(r.hashCode()).isEqualTo(trueDuplicate.hashCode()); + } + + @DataProvider + public Object[][] compareToData() { + return new Object[][]{ { new RankableObjectWithFields("foo", 1000), new RankableObjectWithFields("foo", 0), + GREATER_THAN }, { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("foo", 0), + GREATER_THAN }, { new RankableObjectWithFields("foo", 1000), new RankableObjectWithFields("bar", 0), + GREATER_THAN }, { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("bar", 0), + GREATER_THAN }, { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 0), EQUAL_TO }, + { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("bar", 0), EQUAL_TO }, + { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 1000), SMALLER_THAN }, + { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 1), SMALLER_THAN }, + { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("bar", 1), SMALLER_THAN }, + { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("bar", 1000), SMALLER_THAN }, }; + } + + @Test(dataProvider = "compareToData") + public void verifyCompareTo(RankableObjectWithFields first, RankableObjectWithFields second, int expCompareToValue) { + assertThat(first.compareTo(second)).isEqualTo(expCompareToValue); + } + + @DataProvider + public Object[][] toStringData() { + return new Object[][]{ { new String("foo"), 0L }, { new String("BAR"), 8L } }; + } + + @Test(dataProvider = "toStringData") + public void toStringShouldContainStringRepresentationsOfObjectAndCount(Object obj, long count) { + // given + RankableObjectWithFields r = new RankableObjectWithFields(obj, count); + + // when + String strRepresentation = r.toString(); + + // then + assertThat(strRepresentation).contains(obj.toString()).contains("" + count); + } + + @Test + public void shouldReturnTheObject() { + // given + RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT, ANY_FIELD); + + // when + Object obj = r.getObject(); + + // then + assertThat(obj).isEqualTo(ANY_OBJECT); + } + + @Test + public void shouldReturnTheCount() { + // given + RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT, ANY_FIELD); + + // when + long count = r.getCount(); + + // then + assertThat(count).isEqualTo(ANY_COUNT); + } + + @DataProvider + public Object[][] fieldsData() { + return new Object[][]{ { ANY_OBJECT, ANY_COUNT, new Object[]{ ANY_FIELD } }, + { "quux", 42L, new Object[]{ "one", "two", "three" } } }; + } + + @Test(dataProvider = "fieldsData") + public void shouldReturnTheFields(Object obj, long count, Object[] fields) { + // given + RankableObjectWithFields r = new RankableObjectWithFields(obj, count, fields); + + // when + List<Object> actualFields = r.getFields(); + + // then + assertThat(actualFields).isEqualTo(Lists.newArrayList(fields)); + } + + @Test(expectedExceptions = UnsupportedOperationException.class) + public void fieldsShouldBeImmutable() { + // given + RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT, ANY_FIELD); + + // when + List<Object> fields = r.getFields(); + // try to modify the list, which should fail + fields.remove(0); + + // then (exception) + } + + @Test + public void shouldCreateRankableObjectFromTuple() { + // given + Tuple tuple = mock(Tuple.class); + List<Object> tupleValues = Lists.newArrayList(ANY_OBJECT, ANY_COUNT, ANY_FIELD); + when(tuple.getValues()).thenReturn(tupleValues); + + // when + RankableObjectWithFields r = RankableObjectWithFields.from(tuple); + + // then + assertThat(r.getObject()).isEqualTo(ANY_OBJECT); + assertThat(r.getCount()).isEqualTo(ANY_COUNT); + List<Object> fields = new ArrayList<Object>(); + fields.add(ANY_FIELD); + assertThat(r.getFields()).isEqualTo(fields); + + } + + @DataProvider + public Object[][] copyData() { + return new Object[][]{ { new RankableObjectWithFields("foo", 0) }, { new RankableObjectWithFields("foo", 3, + "someOtherField") }, { new RankableObjectWithFields("foo", 0, "someField") } }; + } + + // TODO: What would be a good test to ensure that RankableObjectWithFields is at least somewhat defensively copied? + // The contract of Rankable#copy() returns a Rankable value, not a RankableObjectWithFields. + @Test(dataProvider = "copyData") + public void copyShouldReturnCopy(RankableObjectWithFields original) { + // given + + // when + Rankable copy = original.copy(); + + // then + assertThat(copy.getObject()).isEqualTo(original.getObject()); + assertThat(copy.getCount()).isEqualTo(original.getCount()); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/RankingsTest.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/RankingsTest.java b/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/RankingsTest.java new file mode 100644 index 0000000..245c552 --- /dev/null +++ b/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/RankingsTest.java @@ -0,0 +1,368 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.starter.tools; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.jmock.lib.concurrent.Blitzer; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.List; + +import static org.fest.assertions.api.Assertions.assertThat; + +public class RankingsTest { + + private static final int ANY_TOPN = 42; + private static final Rankable ANY_RANKABLE = new RankableObjectWithFields("someObject", ANY_TOPN); + private static final Rankable ZERO = new RankableObjectWithFields("ZERO_COUNT", 0); + private static final Rankable A = new RankableObjectWithFields("A", 1); + private static final Rankable B = new RankableObjectWithFields("B", 2); + private static final Rankable C = new RankableObjectWithFields("C", 3); + private static final Rankable D = new RankableObjectWithFields("D", 4); + private static final Rankable E = new RankableObjectWithFields("E", 5); + private static final Rankable F = new RankableObjectWithFields("F", 6); + private static final Rankable G = new RankableObjectWithFields("G", 7); + private static final Rankable H = new RankableObjectWithFields("H", 8); + + @DataProvider + public Object[][] illegalTopNData() { + return new Object[][]{ { 0 }, { -1 }, { -2 }, { -10 } }; + } + + @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopNData") + public void constructorWithNegativeOrZeroTopNShouldThrowIAE(int topN) { + new Rankings(topN); + } + + @DataProvider + public Object[][] copyRankingsData() { + return new Object[][]{ { 5, Lists.newArrayList(A, B, C) }, { 2, Lists.newArrayList(A, B, C, D) }, + { 1, Lists.newArrayList() }, { 1, Lists.newArrayList(A) }, { 1, Lists.newArrayList(A, B) } }; + } + + @Test(dataProvider = "copyRankingsData") + public void copyConstructorShouldReturnCopy(int topN, List<Rankable> rankables) { + // given + Rankings rankings = new Rankings(topN); + for (Rankable r : rankables) { + rankings.updateWith(r); + } + + // when + Rankings copy = new Rankings(rankings); + + // then + assertThat(copy.maxSize()).isEqualTo(rankings.maxSize()); + assertThat(copy.getRankings()).isEqualTo(rankings.getRankings()); + } + + @DataProvider + public Object[][] defensiveCopyRankingsData() { + return new Object[][]{ { 5, Lists.newArrayList(A, B, C), Lists.newArrayList(D) }, { 2, Lists.newArrayList(A, B, C, + D), Lists.newArrayList(E, F) }, { 1, Lists.newArrayList(), Lists.newArrayList(A) }, { 1, Lists.newArrayList(A), + Lists.newArrayList(B) }, { 1, Lists.newArrayList(ZERO), Lists.newArrayList(B) }, { 1, Lists.newArrayList(ZERO), + Lists.newArrayList() } }; + } + + @Test(dataProvider = "defensiveCopyRankingsData") + public void copyConstructorShouldReturnDefensiveCopy(int topN, List<Rankable> rankables, List<Rankable> changes) { + // given + Rankings original = new Rankings(topN); + for (Rankable r : rankables) { + original.updateWith(r); + } + int expSize = original.size(); + List<Rankable> expRankings = original.getRankings(); + + // when + Rankings copy = new Rankings(original); + for (Rankable r : changes) { + copy.updateWith(r); + } + + // then + assertThat(original.size()).isEqualTo(expSize); + assertThat(original.getRankings()).isEqualTo(expRankings); + } + + @DataProvider + public Object[][] legalTopNData() { + return new Object[][]{ { 1 }, { 2 }, { 1000 }, { 1000000 } }; + } + + @Test(dataProvider = "legalTopNData") + public void constructorWithPositiveTopNShouldBeOk(int topN) { + // given/when + Rankings rankings = new Rankings(topN); + + // then + assertThat(rankings.maxSize()).isEqualTo(topN); + } + + @Test + public void shouldHaveDefaultConstructor() { + new Rankings(); + } + + @Test + public void defaultConstructorShouldSetPositiveTopN() { + // given/when + Rankings rankings = new Rankings(); + + // then + assertThat(rankings.maxSize()).isGreaterThan(0); + } + + @DataProvider + public Object[][] rankingsGrowData() { + return new Object[][]{ { 2, Lists.newArrayList(new RankableObjectWithFields("A", 1), new RankableObjectWithFields( + "B", 2), new RankableObjectWithFields("C", 3)) }, { 2, Lists.newArrayList(new RankableObjectWithFields("A", 1), + new RankableObjectWithFields("B", 2), new RankableObjectWithFields("C", 3), new RankableObjectWithFields("D", + 4)) } }; + } + + @Test(dataProvider = "rankingsGrowData") + public void sizeOfRankingsShouldNotGrowBeyondTopN(int topN, List<Rankable> rankables) { + // sanity check of the provided test data + assertThat(rankables.size()).overridingErrorMessage( + "The supplied test data is not correct: the number of rankables <%d> should be greater than <%d>", + rankables.size(), topN).isGreaterThan(topN); + + // given + Rankings rankings = new Rankings(topN); + + // when + for (Rankable r : rankables) { + rankings.updateWith(r); + } + + // then + assertThat(rankings.size()).isLessThanOrEqualTo(rankings.maxSize()); + } + + @DataProvider + public Object[][] simulatedRankingsData() { + return new Object[][]{ { Lists.newArrayList(A), Lists.newArrayList(A) }, { Lists.newArrayList(B, D, A, C), + Lists.newArrayList(D, C, B, A) }, { Lists.newArrayList(B, F, A, C, D, E), Lists.newArrayList(F, E, D, C, B, + A) }, { Lists.newArrayList(G, B, F, A, C, D, E, H), Lists.newArrayList(H, G, F, E, D, C, B, A) } }; + } + + @Test(dataProvider = "simulatedRankingsData") + public void shouldCorrectlyRankWhenUpdatedWithRankables(List<Rankable> unsorted, List<Rankable> expSorted) { + // given + Rankings rankings = new Rankings(unsorted.size()); + + // when + for (Rankable r : unsorted) { + rankings.updateWith(r); + } + + // then + assertThat(rankings.getRankings()).isEqualTo(expSorted); + } + + @Test(dataProvider = "simulatedRankingsData") + public void shouldCorrectlyRankWhenEmptyAndUpdatedWithOtherRankings(List<Rankable> unsorted, + List<Rankable> expSorted) { + // given + Rankings rankings = new Rankings(unsorted.size()); + Rankings otherRankings = new Rankings(rankings.maxSize()); + for (Rankable r : unsorted) { + otherRankings.updateWith(r); + } + + // when + rankings.updateWith(otherRankings); + + // then + assertThat(rankings.getRankings()).isEqualTo(expSorted); + } + + @Test(dataProvider = "simulatedRankingsData") + public void shouldCorrectlyRankWhenUpdatedWithEmptyOtherRankings(List<Rankable> unsorted, List<Rankable> expSorted) { + // given + Rankings rankings = new Rankings(unsorted.size()); + for (Rankable r : unsorted) { + rankings.updateWith(r); + } + Rankings emptyRankings = new Rankings(ANY_TOPN); + + // when + rankings.updateWith(emptyRankings); + + // then + assertThat(rankings.getRankings()).isEqualTo(expSorted); + } + + @DataProvider + public Object[][] simulatedRankingsAndOtherRankingsData() { + return new Object[][]{ { Lists.newArrayList(A), Lists.newArrayList(A), Lists.newArrayList(A) }, + { Lists.newArrayList(A, C), Lists.newArrayList(B, D), Lists.newArrayList(D, C, B, A) }, { Lists.newArrayList(B, + F, A), Lists.newArrayList(C, D, E), Lists.newArrayList(F, E, D, C, B, A) }, { Lists.newArrayList(G, B, F, A, C), + Lists.newArrayList(D, E, H), Lists.newArrayList(H, G, F, E, D, C, B, A) } }; + } + + @Test(dataProvider = "simulatedRankingsAndOtherRankingsData") + public void shouldCorrectlyRankWhenNotEmptyAndUpdatedWithOtherRankings(List<Rankable> unsorted, + List<Rankable> unsortedForOtherRankings, List<Rankable> expSorted) { + // given + Rankings rankings = new Rankings(expSorted.size()); + for (Rankable r : unsorted) { + rankings.updateWith(r); + } + Rankings otherRankings = new Rankings(unsortedForOtherRankings.size()); + for (Rankable r : unsortedForOtherRankings) { + otherRankings.updateWith(r); + } + + // when + rankings.updateWith(otherRankings); + + // then + assertThat(rankings.getRankings()).isEqualTo(expSorted); + } + + @DataProvider + public Object[][] duplicatesData() { + Rankable A1 = new RankableObjectWithFields("A", 1); + Rankable A2 = new RankableObjectWithFields("A", 2); + Rankable A3 = new RankableObjectWithFields("A", 3); + return new Object[][]{ { Lists.newArrayList(ANY_RANKABLE, ANY_RANKABLE, ANY_RANKABLE) }, { Lists.newArrayList(A1, + A2, A3) }, }; + } + + @Test(dataProvider = "duplicatesData") + public void shouldNotRankDuplicateObjectsMoreThanOnce(List<Rankable> duplicates) { + // given + Rankings rankings = new Rankings(duplicates.size()); + + // when + for (Rankable r : duplicates) { + rankings.updateWith(r); + } + + // then + assertThat(rankings.size()).isEqualTo(1); + } + + @DataProvider + public Object[][] removeZeroRankingsData() { + return new Object[][]{ { Lists.newArrayList(A, ZERO), Lists.newArrayList(A) }, { Lists.newArrayList(A), + Lists.newArrayList(A) }, { Lists.newArrayList(ZERO, A), Lists.newArrayList(A) }, { Lists.newArrayList(ZERO), + Lists.newArrayList() }, { Lists.newArrayList(ZERO, new RankableObjectWithFields("ZERO2", 0)), + Lists.newArrayList() }, { Lists.newArrayList(B, ZERO, new RankableObjectWithFields("ZERO2", 0), D, + new RankableObjectWithFields("ZERO3", 0), new RankableObjectWithFields("ZERO4", 0), C), Lists.newArrayList(D, C, + B) }, { Lists.newArrayList(A, ZERO, B), Lists.newArrayList(B, A) } }; + } + + @Test(dataProvider = "removeZeroRankingsData") + public void shouldRemoveZeroCounts(List<Rankable> unsorted, List<Rankable> expSorted) { + // given + Rankings rankings = new Rankings(unsorted.size()); + for (Rankable r : unsorted) { + rankings.updateWith(r); + } + + // when + rankings.pruneZeroCounts(); + + // then + assertThat(rankings.getRankings()).isEqualTo(expSorted); + } + + @Test + public void updatingWithNewRankablesShouldBeThreadSafe() throws InterruptedException { + // given + final List<Rankable> entries = ImmutableList.of(A, B, C, D); + final Rankings rankings = new Rankings(entries.size()); + + // We are capturing exceptions thrown in Blitzer's child threads into this data structure so that we can properly + // pass/fail this test. The reason is that Blitzer doesn't report exceptions, which is a known bug in Blitzer + // (JMOCK-263). See https://github.com/jmock-developers/jmock-library/issues/22 for more information. + final List<Exception> exceptions = Lists.newArrayList(); + Blitzer blitzer = new Blitzer(1000); + + // when + blitzer.blitz(new Runnable() { + public void run() { + for (Rankable r : entries) { + try { + rankings.updateWith(r); + } + catch (RuntimeException e) { + synchronized(exceptions) { + exceptions.add(e); + } + } + } + } + }); + blitzer.shutdown(); + + // then + // + if (!exceptions.isEmpty()) { + for (Exception e : exceptions) { + System.err.println(Throwables.getStackTraceAsString(e)); + } + } + assertThat(exceptions).isEmpty(); + } + + @Test(dataProvider = "copyRankingsData") + public void copyShouldReturnCopy(int topN, List<Rankable> rankables) { + // given + Rankings rankings = new Rankings(topN); + for (Rankable r : rankables) { + rankings.updateWith(r); + } + + // when + Rankings copy = rankings.copy(); + + // then + assertThat(copy.maxSize()).isEqualTo(rankings.maxSize()); + assertThat(copy.getRankings()).isEqualTo(rankings.getRankings()); + } + + @Test(dataProvider = "defensiveCopyRankingsData") + public void copyShouldReturnDefensiveCopy(int topN, List<Rankable> rankables, List<Rankable> changes) { + // given + Rankings original = new Rankings(topN); + for (Rankable r : rankables) { + original.updateWith(r); + } + int expSize = original.size(); + List<Rankable> expRankings = original.getRankings(); + + // when + Rankings copy = original.copy(); + for (Rankable r : changes) { + copy.updateWith(r); + } + copy.pruneZeroCounts(); + + // then + assertThat(original.size()).isEqualTo(expSize); + assertThat(original.getRankings()).isEqualTo(expRankings); + } + +}
