Repository: storm Updated Branches: refs/heads/master 82bea75c9 -> 46999909a
min/max operators implementation in Trident streams API. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5295a909 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5295a909 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5295a909 Branch: refs/heads/master Commit: 5295a909b1fc8c136a220a0e636c0d70c036e5c5 Parents: 31b57e8 Author: Satish Duggana <[email protected]> Authored: Fri Jan 29 12:39:26 2016 +0530 Committer: Satish Duggana <[email protected]> Committed: Mon Feb 8 11:14:41 2016 +0530 ---------------------------------------------------------------------- .../TridentMinMaxOperationsTopology.java | 208 +++++++++++++++++++ .../jvm/org/apache/storm/trident/Stream.java | 117 +++++++++-- .../operation/builtin/ComparisonAggregator.java | 72 +++++++ .../storm/trident/operation/builtin/Max.java | 43 ++++ .../operation/builtin/MaxWithComparator.java | 44 ++++ .../storm/trident/operation/builtin/Min.java | 44 ++++ .../operation/builtin/MinWithComparator.java | 44 ++++ .../trident/testing/NumberGeneratorSpout.java | 92 ++++++++ 8 files changed, 651 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java new file mode 100644 index 0000000..dedaaff --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.starter.trident; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.operation.BaseFunction; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.operation.builtin.Debug; +import org.apache.storm.trident.testing.FixedBatchSpout; +import org.apache.storm.trident.testing.NumberGeneratorSpout; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +/** + * This class contains different usages of minBy, maxBy, min and max operations on trident streams. 
+ * + */ +public class TridentMinMaxOperationsTopology { + public static class Split extends BaseFunction { + @Override + public void execute(TridentTuple tuple, TridentCollector collector) { + String sentence = tuple.getString(0); + for (String word : sentence.split(" ")) { + collector.emit(new Values(word)); + } + } + } + + public static StormTopology buildIdsTopology() { + NumberGeneratorSpout spout = new NumberGeneratorSpout(new Fields("id"), 10, 1000); + + TridentTopology topology = new TridentTopology(); + Stream wordsStream = topology.newStream("numgen-spout", spout). + each(new Fields("id"), new Debug("##### ids")); + + wordsStream.minBy("id"). + each(new Fields("id"), new Debug("#### min-id")); + + wordsStream.maxBy("id"). + each(new Fields("id"), new Debug("#### max-id")); + + return topology.build(); + } + + public static StormTopology buildWordsTopology() { + FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), + new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), + new Values("how many apples can you eat"), new Values("to be or not to be the person")); + spout.setCycle(true); + + TridentTopology topology = new TridentTopology(); + Stream wordsStream = topology.newStream("spout1", spout).parallelismHint(16). + each(new Fields("sentence"), new Split(), new Fields("word")). + each(new Fields("word"), new Debug("##### words")); + + wordsStream.minBy("word"). + each(new Fields("word"), new Debug("#### lowest word")); + + wordsStream.maxBy("word"). 
+ each(new Fields("word"), new Debug("#### highest word")); + + return topology.build(); + } + + public static StormTopology buildVehiclesTopology() { + + FixedBatchSpout spout = new FixedBatchSpout(new Fields("vehicle", "driver"), 10, Vehicle.generateVehicles(20)); + spout.setCycle(true); + + TridentTopology topology = new TridentTopology(); + Stream vehiclesStream = topology.newStream("spout1", spout). + each(new Fields("vehicle"), new Debug("##### vehicles")); + + vehiclesStream.min(new SpeedComparator()) + .each(new Fields("vehicle"), new Debug("#### slowest vehicle")) + .project(new Fields("driver")). each(new Fields("driver"), new Debug("##### slowest driver")); + + vehiclesStream.max(new SpeedComparator()) + .each(new Fields("vehicle"), new Debug("#### fastest vehicle")) + .project(new Fields("driver")). each(new Fields("driver"), new Debug("##### fastest driver")); + + vehiclesStream.max(new EfficiencyComparator()). + each(new Fields("vehicle"), new Debug("#### efficient vehicle")); + + return topology.build(); + } + + public static void main(String[] args) throws Exception { + Config conf = new Config(); + conf.setMaxSpoutPending(20); + StormTopology[] topologies = {buildWordsTopology(), buildIdsTopology(), buildVehiclesTopology()}; + if (args.length == 0) { + for (StormTopology topology : topologies) { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("min-max-topology", conf, topology); + Utils.sleep(60*1000); + cluster.shutdown(); + } + System.exit(0); + } else { + conf.setNumWorkers(3); + int ct=1; + for (StormTopology topology : topologies) { + StormSubmitter.submitTopologyWithProgressBar(args[0]+"-"+ct++, conf, topology); + } + } + } + + static class SpeedComparator implements Comparator<TridentTuple>, Serializable { + + @Override + public int compare(TridentTuple tuple1, TridentTuple tuple2) { + Vehicle vehicle1 = (Vehicle) tuple1.getValueByField("vehicle"); + Vehicle vehicle2 = (Vehicle) tuple2.getValueByField("vehicle"); + 
return Integer.compare(vehicle1.maxSpeed, vehicle2.maxSpeed); + } + } + + static class EfficiencyComparator implements Comparator<TridentTuple>, Serializable { + + @Override + public int compare(TridentTuple tuple1, TridentTuple tuple2) { + Vehicle vehicle1 = (Vehicle) tuple1.getValueByField("vehicle"); + Vehicle vehicle2 = (Vehicle) tuple2.getValueByField("vehicle"); + return Double.compare(vehicle1.efficiency, vehicle2.efficiency); + } + + } + + static class Driver implements Serializable { + final String name; + final int id; + + Driver(String name, int id) { + this.name = name; + this.id = id; + } + + @Override + public String toString() { + return "Driver{" + + "name='" + name + '\'' + + ", id=" + id + + '}'; + } + } + + static class Vehicle implements Serializable { + final String name; + final int maxSpeed; + final double efficiency; + + public Vehicle(String name, int maxSpeed, double efficiency) { + this.name = name; + this.maxSpeed = maxSpeed; + this.efficiency = efficiency; + } + + @Override + public String toString() { + return "Vehicle{" + + "name='" + name + '\'' + + ", maxSpeed=" + maxSpeed + + ", efficiency=" + efficiency + + '}'; + } + + public static List<Object>[] generateVehicles(int count) { + List<Object>[] vehicles = new List[count]; + for(int i=0; i<count; i++) { + int id = i-1; + vehicles[i] = + (new Values( + new Vehicle("Vehicle-"+id, ThreadLocalRandom.current().nextInt(0, 100), ThreadLocalRandom.current().nextDouble(1, 5)), + new Driver("Driver-"+id, id) + )); + } + return vehicles; + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/Stream.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java index 7c6d93f..fa62b72 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java +++ 
b/storm-core/src/jvm/org/apache/storm/trident/Stream.java @@ -19,31 +19,33 @@ package org.apache.storm.trident; import org.apache.storm.generated.Grouping; import org.apache.storm.generated.NullStruct; -import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer; import org.apache.storm.grouping.CustomStreamGrouping; -import org.apache.storm.trident.operation.Consumer; -import org.apache.storm.trident.operation.FlatMapFunction; -import org.apache.storm.trident.operation.MapFunction; -import org.apache.storm.trident.operation.impl.ConsumerExecutor; -import org.apache.storm.trident.operation.impl.FlatMapFunctionExecutor; -import org.apache.storm.trident.operation.impl.MapFunctionExecutor; -import org.apache.storm.trident.planner.processor.MapProcessor; -import org.apache.storm.tuple.Fields; -import org.apache.storm.utils.Utils; +import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer; import org.apache.storm.trident.fluent.GlobalAggregationScheme; import org.apache.storm.trident.fluent.GroupedStream; import org.apache.storm.trident.fluent.IAggregatableStream; import org.apache.storm.trident.operation.Aggregator; import org.apache.storm.trident.operation.Assembly; import org.apache.storm.trident.operation.CombinerAggregator; +import org.apache.storm.trident.operation.Consumer; import org.apache.storm.trident.operation.Filter; +import org.apache.storm.trident.operation.FlatMapFunction; import org.apache.storm.trident.operation.Function; +import org.apache.storm.trident.operation.MapFunction; import org.apache.storm.trident.operation.ReducerAggregator; +import org.apache.storm.trident.operation.builtin.ComparisonAggregator; +import org.apache.storm.trident.operation.builtin.Max; +import org.apache.storm.trident.operation.builtin.MaxWithComparator; +import org.apache.storm.trident.operation.builtin.Min; +import org.apache.storm.trident.operation.builtin.MinWithComparator; import org.apache.storm.trident.operation.impl.CombinerAggStateUpdater; +import 
org.apache.storm.trident.operation.impl.ConsumerExecutor; import org.apache.storm.trident.operation.impl.FilterExecutor; +import org.apache.storm.trident.operation.impl.FlatMapFunctionExecutor; import org.apache.storm.trident.operation.impl.GlobalBatchToPartition; -import org.apache.storm.trident.operation.impl.ReducerAggStateUpdater; import org.apache.storm.trident.operation.impl.IndexHashBatchToPartition; +import org.apache.storm.trident.operation.impl.MapFunctionExecutor; +import org.apache.storm.trident.operation.impl.ReducerAggStateUpdater; import org.apache.storm.trident.operation.impl.SingleEmitAggregator.BatchToPartition; import org.apache.storm.trident.operation.impl.TrueFilter; import org.apache.storm.trident.partition.GlobalGrouping; @@ -55,6 +57,7 @@ import org.apache.storm.trident.planner.PartitionNode; import org.apache.storm.trident.planner.ProcessorNode; import org.apache.storm.trident.planner.processor.AggregateProcessor; import org.apache.storm.trident.planner.processor.EachProcessor; +import org.apache.storm.trident.planner.processor.MapProcessor; import org.apache.storm.trident.planner.processor.PartitionPersistProcessor; import org.apache.storm.trident.planner.processor.ProjectedProcessor; import org.apache.storm.trident.planner.processor.StateQueryProcessor; @@ -62,7 +65,12 @@ import org.apache.storm.trident.state.QueryFunction; import org.apache.storm.trident.state.StateFactory; import org.apache.storm.trident.state.StateSpec; import org.apache.storm.trident.state.StateUpdater; +import org.apache.storm.trident.tuple.TridentTuple; import org.apache.storm.trident.util.TridentUtils; +import org.apache.storm.tuple.Fields; +import org.apache.storm.utils.Utils; + +import java.util.Comparator; /** * A Stream represents the core data model in Trident, and can be thought of as a "stream" of tuples that are processed @@ -436,8 +444,91 @@ public class Stream implements IAggregatableStream { return chainedAgg() .partitionAggregate(inputFields, agg, 
functionFields) .chainEnd(); - } - + } + + /** + * This aggregator operation computes the minimum of tuples by the given {@code inputFieldName} and it is + * assumed that its value is an instance of {@code Comparable}. + * + * @param inputFieldName input field name + * @return + */ + public Stream minBy(String inputFieldName) { + Aggregator<ComparisonAggregator.State> min = new Min(inputFieldName); + return comparableAggregateStream(inputFieldName, min); + } + + /** + * This aggregator operation computes the minimum of tuples by the given {@code inputFieldName} in a stream by + * using the given {@code comparator}. + * + * @param inputFieldName input field name + * @param comparator comparator used for finding minimum of two tuple values of {@code inputFieldName}. + * @param <T> type of tuple's given input field value. + * @return + */ + public <T> Stream minBy(String inputFieldName, Comparator<T> comparator) { + Aggregator<ComparisonAggregator.State> min = new MinWithComparator<>(inputFieldName, comparator); + return comparableAggregateStream(inputFieldName, min); + } + + /** + * This aggregator operation computes the minimum of tuples in a stream by using the given {@code comparator} with + * {@code TridentTuple}s. + * + * @param comparator comparator used for finding minimum of two tuple values. + * @return + */ + public Stream min(Comparator<TridentTuple> comparator) { + Aggregator<ComparisonAggregator.State> min = new MinWithComparator<>(comparator); + return comparableAggregateStream(null, min); + } + + /** + * This aggregator operation computes the maximum of tuples by the given {@code inputFieldName} and it is + * assumed that its value is an instance of {@code Comparable}. 
+ * + * @param inputFieldName input field name + * @return + */ + public Stream maxBy(String inputFieldName) { + Aggregator<ComparisonAggregator.State> max = new Max(inputFieldName); + return comparableAggregateStream(inputFieldName, max); + } + + /** + * This aggregator operation computes the maximum of tuples by the given {@code inputFieldName} in a stream by + * using the given {@code comparator}. + * + * @param inputFieldName input field name + * @param comparator comparator used for finding maximum of two tuple values of {@code inputFieldName}. + * @param <T> type of tuple's given input field value. + * @return + */ + public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator) { + Aggregator<ComparisonAggregator.State> max = new MaxWithComparator<>(inputFieldName, comparator); + return comparableAggregateStream(inputFieldName, max); + } + + /** + * This aggregator operation computes the maximum of tuples in a stream by using the given {@code comparator} with + * {@code TridentTuple}s. + * + * @param comparator comparator used for finding maximum of two tuple values. 
+ * @return + */ + public Stream max(Comparator<TridentTuple> comparator) { + Aggregator<ComparisonAggregator.State> max = new MaxWithComparator<>(comparator); + return comparableAggregateStream(null, max); + } + + private <T> Stream comparableAggregateStream(String inputFieldName, Aggregator<T> aggregator) { + if(inputFieldName != null) { + projectionValidation(new Fields(inputFieldName)); + } + return partitionAggregate(getOutputFields(), aggregator, getOutputFields()); + } + public Stream aggregate(Aggregator agg, Fields functionFields) { return aggregate(null, agg, functionFields); } http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java new file mode 100644 index 0000000..0109bb5 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.trident.operation.builtin; + +import org.apache.storm.trident.operation.BaseAggregator; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.tuple.TridentTuple; + +/** + * Abstract {@code Aggregator} for comparing two values in a stream. + * + */ +public abstract class ComparisonAggregator<T> extends BaseAggregator<ComparisonAggregator.State> { + + public static class State { + TridentTuple previousTuple; + } + + private final String inputFieldName; + + public ComparisonAggregator(String inputFieldName) { + this.inputFieldName = inputFieldName; + } + + protected abstract T compare(T value1, T value2); + + @Override + public State init(Object batchId, TridentCollector collector) { + return new State(); + } + + @Override + public void aggregate(State state, TridentTuple tuple, TridentCollector collector) { + T value1 = valueFromTuple(state.previousTuple); + T value2 = valueFromTuple(tuple); + + if(value2 == null) { + return; + } + + if(value1 == null || compare(value1, value2) == value2) { + state.previousTuple = tuple; + } + + } + + protected T valueFromTuple(TridentTuple tuple) { + // when there is no input field then the whole tuple is considered for comparison. + return (T) (inputFieldName != null && tuple != null ? 
tuple.getValueByField(inputFieldName) : tuple); + } + + @Override + public void complete(State state, TridentCollector collector) { + collector.emit(state.previousTuple.getValues()); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java new file mode 100644 index 0000000..5385dfb --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.trident.operation.builtin; + +/** + * This aggregator computes the maximum of aggregated tuples in a stream. It assumes that the tuple has one value and + * it is an instance of {@code Comparable}. 
+ * + */ +public class Max extends ComparisonAggregator<Comparable<Object>> { + + public Max(String inputFieldName) { + super(inputFieldName); + } + + @Override + protected Comparable<Object> compare(Comparable<Object> value1, Comparable<Object> value2) { + return value1.compareTo(value2) > 0 ? value1 : value2; + } + + /** + * Returns an aggregator computes the maximum of aggregated tuples in a stream. It assumes that the tuple has one value and + * it is an instance of {@code Comparable}. + * + * @return + */ +} http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java new file mode 100644 index 0000000..172aa58 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.trident.operation.builtin; + +import java.util.Comparator; + +/** + * This aggregator computes the maximum of aggregated tuples in a stream. It uses given {@code comparator} for comparing + * two values in a stream. + * + */ +public class MaxWithComparator<T> extends ComparisonAggregator<T> { + private final Comparator<T> comparator; + + public MaxWithComparator(Comparator<T> comparator) { + this(null, comparator); + } + + public MaxWithComparator(String inputFieldName, Comparator<T> comparator) { + super(inputFieldName); + this.comparator = comparator; + } + + @Override + protected T compare(T value1, T value2) { + return comparator.compare(value1, value2) > 0 ? value1 : value2; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java new file mode 100644 index 0000000..0757d7c --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.trident.operation.builtin; + +/** + * This aggregator computes the minimum of aggregated tuples in a stream. It assumes that the tuple has one value and + * it is an instance of {@code Comparable}. + * + */ +public class Min extends ComparisonAggregator<Comparable<Object>> { + + public Min(String inputFieldName) { + super(inputFieldName); + } + + @Override + protected Comparable<Object> compare(Comparable<Object> value1, Comparable<Object> value2) { + return value1.compareTo(value2) < 0 ? value1 : value2; + } + + /** + * Returns an aggregator that computes the minimum of aggregated tuples in a stream. It assumes that the tuple has one value and + * it is an instance of {@code Comparable}. + * + * @return + * @param inputFieldName + */ +} http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java new file mode 100644 index 0000000..d33e000 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.trident.operation.builtin; + +import java.util.Comparator; + +/** + * This aggregator computes the minimum of aggregated tuples in a stream. It uses given {@code comparator} for comparing + * two values in a stream. + * + */ +public class MinWithComparator<T> extends ComparisonAggregator<T> { + private final Comparator<T> comparator; + + public MinWithComparator(String inputFieldName, Comparator<T> comparator) { + super(inputFieldName); + this.comparator = comparator; + } + + public MinWithComparator(Comparator<T> comparator) { + this(null, comparator); + } + + @Override + protected T compare(T value1, T value2) { + return comparator.compare(value1, value2) < 0 ? value1 : value2; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java b/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java new file mode 100644 index 0000000..a4a9a79 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.trident.testing; + +import org.apache.storm.Config; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.spout.IBatchSpout; +import org.apache.storm.tuple.Fields; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; + +/** + * + */ +public class NumberGeneratorSpout implements IBatchSpout { + private final Fields fields; + private final int batchSize; + private final int maxNumber; + private final Map<Long, List<List<Object>>> batches = new HashMap<>(); + + public NumberGeneratorSpout(Fields fields, int batchSize, int maxNumber) { + this.fields = fields; + this.batchSize = batchSize; + this.maxNumber = maxNumber; + } + + @Override + public void open(Map conf, TopologyContext context) { + } + + @Override + public void emitBatch(long batchId, TridentCollector collector) { + List<List<Object>> values = null; + if(batches.containsKey(batchId)) { + values = batches.get(batchId); + } else { + values = new ArrayList<>(); + for (int i = 0; i < batchSize; i++) { + values.add(Collections.singletonList((Object) ThreadLocalRandom.current().nextInt(0, maxNumber + 1))); + } + batches.put(batchId, values); + } + for 
(List<Object> value : values) { + collector.emit(value); + } + } + + @Override + public void ack(long batchId) { + batches.remove(batchId); + } + + @Override + public void close() { + + } + + @Override + public Map<String, Object> getComponentConfiguration() { + Config conf = new Config(); + conf.setMaxTaskParallelism(1); + return conf; + } + + @Override + public Fields getOutputFields() { + return fields; + } +}
