http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTracker.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTracker.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTracker.java index faa4e32..d7ab065 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTracker.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/NthLastModifiedTimeTracker.java @@ -1,24 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter.tools; -import org.apache.storm.utils.Time; import org.apache.commons.collections.buffer.CircularFifoBuffer; +import org.apache.storm.utils.Time; /** * This class tracks the time-since-last-modify of a "thing" in a rolling fashion. @@ -30,41 +25,41 @@ import org.apache.commons.collections.buffer.CircularFifoBuffer; */ public class NthLastModifiedTimeTracker { - private static final int MILLIS_IN_SEC = 1000; + private static final int MILLIS_IN_SEC = 1000; - private final CircularFifoBuffer lastModifiedTimesMillis; + private final CircularFifoBuffer lastModifiedTimesMillis; - public NthLastModifiedTimeTracker(int numTimesToTrack) { - if (numTimesToTrack < 1) { - throw new IllegalArgumentException( - "numTimesToTrack must be greater than zero (you requested " + numTimesToTrack + ")"); + public NthLastModifiedTimeTracker(int numTimesToTrack) { + if (numTimesToTrack < 1) { + throw new IllegalArgumentException( + "numTimesToTrack must be greater than zero (you requested " + numTimesToTrack + ")"); + } + lastModifiedTimesMillis = new CircularFifoBuffer(numTimesToTrack); + initLastModifiedTimesMillis(); } - lastModifiedTimesMillis = new CircularFifoBuffer(numTimesToTrack); - initLastModifiedTimesMillis(); - } - private void initLastModifiedTimesMillis() { - long nowCached = now(); - for (int i = 0; i < lastModifiedTimesMillis.maxSize(); i++) { - lastModifiedTimesMillis.add(Long.valueOf(nowCached)); + private void initLastModifiedTimesMillis() { + long nowCached = now(); + for (int i = 0; i < lastModifiedTimesMillis.maxSize(); i++) { + lastModifiedTimesMillis.add(Long.valueOf(nowCached)); + } } - } - private long now() { - return Time.currentTimeMillis(); - } + private long now() { + return Time.currentTimeMillis(); + } - public int secondsSinceOldestModification() { - long modifiedTimeMillis = ((Long) lastModifiedTimesMillis.get()).longValue(); - return (int) ((now() - modifiedTimeMillis) / MILLIS_IN_SEC); - } + public int secondsSinceOldestModification() { + long modifiedTimeMillis = ((Long) lastModifiedTimesMillis.get()).longValue(); + return (int) ((now() - modifiedTimeMillis) / MILLIS_IN_SEC); + } - public void markAsModified() { - updateLastModifiedTime(); - } + public void markAsModified() { + updateLastModifiedTime(); + } - private void updateLastModifiedTime() { - lastModifiedTimesMillis.add(now()); - } + private void updateLastModifiedTime() { + lastModifiedTimesMillis.add(now()); + } }
http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankable.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankable.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankable.java index 85f2b62..ea9e6d6 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankable.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankable.java @@ -1,32 +1,27 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter.tools; public interface Rankable extends Comparable<Rankable> { - Object getObject(); + Object getObject(); - long getCount(); + long getCount(); - /** - * Note: We do not defensively copy the object wrapped by the Rankable. It is passed as is. - * - * @return a defensive copy - */ - Rankable copy(); + /** + * Note: We do not defensively copy the object wrapped by the Rankable. It is passed as is. + * + * @return a defensive copy + */ + Rankable copy(); } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/RankableObjectWithFields.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/RankableObjectWithFields.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/RankableObjectWithFields.java index b1a9dca..0b3808e 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/RankableObjectWithFields.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/RankableObjectWithFields.java @@ -1,28 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter.tools; -import org.apache.storm.tuple.Tuple; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; - import java.io.Serializable; import java.util.List; +import org.apache.storm.tuple.Tuple; /** * This class wraps an objects and its associated count, including any additional data fields. @@ -31,118 +25,116 @@ import java.util.List; */ public class RankableObjectWithFields implements Rankable, Serializable { - private static final long serialVersionUID = -9102878650001058090L; - private static final String toStringSeparator = "|"; + private static final long serialVersionUID = -9102878650001058090L; + private static final String toStringSeparator = "|"; - private final Object obj; - private final long count; - private final ImmutableList<Object> fields; + private final Object obj; + private final long count; + private final ImmutableList<Object> fields; + + public RankableObjectWithFields(Object obj, long count, Object... otherFields) { + if (obj == null) { + throw new IllegalArgumentException("The object must not be null"); + } + if (count < 0) { + throw new IllegalArgumentException("The count must be >= 0"); + } + this.obj = obj; + this.count = count; + fields = ImmutableList.copyOf(otherFields); - public RankableObjectWithFields(Object obj, long count, Object... otherFields) { - if (obj == null) { - throw new IllegalArgumentException("The object must not be null"); } - if (count < 0) { - throw new IllegalArgumentException("The count must be >= 0"); + + /** + * Construct a new instance based on the provided {@link Tuple}. + * <p/> + * This method expects the object to be ranked in the first field (index 0) of the provided tuple, and the number of + * occurrences of the object (its count) in the second field (index 1). Any further fields in the tuple will be + * extracted and tracked, too. These fields can be accessed via {@link RankableObjectWithFields#getFields()}. + * + * @param tuple + * + * @return new instance based on the provided tuple + */ + public static RankableObjectWithFields from(Tuple tuple) { + List<Object> otherFields = Lists.newArrayList(tuple.getValues()); + Object obj = otherFields.remove(0); + Long count = (Long) otherFields.remove(0); + return new RankableObjectWithFields(obj, count, otherFields.toArray()); } - this.obj = obj; - this.count = count; - fields = ImmutableList.copyOf(otherFields); - - } - - /** - * Construct a new instance based on the provided {@link Tuple}. - * <p/> - * This method expects the object to be ranked in the first field (index 0) of the provided tuple, and the number of - * occurrences of the object (its count) in the second field (index 1). Any further fields in the tuple will be - * extracted and tracked, too. These fields can be accessed via {@link RankableObjectWithFields#getFields()}. - * - * @param tuple - * - * @return new instance based on the provided tuple - */ - public static RankableObjectWithFields from(Tuple tuple) { - List<Object> otherFields = Lists.newArrayList(tuple.getValues()); - Object obj = otherFields.remove(0); - Long count = (Long) otherFields.remove(0); - return new RankableObjectWithFields(obj, count, otherFields.toArray()); - } - - public Object getObject() { - return obj; - } - - public long getCount() { - return count; - } - - /** - * @return an immutable list of any additional data fields of the object (may be empty but will never be null) - */ - public List<Object> getFields() { - return fields; - } - - @Override - public int compareTo(Rankable other) { - long delta = this.getCount() - other.getCount(); - if (delta > 0) { - return 1; + + public Object getObject() { + return obj; } - else if (delta < 0) { - return -1; + + public long getCount() { + return count; } - else { - return 0; + + /** + * @return an immutable list of any additional data fields of the object (may be empty but will never be null) + */ + public List<Object> getFields() { + return fields; + } + + @Override + public int compareTo(Rankable other) { + long delta = this.getCount() - other.getCount(); + if (delta > 0) { + return 1; + } else if (delta < 0) { + return -1; + } else { + return 0; + } } - } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof RankableObjectWithFields)) { + return false; + } + RankableObjectWithFields other = (RankableObjectWithFields) o; + return obj.equals(other.obj) && count == other.count; } - if (!(o instanceof RankableObjectWithFields)) { - return false; + + @Override + public int hashCode() { + int result = 17; + int countHash = (int) (count ^ (count >>> 32)); + result = 31 * result + countHash; + result = 31 * result + obj.hashCode(); + return result; } - RankableObjectWithFields other = (RankableObjectWithFields) o; - return obj.equals(other.obj) && count == other.count; - } - - @Override - public int hashCode() { - int result = 17; - int countHash = (int) (count ^ (count >>> 32)); - result = 31 * result + countHash; - result = 31 * result + obj.hashCode(); - return result; - } - - public String toString() { - StringBuffer buf = new StringBuffer(); - buf.append("["); - buf.append(obj); - buf.append(toStringSeparator); - buf.append(count); - for (Object field : fields) { - buf.append(toStringSeparator); - buf.append(field); + + public String toString() { + StringBuffer buf = new StringBuffer(); + buf.append("["); + buf.append(obj); + buf.append(toStringSeparator); + buf.append(count); + for (Object field : fields) { + buf.append(toStringSeparator); + buf.append(field); + } + buf.append("]"); + return buf.toString(); + } + + /** + * Note: We do not defensively copy the wrapped object and any accompanying fields. We do guarantee, however, + * do return a defensive (shallow) copy of the List object that is wrapping any accompanying fields. + * + * @return + */ + @Override + public Rankable copy() { + List<Object> shallowCopyOfFields = ImmutableList.copyOf(getFields()); + return new RankableObjectWithFields(getObject(), getCount(), shallowCopyOfFields); } - buf.append("]"); - return buf.toString(); - } - - /** - * Note: We do not defensively copy the wrapped object and any accompanying fields. We do guarantee, however, - * do return a defensive (shallow) copy of the List object that is wrapping any accompanying fields. - * - * @return - */ - @Override - public Rankable copy() { - List<Object> shallowCopyOfFields = ImmutableList.copyOf(getFields()); - return new RankableObjectWithFields(getObject(), getCount(), shallowCopyOfFields); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java index 17174b3..eae6ccd 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java @@ -1,156 +1,148 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter.tools; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; - import java.io.Serializable; import java.util.Collections; import java.util.List; public class Rankings implements Serializable { - private static final long serialVersionUID = -1549827195410578903L; - private static final int DEFAULT_COUNT = 10; + private static final long serialVersionUID = -1549827195410578903L; + private static final int DEFAULT_COUNT = 10; + + private final int maxSize; + private final List<Rankable> rankedItems = Lists.newArrayList(); + + public Rankings() { + this(DEFAULT_COUNT); + } + + public Rankings(int topN) { + if (topN < 1) { + throw new IllegalArgumentException("topN must be >= 1"); + } + maxSize = topN; + } + + /** + * Copy constructor. + * @param other + */ + public Rankings(Rankings other) { + this(other.maxSize()); + updateWith(other); + } - private final int maxSize; - private final List<Rankable> rankedItems = Lists.newArrayList(); + /** + * @return the maximum possible number (size) of ranked objects this instance can hold + */ + public int maxSize() { + return maxSize; + } - public Rankings() { - this(DEFAULT_COUNT); - } + /** + * @return the number (size) of ranked objects this instance is currently holding + */ + public int size() { + return rankedItems.size(); + } - public Rankings(int topN) { - if (topN < 1) { - throw new IllegalArgumentException("topN must be >= 1"); + /** + * The returned defensive copy is only "somewhat" defensive. We do, for instance, return a defensive copy of the + * enclosing List instance, and we do try to defensively copy any contained Rankable objects, too. However, the + * contract of {@link org.apache.storm.starter.tools.Rankable#copy()} does not guarantee that any Object's embedded within + * a Rankable will be defensively copied, too. + * + * @return a somewhat defensive copy of ranked items + */ + public List<Rankable> getRankings() { + List<Rankable> copy = Lists.newLinkedList(); + for (Rankable r : rankedItems) { + copy.add(r.copy()); + } + return ImmutableList.copyOf(copy); } - maxSize = topN; - } - - /** - * Copy constructor. - * @param other - */ - public Rankings(Rankings other) { - this(other.maxSize()); - updateWith(other); - } - - /** - * @return the maximum possible number (size) of ranked objects this instance can hold - */ - public int maxSize() { - return maxSize; - } - - /** - * @return the number (size) of ranked objects this instance is currently holding - */ - public int size() { - return rankedItems.size(); - } - - /** - * The returned defensive copy is only "somewhat" defensive. We do, for instance, return a defensive copy of the - * enclosing List instance, and we do try to defensively copy any contained Rankable objects, too. However, the - * contract of {@link org.apache.storm.starter.tools.Rankable#copy()} does not guarantee that any Object's embedded within - * a Rankable will be defensively copied, too. - * - * @return a somewhat defensive copy of ranked items - */ - public List<Rankable> getRankings() { - List<Rankable> copy = Lists.newLinkedList(); - for (Rankable r: rankedItems) { - copy.add(r.copy()); + + public void updateWith(Rankings other) { + for (Rankable r : other.getRankings()) { + updateWith(r); + } } - return ImmutableList.copyOf(copy); - } - public void updateWith(Rankings other) { - for (Rankable r : other.getRankings()) { - updateWith(r); + public void updateWith(Rankable r) { + synchronized (rankedItems) { + addOrReplace(r); + rerank(); + shrinkRankingsIfNeeded(); + } } - } - public void updateWith(Rankable r) { - synchronized(rankedItems) { - addOrReplace(r); - rerank(); - shrinkRankingsIfNeeded(); + private void addOrReplace(Rankable r) { + Integer rank = findRankOf(r); + if (rank != null) { + rankedItems.set(rank, r); + } else { + rankedItems.add(r); + } } - } - private void addOrReplace(Rankable r) { - Integer rank = findRankOf(r); - if (rank != null) { - rankedItems.set(rank, r); + private Integer findRankOf(Rankable r) { + Object tag = r.getObject(); + for (int rank = 0; rank < rankedItems.size(); rank++) { + Object cur = rankedItems.get(rank).getObject(); + if (cur.equals(tag)) { + return rank; + } + } + return null; } - else { - rankedItems.add(r); + + private void rerank() { + Collections.sort(rankedItems); + Collections.reverse(rankedItems); } - } - - private Integer findRankOf(Rankable r) { - Object tag = r.getObject(); - for (int rank = 0; rank < rankedItems.size(); rank++) { - Object cur = rankedItems.get(rank).getObject(); - if (cur.equals(tag)) { - return rank; - } + + private void shrinkRankingsIfNeeded() { + if (rankedItems.size() > maxSize) { + rankedItems.remove(maxSize); + } } - return null; - } - private void rerank() { - Collections.sort(rankedItems); - Collections.reverse(rankedItems); - } + /** + * Removes ranking entries that have a count of zero. + */ + public void pruneZeroCounts() { + int i = 0; + while (i < rankedItems.size()) { + if (rankedItems.get(i).getCount() == 0) { + rankedItems.remove(i); + } else { + i++; + } + } + } - private void shrinkRankingsIfNeeded() { - if (rankedItems.size() > maxSize) { - rankedItems.remove(maxSize); + public String toString() { + return rankedItems.toString(); } - } - - /** - * Removes ranking entries that have a count of zero. - */ - public void pruneZeroCounts() { - int i = 0; - while (i < rankedItems.size()) { - if (rankedItems.get(i).getCount() == 0) { - rankedItems.remove(i); - } - else { - i++; - } + + /** + * Creates a (defensive) copy of itself. + */ + public Rankings copy() { + return new Rankings(this); } - } - - public String toString() { - return rankedItems.toString(); - } - - /** - * Creates a (defensive) copy of itself. - */ - public Rankings copy() { - return new Rankings(this); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlidingWindowCounter.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlidingWindowCounter.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlidingWindowCounter.java index b95a6a9..6aedfc8 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlidingWindowCounter.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlidingWindowCounter.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter.tools; import java.io.Serializable; @@ -67,53 +62,53 @@ import java.util.Map; */ public final class SlidingWindowCounter<T> implements Serializable { - private static final long serialVersionUID = -2645063988768785810L; + private static final long serialVersionUID = -2645063988768785810L; - private SlotBasedCounter<T> objCounter; - private int headSlot; - private int tailSlot; - private int windowLengthInSlots; + private SlotBasedCounter<T> objCounter; + private int headSlot; + private int tailSlot; + private int windowLengthInSlots; - public SlidingWindowCounter(int windowLengthInSlots) { - if (windowLengthInSlots < 2) { - throw new IllegalArgumentException( - "Window length in slots must be at least two (you requested " + windowLengthInSlots + ")"); - } - this.windowLengthInSlots = windowLengthInSlots; - this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots); + public SlidingWindowCounter(int windowLengthInSlots) { + if (windowLengthInSlots < 2) { + throw new IllegalArgumentException( + "Window length in slots must be at least two (you requested " + windowLengthInSlots + ")"); + } + this.windowLengthInSlots = windowLengthInSlots; + this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots); - this.headSlot = 0; - this.tailSlot = slotAfter(headSlot); - } + this.headSlot = 0; + this.tailSlot = slotAfter(headSlot); + } - public void incrementCount(T obj) { - objCounter.incrementCount(obj, headSlot); - } + public void incrementCount(T obj) { + objCounter.incrementCount(obj, headSlot); + } - /** - * Return the current (total) counts of all tracked objects, then advance the window. - * <p/> - * Whenever this method is called, we consider the counts of the current sliding window to be available to and - * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent - * objects within the next "chunk" of the sliding window. - * - * @return The current (total) counts of all tracked objects. - */ - public Map<T, Long> getCountsThenAdvanceWindow() { - Map<T, Long> counts = objCounter.getCounts(); - objCounter.wipeZeros(); - objCounter.wipeSlot(tailSlot); - advanceHead(); - return counts; - } + /** + * Return the current (total) counts of all tracked objects, then advance the window. + * <p/> + * Whenever this method is called, we consider the counts of the current sliding window to be available to and + * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent + * objects within the next "chunk" of the sliding window. + * + * @return The current (total) counts of all tracked objects. + */ + public Map<T, Long> getCountsThenAdvanceWindow() { + Map<T, Long> counts = objCounter.getCounts(); + objCounter.wipeZeros(); + objCounter.wipeSlot(tailSlot); + advanceHead(); + return counts; + } - private void advanceHead() { - headSlot = tailSlot; - tailSlot = slotAfter(tailSlot); - } + private void advanceHead() { + headSlot = tailSlot; + tailSlot = slotAfter(tailSlot); + } - private int slotAfter(int slot) { - return (slot + 1) % windowLengthInSlots; - } + private int slotAfter(int slot) { + return (slot + 1) % windowLengthInSlots; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlotBasedCounter.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlotBasedCounter.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlotBasedCounter.java index 5bf66a5..51157b0 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlotBasedCounter.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/SlotBasedCounter.java @@ -1,28 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter.tools; import java.io.Serializable; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.Map; -import java.util.Set; /** * This class provides per-slot counts of the occurrences of objects. @@ -33,84 +26,83 @@ import java.util.Set; */ public final class SlotBasedCounter<T> implements Serializable { - private static final long serialVersionUID = 4858185737378394432L; + private static final long serialVersionUID = 4858185737378394432L; - private final Map<T, long[]> objToCounts = new HashMap<T, long[]>(); - private final int numSlots; + private final Map<T, long[]> objToCounts = new HashMap<T, long[]>(); + private final int numSlots; - public SlotBasedCounter(int numSlots) { - if (numSlots <= 0) { - throw new IllegalArgumentException("Number of slots must be greater than zero (you requested " + numSlots + ")"); + public SlotBasedCounter(int numSlots) { + if (numSlots <= 0) { + throw new IllegalArgumentException("Number of slots must be greater than zero (you requested " + numSlots + ")"); + } + this.numSlots = numSlots; } - this.numSlots = numSlots; - } - public void incrementCount(T obj, int slot) { - long[] counts = objToCounts.get(obj); - if (counts == null) { - counts = new long[this.numSlots]; - objToCounts.put(obj, counts); + public void incrementCount(T obj, int slot) { + long[] counts = objToCounts.get(obj); + if (counts == null) { + counts = new long[this.numSlots]; + objToCounts.put(obj, counts); + } + counts[slot]++; } - counts[slot]++; - } - public long getCount(T obj, int slot) { - long[] counts = objToCounts.get(obj); - if (counts == null) { - return 0; - } - else { - return counts[slot]; + public long getCount(T obj, int slot) { + long[] counts = objToCounts.get(obj); + if (counts == null) { + return 0; + } else { + return counts[slot]; + } } - } - public Map<T, Long> getCounts() { - Map<T, Long> result = new HashMap<T, Long>(); - for (T obj : objToCounts.keySet()) { - result.put(obj, computeTotalCount(obj)); + public Map<T, Long> getCounts() { + Map<T, Long> result = new HashMap<T, Long>(); + for (T obj : objToCounts.keySet()) { + result.put(obj, computeTotalCount(obj)); + } + return result; } - return result; - } - private long computeTotalCount(T obj) { - long[] curr = objToCounts.get(obj); - long total = 0; - for (long l : curr) { - total += l; + private long computeTotalCount(T obj) { + long[] curr = objToCounts.get(obj); + long total = 0; + for (long l : curr) { + total += l; + } + return total; } - return total; - } - /** - * Reset the slot count of any tracked objects to zero for the given slot. - * - * @param slot - */ - public void wipeSlot(int slot) { - for (T obj : objToCounts.keySet()) { - resetSlotCountToZero(obj, slot); + /** + * Reset the slot count of any tracked objects to zero for the given slot. + * + * @param slot + */ + public void wipeSlot(int slot) { + for (T obj : objToCounts.keySet()) { + resetSlotCountToZero(obj, slot); + } } - } - private void resetSlotCountToZero(T obj, int slot) { - long[] counts = objToCounts.get(obj); - counts[slot] = 0; - } + private void resetSlotCountToZero(T obj, int slot) { + long[] counts = objToCounts.get(obj); + counts[slot] = 0; + } - private boolean shouldBeRemovedFromCounter(T obj) { - return computeTotalCount(obj) == 0; - } + private boolean shouldBeRemovedFromCounter(T obj) { + return computeTotalCount(obj) == 0; + } - /** - * Remove any object from the counter whose total count is zero (to free up memory). - */ - public void wipeZeros() { - for(Iterator<Map.Entry<T, long[]>> it = objToCounts.entrySet().iterator(); it.hasNext(); ) { - Map.Entry<T, long[]> entry = it.next(); - if (shouldBeRemovedFromCounter(entry.getKey())) { - it.remove(); - } + /** + * Remove any object from the counter whose total count is zero (to free up memory). + */ + public void wipeZeros() { + for (Iterator<Map.Entry<T, long[]>> it = objToCounts.entrySet().iterator(); it.hasNext(); ) { + Map.Entry<T, long[]> entry = it.next(); + if (shouldBeRemovedFromCounter(entry.getKey())) { + it.remove(); + } + } } - } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java index 5bb23f7..4849f72 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java @@ -18,6 +18,9 @@ package org.apache.storm.starter.trident; +import java.util.List; +import java.util.Map; +import java.util.UUID; import org.apache.storm.task.IMetricsContext; import org.apache.storm.topology.FailedException; import org.apache.storm.trident.state.CombinerValueUpdater; @@ -28,10 +31,6 @@ import org.apache.storm.trident.testing.MemoryMapState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; -import java.util.Map; -import java.util.UUID; - public class DebugMemoryMapState<T> extends MemoryMapState<T> { private static final Logger LOG = LoggerFactory.getLogger(DebugMemoryMapState.class); http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java index fa9274d..6938b8a 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java @@ -1,25 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter.trident; import java.util.HashMap; - import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; @@ -47,21 +40,23 @@ public class TridentHBaseWindowingStoreTopology { public static StormTopology buildTopology(WindowsStoreFactory windowsStore) throws Exception { FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), - new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), - new Values("how many apples can you eat"), new Values("to be or not to be the person")); + new Values("the man went to the store and bought some candy"), + new Values("four score and seven years ago"), + new Values("how many apples can you eat"), new Values("to be or not to be the person")); spout.setCycle(true); TridentTopology topology = new TridentTopology(); Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), - new Split(), new Fields("word")) - .window(TumblingCountWindow.of(1000), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count")) - .peek(new Consumer() { - @Override - public void accept(TridentTuple input) { - LOG.info("Received tuple: [{}]", input); - } - }); + new Split(), new Fields("word")) + .window(TumblingCountWindow.of(1000), windowsStore, new Fields("word"), new CountAsAggregator(), + new Fields("count")) + .peek(new Consumer() { + @Override + public void accept(TridentTuple input) { + LOG.info("Received tuple: [{}]", input); + } + }); return topology.build(); } @@ -72,7 +67,8 @@ public class TridentHBaseWindowingStoreTopology { conf.put(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT, 100); // window-state table should already be created with cf:tuples column - HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8")); + HBaseWindowsStoreFactory windowStoreFactory = + new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8")); String topoName = "wordCounterWithWindowing"; if (args.length > 0) { topoName = args[0]; http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java index c5f73ff..067eeff 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java @@ -1,25 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter.trident; import java.util.ArrayList; import java.util.List; - import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; @@ -74,25 +68,25 @@ public class TridentMapExample { public static StormTopology buildTopology() { FixedBatchSpout spout = new FixedBatchSpout( - new Fields("word"), 3, new Values("the cow jumped over the moon"), - new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), - new Values("how many apples can you eat"), new Values("to be or not to be the person")); + new Fields("word"), 3, new Values("the cow jumped over the moon"), + new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), + new Values("how many apples can you eat"), new Values("to be or not to be the person")); spout.setCycle(true); TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16) - .flatMap(split) - .map(toUpper, new Fields("uppercased")) - .filter(theFilter) - .peek(new Consumer() { - @Override - public void accept(TridentTuple input) { - System.out.println(input.getString(0)); - } - }) - .groupBy(new Fields("uppercased")) - .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) - .parallelismHint(16); + .flatMap(split) + .map(toUpper, new Fields("uppercased")) + .filter(theFilter) + .peek(new Consumer() { + @Override + public void accept(TridentTuple input) { + System.out.println(input.getString(0)); + } + }) + .groupBy(new Fields("uppercased")) + .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) + .parallelismHint(16); topology.newDRPCStream("words") .flatMap(split, new Fields("word")) http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java index 1cc33a9..bde5baa 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java @@ -1,27 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter.trident; import java.io.Serializable; import java.util.Comparator; import java.util.List; import java.util.concurrent.ThreadLocalRandom; - import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; @@ -55,13 +49,13 @@ public class TridentMinMaxOfDevicesTopology { TridentTopology topology = new TridentTopology(); Stream devicesStream = topology.newStream("devicegen-spout", spout). - each(allFields, new Debug("##### devices")); + each(allFields, new Debug("##### devices")); devicesStream.minBy(deviceID). - each(allFields, new Debug("#### device with min id")); + each(allFields, new Debug("#### device with min id")); devicesStream.maxBy(count). - each(allFields, new Debug("#### device with max count")); + each(allFields, new Debug("#### device with max count")); return topology.build(); } @@ -80,27 +74,27 @@ public class TridentMinMaxOfDevicesTopology { TridentTopology topology = new TridentTopology(); Stream vehiclesStream = topology.newStream("spout1", spout). - each(allFields, new Debug("##### vehicles")); + each(allFields, new Debug("##### vehicles")); Stream slowVehiclesStream = - vehiclesStream - .min(new SpeedComparator()) - .each(vehicleField, new Debug("#### slowest vehicle")); + vehiclesStream + .min(new SpeedComparator()) + .each(vehicleField, new Debug("#### slowest vehicle")); Stream slowDriversStream = - slowVehiclesStream - .project(driverField) - .each(driverField, new Debug("##### slowest driver")); + slowVehiclesStream + .project(driverField) + .each(driverField, new Debug("##### slowest driver")); vehiclesStream - .max(new SpeedComparator()) - .each(vehicleField, new Debug("#### fastest vehicle")) - .project(driverField) - .each(driverField, new Debug("##### fastest driver")); + .max(new SpeedComparator()) + .each(vehicleField, new Debug("#### fastest vehicle")) + .project(driverField) + .each(driverField, new Debug("##### fastest driver")); vehiclesStream - .max(new EfficiencyComparator()). - each(vehicleField, new Debug("#### efficient vehicle")); + .max(new EfficiencyComparator()). + each(vehicleField, new Debug("#### efficient vehicle")); return topology.build(); } @@ -148,9 +142,9 @@ public class TridentMinMaxOfDevicesTopology { @Override public String toString() { return "Driver{" + - "name='" + name + '\'' + - ", id=" + id + - '}'; + "name='" + name + '\'' + + ", id=" + id + + '}'; } } @@ -166,26 +160,27 @@ public class TridentMinMaxOfDevicesTopology { this.efficiency = efficiency; } - @Override - public String toString() { - return "Vehicle{" + - "name='" + name + '\'' + - ", maxSpeed=" + maxSpeed + - ", efficiency=" + efficiency + - '}'; - } - public static List<Object>[] generateVehicles(int count) { List<Object>[] vehicles = new List[count]; for (int i = 0; i < count; i++) { int id = i - 1; vehicles[i] = - (new Values( - new Vehicle("Vehicle-" + id, ThreadLocalRandom.current().nextInt(0, 100), ThreadLocalRandom.current().nextDouble(1, 5)), - new Driver("Driver-" + id, id) - )); + (new Values( + new Vehicle("Vehicle-" + id, ThreadLocalRandom.current().nextInt(0, 100), + ThreadLocalRandom.current().nextDouble(1, 5)), + new Driver("Driver-" + id, id) + )); } return vehicles; } + + @Override + public String toString() { + return "Vehicle{" + + "name='" + name + '\'' + + ", maxSpeed=" + maxSpeed + + ", efficiency=" + efficiency + + '}'; + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java index 0d77914..f37442b 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java @@ -1,27 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter.trident; import java.io.Serializable; import java.util.Comparator; import java.util.List; import java.util.concurrent.ThreadLocalRandom; - import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; @@ -57,31 +51,31 @@ public class TridentMinMaxOfVehiclesTopology { TridentTopology topology = new TridentTopology(); Stream vehiclesStream = topology.newStream("spout1", spout). - each(allFields, new Debug("##### vehicles")); + each(allFields, new Debug("##### vehicles")); Stream slowVehiclesStream = - vehiclesStream - .min(new SpeedComparator()) - .each(vehicleField, new Debug("#### slowest vehicle")); + vehiclesStream + .min(new SpeedComparator()) + .each(vehicleField, new Debug("#### slowest vehicle")); Stream slowDriversStream = - slowVehiclesStream - .project(driverField) - .each(driverField, new Debug("##### slowest driver")); + slowVehiclesStream + .project(driverField) + .each(driverField, new Debug("##### slowest driver")); vehiclesStream - .max(new SpeedComparator()) - .each(vehicleField, new Debug("#### fastest vehicle")) - .project(driverField) - .each(driverField, new Debug("##### fastest driver")); + .max(new SpeedComparator()) + .each(vehicleField, new Debug("#### fastest vehicle")) + .project(driverField) + .each(driverField, new Debug("##### fastest driver")); vehiclesStream - .minBy(Vehicle.FIELD_NAME, new EfficiencyComparator()). - each(vehicleField, new Debug("#### least efficient vehicle")); + .minBy(Vehicle.FIELD_NAME, new EfficiencyComparator()). + each(vehicleField, new Debug("#### least efficient vehicle")); vehiclesStream - .maxBy(Vehicle.FIELD_NAME, new EfficiencyComparator()). - each(vehicleField, new Debug("#### most efficient vehicle")); + .maxBy(Vehicle.FIELD_NAME, new EfficiencyComparator()). + each(vehicleField, new Debug("#### most efficient vehicle")); return topology.build(); } @@ -127,9 +121,9 @@ public class TridentMinMaxOfVehiclesTopology { @Override public String toString() { return "Driver{" + - "name='" + name + '\'' + - ", id=" + id + - '}'; + "name='" + name + '\'' + + ", id=" + id + + '}'; } } @@ -145,26 +139,27 @@ public class TridentMinMaxOfVehiclesTopology { this.efficiency = efficiency; } - @Override - public String toString() { - return "Vehicle{" + - "name='" + name + '\'' + - ", maxSpeed=" + maxSpeed + - ", efficiency=" + efficiency + - '}'; - } - public static List<Object>[] generateVehicles(int count) { List<Object>[] vehicles = new List[count]; for (int i = 0; i < count; i++) { int id = i - 1; vehicles[i] = - (new Values( - new Vehicle("Vehicle-" + id, ThreadLocalRandom.current().nextInt(0, 100), ThreadLocalRandom.current().nextDouble(1, 5)), - new Driver("Driver-" + id, id) - )); + (new Values( + new Vehicle("Vehicle-" + id, ThreadLocalRandom.current().nextInt(0, 100), + ThreadLocalRandom.current().nextDouble(1, 5)), + new Driver("Driver-" + id, id) + )); } return vehicles; } + + @Override + public String toString() { + return "Vehicle{" + + "name='" + name + '\'' + + ", maxSpeed=" + maxSpeed + + ", efficiency=" + efficiency + + '}'; + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java index 8e5229d..19c0665 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentReach.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter.trident; import java.util.ArrayList; @@ -22,7 +17,6 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; @@ -60,28 +54,41 @@ public class TridentReach { put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob")); }}; - public static class StaticSingleKeyMapState extends ReadOnlyState implements ReadOnlyMapState<Object> { - public static class Factory implements StateFactory { - Map _map; + public static StormTopology buildTopology() { + TridentTopology topology = new TridentTopology(); + TridentState urlToTweeters = topology.newStaticState(new StaticSingleKeyMapState.Factory(TWEETERS_DB)); + TridentState tweetersToFollowers = topology.newStaticState(new StaticSingleKeyMapState.Factory(FOLLOWERS_DB)); - public Factory(Map map) { - _map = map; - } - @Override - public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { - return new StaticSingleKeyMapState(_map); - } + topology.newDRPCStream("reach").stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields( + "tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")).shuffle().stateQuery( + tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")).each(new Fields("followers"), + new ExpandList(), + new Fields("follower")) + .groupBy(new Fields("follower")).aggregate(new One(), new Fields( + "one")).aggregate(new Fields("one"), new Sum(), new Fields("reach")); + return topology.build(); + } + public static void main(String[] args) throws Exception { + Config conf = new Config(); + StormSubmitter.submitTopology("reach", conf, buildTopology()); + try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) { + Thread.sleep(2000); + + System.out.println("REACH: " + drpc.execute("reach", "aaa")); + System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1")); + System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5")); } + } + public static class StaticSingleKeyMapState extends ReadOnlyState implements ReadOnlyMapState<Object> { Map _map; public StaticSingleKeyMapState(Map map) { _map = map; } - @Override public List<Object> multiGet(List<List<Object>> keys) { List<Object> ret = new ArrayList(); @@ -92,6 +99,20 @@ public class TridentReach { return ret; } + public static class Factory implements StateFactory { + Map _map; + + public Factory(Map map) { + _map = map; + } + + @Override + public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { + return new StaticSingleKeyMapState(_map); + } + + } + } public static class One implements CombinerAggregator<Integer> { @@ -124,30 +145,4 @@ public class TridentReach { } } - - public static StormTopology buildTopology() { - TridentTopology topology = new TridentTopology(); - TridentState urlToTweeters = topology.newStaticState(new StaticSingleKeyMapState.Factory(TWEETERS_DB)); - TridentState tweetersToFollowers = topology.newStaticState(new StaticSingleKeyMapState.Factory(FOLLOWERS_DB)); - - - topology.newDRPCStream("reach").stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields( - "tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")).shuffle().stateQuery( - tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")).each(new Fields("followers"), - new ExpandList(), new Fields("follower")).groupBy(new Fields("follower")).aggregate(new One(), new Fields( - "one")).aggregate(new Fields("one"), new Sum(), new Fields("reach")); - return topology.build(); - } - - public static void main(String[] args) throws Exception { - Config conf = new Config(); - StormSubmitter.submitTopology("reach", conf, buildTopology()); - try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) { - Thread.sleep(2000); - - System.out.println("REACH: " + drpc.execute("reach", "aaa")); - System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1")); - System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5")); - } - } } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java index 2da29bf..862d09a 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java @@ -1,21 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter.trident; import org.apache.storm.Config; @@ -45,21 +39,22 @@ public class TridentWindowingInmemoryStoreTopology { public static StormTopology buildTopology(WindowsStoreFactory windowStore, WindowConfig windowConfig) throws Exception { FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), - new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), - new Values("how many apples can you eat"), new Values("to be or not to be the person")); + new Values("the man went to the store and bought some candy"), + new Values("four score and seven years ago"), + new Values("how many apples can you eat"), new Values("to be or not to be the person")); spout.setCycle(true); TridentTopology topology = new TridentTopology(); Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), - new Split(), new Fields("word")) - .window(windowConfig, windowStore, new Fields("word"), new CountAsAggregator(), new Fields("count")) - .peek(new Consumer() { - @Override - public void accept(TridentTuple input) { - LOG.info("Received tuple: [{}]", input); - } - }); + new Split(), new Fields("word")) + .window(windowConfig, windowStore, new Fields("word"), new CountAsAggregator(), new Fields("count")) + .peek(new Consumer() { + @Override + public void accept(TridentTuple input) { + LOG.info("Received tuple: [{}]", input); + } + }); return topology.build(); } @@ -71,7 +66,7 @@ public class TridentWindowingInmemoryStoreTopology { if (args.length > 0) { topoName = args[0]; } - + conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(topoName, conf, buildTopology(mapState, SlidingCountWindow.of(1000, 100))); } http://git-wip-us.apache.org/repos/asf/storm/blob/81ec15d1/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java index 0f86e1f..bafeba2 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.starter.trident; import org.apache.storm.Config; @@ -36,32 +31,25 @@ import org.apache.storm.utils.DRPCClient; public class TridentWordCount { - public static class Split extends BaseFunction { - @Override - public void execute(TridentTuple tuple, TridentCollector collector) { - String sentence = tuple.getString(0); - for (String word : sentence.split(" ")) { - collector.emit(new Values(word)); - } - } - } - public static StormTopology buildTopology() { FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), - new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), - new Values("how many apples can you eat"), new Values("to be or not to be the person")); + new Values("the man went to the store and bought some candy"), + new Values("four score and seven years ago"), + new Values("how many apples can you eat"), new Values("to be or not to be the person")); spout.setCycle(true); TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), - new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), - new Count(), new Fields("count")).parallelismHint(16); + new Split(), new Fields("word")) + .groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), + new Count(), new Fields("count")) + .parallelismHint(16); topology.newDRPCStream("words").each(new Fields("args"), new Split(), new Fields("word")) - .groupBy(new Fields("word")) - .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")) - .each(new Fields("count"), new FilterNull()) - .project(new Fields("word", "count")); + .groupBy(new Fields("word")) + .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")) + .each(new Fields("count"), new FilterNull()) + .project(new Fields("word", "count")); return topology.build(); } @@ -81,4 +69,14 @@ public class TridentWordCount { } } } + + public static class Split extends BaseFunction { + @Override + public void execute(TridentTuple tuple, TridentCollector collector) { + String sentence = tuple.getString(0); + for (String word : sentence.split(" ")) { + collector.emit(new Values(word)); + } + } + } }
