Repository: storm
Updated Branches:
  refs/heads/master 82bea75c9 -> 46999909a


min/max operators implementation in Trident streams API.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5295a909
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5295a909
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5295a909

Branch: refs/heads/master
Commit: 5295a909b1fc8c136a220a0e636c0d70c036e5c5
Parents: 31b57e8
Author: Satish Duggana <[email protected]>
Authored: Fri Jan 29 12:39:26 2016 +0530
Committer: Satish Duggana <[email protected]>
Committed: Mon Feb 8 11:14:41 2016 +0530

----------------------------------------------------------------------
 .../TridentMinMaxOperationsTopology.java        | 208 +++++++++++++++++++
 .../jvm/org/apache/storm/trident/Stream.java    | 117 +++++++++--
 .../operation/builtin/ComparisonAggregator.java |  72 +++++++
 .../storm/trident/operation/builtin/Max.java    |  43 ++++
 .../operation/builtin/MaxWithComparator.java    |  44 ++++
 .../storm/trident/operation/builtin/Min.java    |  44 ++++
 .../operation/builtin/MinWithComparator.java    |  44 ++++
 .../trident/testing/NumberGeneratorSpout.java   |  92 ++++++++
 8 files changed, 651 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java
new file mode 100644
index 0000000..dedaaff
--- /dev/null
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.testing.NumberGeneratorSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * This class contains different usages of minBy, maxBy, min and max 
operations on trident streams.
+ *
+ */
+public class TridentMinMaxOperationsTopology {
+    public static class Split extends BaseFunction {
+        @Override
+        public void execute(TridentTuple tuple, TridentCollector collector) {
+            String sentence = tuple.getString(0);
+            for (String word : sentence.split(" ")) {
+                collector.emit(new Values(word));
+            }
+        }
+    }
+
+    public static StormTopology buildIdsTopology() {
+        NumberGeneratorSpout spout = new NumberGeneratorSpout(new 
Fields("id"), 10, 1000);
+
+        TridentTopology topology = new TridentTopology();
+        Stream wordsStream = topology.newStream("numgen-spout", spout).
+                each(new Fields("id"), new Debug("##### ids"));
+
+        wordsStream.minBy("id").
+                each(new Fields("id"), new Debug("#### min-id"));
+
+        wordsStream.maxBy("id").
+                each(new Fields("id"), new Debug("#### max-id"));
+
+        return topology.build();
+    }
+
+    public static StormTopology buildWordsTopology() {
+        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, 
new Values("the cow jumped over the moon"),
+                new Values("the man went to the store and bought some candy"), 
new Values("four score and seven years ago"),
+                new Values("how many apples can you eat"), new Values("to be 
or not to be the person"));
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+        Stream wordsStream = topology.newStream("spout1", 
spout).parallelismHint(16).
+                each(new Fields("sentence"), new Split(), new Fields("word")).
+                each(new Fields("word"), new Debug("##### words"));
+
+        wordsStream.minBy("word").
+                each(new Fields("word"), new Debug("#### lowest word"));
+
+        wordsStream.maxBy("word").
+                each(new Fields("word"), new Debug("#### highest word"));
+
+        return topology.build();
+    }
+
+    public static StormTopology buildVehiclesTopology() {
+
+        FixedBatchSpout spout = new FixedBatchSpout(new Fields("vehicle", 
"driver"), 10, Vehicle.generateVehicles(20));
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+        Stream vehiclesStream = topology.newStream("spout1", spout).
+                each(new Fields("vehicle"), new Debug("##### vehicles"));
+
+        vehiclesStream.min(new SpeedComparator())
+                .each(new Fields("vehicle"), new Debug("#### slowest vehicle"))
+                .project(new Fields("driver")). each(new Fields("driver"), new 
Debug("##### slowest driver"));
+
+        vehiclesStream.max(new SpeedComparator())
+                .each(new Fields("vehicle"), new Debug("#### fastest vehicle"))
+                .project(new Fields("driver")). each(new Fields("driver"), new 
Debug("##### fastest driver"));
+
+        vehiclesStream.max(new EfficiencyComparator()).
+                each(new Fields("vehicle"), new Debug("#### efficient 
vehicle"));
+
+        return topology.build();
+    }
+
+    public static void main(String[] args) throws Exception {
+        Config conf = new Config();
+        conf.setMaxSpoutPending(20);
+        StormTopology[] topologies = {buildWordsTopology(), 
buildIdsTopology(), buildVehiclesTopology()};
+        if (args.length == 0) {
+            for (StormTopology topology : topologies) {
+                LocalCluster cluster = new LocalCluster();
+                cluster.submitTopology("min-max-topology", conf, topology);
+                Utils.sleep(60*1000);
+                cluster.shutdown();
+            }
+            System.exit(0);
+        } else {
+            conf.setNumWorkers(3);
+            int ct=1;
+            for (StormTopology topology : topologies) {
+                StormSubmitter.submitTopologyWithProgressBar(args[0]+"-"+ct++, 
conf, topology);
+            }
+        }
+    }
+
+    static class SpeedComparator implements Comparator<TridentTuple>, 
Serializable {
+
+        @Override
+        public int compare(TridentTuple tuple1, TridentTuple tuple2) {
+            Vehicle vehicle1 = (Vehicle) tuple1.getValueByField("vehicle");
+            Vehicle vehicle2 = (Vehicle) tuple2.getValueByField("vehicle");
+            return Integer.compare(vehicle1.maxSpeed, vehicle2.maxSpeed);
+        }
+    }
+
+    static class EfficiencyComparator implements Comparator<TridentTuple>, 
Serializable {
+
+        @Override
+        public int compare(TridentTuple tuple1, TridentTuple tuple2) {
+            Vehicle vehicle1 = (Vehicle) tuple1.getValueByField("vehicle");
+            Vehicle vehicle2 = (Vehicle) tuple2.getValueByField("vehicle");
+            return Double.compare(vehicle1.efficiency, vehicle2.efficiency);
+        }
+
+    }
+
+    static class Driver implements Serializable {
+        final String name;
+        final int id;
+
+        Driver(String name, int id) {
+            this.name = name;
+            this.id = id;
+        }
+
+        @Override
+        public String toString() {
+            return "Driver{" +
+                    "name='" + name + '\'' +
+                    ", id=" + id +
+                    '}';
+        }
+    }
+
+    static class Vehicle implements Serializable {
+        final String name;
+        final int maxSpeed;
+        final double efficiency;
+
+        public Vehicle(String name, int maxSpeed, double efficiency) {
+            this.name = name;
+            this.maxSpeed = maxSpeed;
+            this.efficiency = efficiency;
+        }
+
+        @Override
+        public String toString() {
+            return "Vehicle{" +
+                    "name='" + name + '\'' +
+                    ", maxSpeed=" + maxSpeed +
+                    ", efficiency=" + efficiency +
+                    '}';
+        }
+
+        public static List<Object>[] generateVehicles(int count) {
+            List<Object>[] vehicles = new List[count];
+            for(int i=0; i<count; i++) {
+                int id = i-1;
+                vehicles[i] =
+                        (new Values(
+                                new Vehicle("Vehicle-"+id, 
ThreadLocalRandom.current().nextInt(0, 100), 
ThreadLocalRandom.current().nextDouble(1, 5)),
+                                new Driver("Driver-"+id, id)
+                        ));
+            }
+            return vehicles;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java 
b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index 7c6d93f..fa62b72 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -19,31 +19,33 @@ package org.apache.storm.trident;
 
 import org.apache.storm.generated.Grouping;
 import org.apache.storm.generated.NullStruct;
-import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer;
 import org.apache.storm.grouping.CustomStreamGrouping;
-import org.apache.storm.trident.operation.Consumer;
-import org.apache.storm.trident.operation.FlatMapFunction;
-import org.apache.storm.trident.operation.MapFunction;
-import org.apache.storm.trident.operation.impl.ConsumerExecutor;
-import org.apache.storm.trident.operation.impl.FlatMapFunctionExecutor;
-import org.apache.storm.trident.operation.impl.MapFunctionExecutor;
-import org.apache.storm.trident.planner.processor.MapProcessor;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
+import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer;
 import org.apache.storm.trident.fluent.GlobalAggregationScheme;
 import org.apache.storm.trident.fluent.GroupedStream;
 import org.apache.storm.trident.fluent.IAggregatableStream;
 import org.apache.storm.trident.operation.Aggregator;
 import org.apache.storm.trident.operation.Assembly;
 import org.apache.storm.trident.operation.CombinerAggregator;
+import org.apache.storm.trident.operation.Consumer;
 import org.apache.storm.trident.operation.Filter;
+import org.apache.storm.trident.operation.FlatMapFunction;
 import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.MapFunction;
 import org.apache.storm.trident.operation.ReducerAggregator;
+import org.apache.storm.trident.operation.builtin.ComparisonAggregator;
+import org.apache.storm.trident.operation.builtin.Max;
+import org.apache.storm.trident.operation.builtin.MaxWithComparator;
+import org.apache.storm.trident.operation.builtin.Min;
+import org.apache.storm.trident.operation.builtin.MinWithComparator;
 import org.apache.storm.trident.operation.impl.CombinerAggStateUpdater;
+import org.apache.storm.trident.operation.impl.ConsumerExecutor;
 import org.apache.storm.trident.operation.impl.FilterExecutor;
+import org.apache.storm.trident.operation.impl.FlatMapFunctionExecutor;
 import org.apache.storm.trident.operation.impl.GlobalBatchToPartition;
-import org.apache.storm.trident.operation.impl.ReducerAggStateUpdater;
 import org.apache.storm.trident.operation.impl.IndexHashBatchToPartition;
+import org.apache.storm.trident.operation.impl.MapFunctionExecutor;
+import org.apache.storm.trident.operation.impl.ReducerAggStateUpdater;
 import 
org.apache.storm.trident.operation.impl.SingleEmitAggregator.BatchToPartition;
 import org.apache.storm.trident.operation.impl.TrueFilter;
 import org.apache.storm.trident.partition.GlobalGrouping;
@@ -55,6 +57,7 @@ import org.apache.storm.trident.planner.PartitionNode;
 import org.apache.storm.trident.planner.ProcessorNode;
 import org.apache.storm.trident.planner.processor.AggregateProcessor;
 import org.apache.storm.trident.planner.processor.EachProcessor;
+import org.apache.storm.trident.planner.processor.MapProcessor;
 import org.apache.storm.trident.planner.processor.PartitionPersistProcessor;
 import org.apache.storm.trident.planner.processor.ProjectedProcessor;
 import org.apache.storm.trident.planner.processor.StateQueryProcessor;
@@ -62,7 +65,12 @@ import org.apache.storm.trident.state.QueryFunction;
 import org.apache.storm.trident.state.StateFactory;
 import org.apache.storm.trident.state.StateSpec;
 import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.trident.util.TridentUtils;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.util.Comparator;
 
 /**
  * A Stream represents the core data model in Trident, and can be thought of 
as a "stream" of tuples that are processed
@@ -436,8 +444,91 @@ public class Stream implements IAggregatableStream {
         return chainedAgg()
                .partitionAggregate(inputFields, agg, functionFields)
                .chainEnd();
-    }  
-    
+    }
+
+    /**
+     * This aggregator operation computes the minimum of tuples by the given 
{@code inputFieldName} and it is
+     * assumed that its value is an instance of {@code Comparable}.
+     *
+     * @param inputFieldName input field name
+     * @return
+     */
+    public Stream minBy(String inputFieldName) {
+        Aggregator<ComparisonAggregator.State> min = new Min(inputFieldName);
+        return comparableAggregateStream(inputFieldName, min);
+    }
+
+    /**
+     * This aggregator operation computes the minimum of tuples by the given 
{@code inputFieldName} in a stream by
+     * using the given {@code comparator}.
+     *
+     * @param inputFieldName input field name
+     * @param comparator comparator used in for finding minimum of two tuple 
values of {@code inputFieldName}.
+     * @param <T> type of tuple's given input field value.
+     * @return
+     */
+    public <T> Stream minBy(String inputFieldName, Comparator<T> comparator) {
+        Aggregator<ComparisonAggregator.State> min = new 
MinWithComparator<>(inputFieldName, comparator);
+        return comparableAggregateStream(inputFieldName, min);
+    }
+
+    /**
+     * This aggregator operation computes the minimum of tuples in a stream by 
using the given {@code comparator} with
+     * {@code TridentTuple}s.
+     *
+     * @param comparator comparator used in for finding minimum of two tuple 
values.
+     * @return
+     */
+    public Stream min(Comparator<TridentTuple> comparator) {
+        Aggregator<ComparisonAggregator.State> min = new 
MinWithComparator<>(comparator);
+        return comparableAggregateStream(null, min);
+    }
+
+    /**
+     * This aggregator operation computes the maximum of tuples by the given 
{@code inputFieldName} and it is
+     * assumed that its value is an instance of {@code Comparable}.
+     *
+     * @param inputFieldName input field name
+     * @return
+     */
+    public Stream maxBy(String inputFieldName) {
+        Aggregator<ComparisonAggregator.State> max = new Max(inputFieldName);
+        return comparableAggregateStream(inputFieldName, max);
+    }
+
+    /**
+     * This aggregator operation computes the maximum of tuples by the given 
{@code inputFieldName} in a stream by
+     * using the given {@code comparator}.
+     *
+     * @param inputFieldName input field name
+     * @param comparator comparator used in for finding maximum of two tuple 
values of {@code inputFieldName}.
+     * @param <T> type of tuple's given input field value.
+     * @return
+     */
+    public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator) {
+        Aggregator<ComparisonAggregator.State> max = new 
MaxWithComparator<>(inputFieldName, comparator);
+        return comparableAggregateStream(inputFieldName, max);
+    }
+
+    /**
+     * This aggregator operation computes the maximum of tuples in a stream by 
using the given {@code comparator} with
+     * {@code TridentTuple}s.
+     *
+     * @param comparator comparator used in for finding maximum of two tuple 
values.
+     * @return
+     */
+    public Stream max(Comparator<TridentTuple> comparator) {
+        Aggregator<ComparisonAggregator.State> max = new 
MaxWithComparator<>(comparator);
+        return comparableAggregateStream(null, max);
+    }
+
+    private <T> Stream comparableAggregateStream(String inputFieldName, 
Aggregator<T> aggregator) {
+        if(inputFieldName != null) {
+            projectionValidation(new Fields(inputFieldName));
+        }
+        return partitionAggregate(getOutputFields(), aggregator, 
getOutputFields());
+    }
+
     public Stream aggregate(Aggregator agg, Fields functionFields) {
         return aggregate(null, agg, functionFields);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java
 
b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java
new file mode 100644
index 0000000..0109bb5
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.operation.builtin;
+
+import org.apache.storm.trident.operation.BaseAggregator;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ * Abstract {@code Aggregator} for comparing two values in a stream.
+ *
+ */
+public abstract class ComparisonAggregator<T> extends 
BaseAggregator<ComparisonAggregator.State> {
+
+    public static class State {
+        TridentTuple previousTuple;
+    }
+
+    private final String inputFieldName;
+
+    public ComparisonAggregator(String inputFieldName) {
+        this.inputFieldName = inputFieldName;
+    }
+
+    protected abstract T compare(T value1, T value2);
+
+    @Override
+    public State init(Object batchId, TridentCollector collector) {
+        return new State();
+    }
+
+    @Override
+    public void aggregate(State state, TridentTuple tuple, TridentCollector 
collector) {
+        T value1 = valueFromTuple(state.previousTuple);
+        T value2 = valueFromTuple(tuple);
+
+        if(value2 == null) {
+            return;
+        }
+
+        if(value1 == null || compare(value1, value2) == value2) {
+            state.previousTuple = tuple;
+        }
+
+    }
+
+    protected T valueFromTuple(TridentTuple tuple) {
+        // when there is no input field then the whole tuple is considered for 
comparison.
+        return (T) (inputFieldName != null && tuple != null ? 
tuple.getValueByField(inputFieldName) : tuple);
+    }
+
+    @Override
+    public void complete(State state, TridentCollector collector) {
+        collector.emit(state.previousTuple.getValues());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java 
b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java
new file mode 100644
index 0000000..5385dfb
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.operation.builtin;
+
+/**
+ * This aggregator computes the maximum of aggregated tuples in a stream. It 
assumes that the tuple has one value and
+ * it is an instance of {@code Comparable}.
+ *
+ */
+public class Max extends ComparisonAggregator<Comparable<Object>> {
+
+    public Max(String inputFieldName) {
+        super(inputFieldName);
+    }
+
+    @Override
+    protected Comparable<Object> compare(Comparable<Object> value1, 
Comparable<Object> value2) {
+        return value1.compareTo(value2) > 0 ? value1 : value2;
+    }
+
+    /**
+     * Returns an aggregator computes the maximum of aggregated tuples in a 
stream. It assumes that the tuple has one value and
+     * it is an instance of {@code Comparable}.
+     *
+     * @return
+     */
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java
 
b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java
new file mode 100644
index 0000000..172aa58
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.operation.builtin;
+
+import java.util.Comparator;
+
+/**
+ * This aggregator computes the maximum of aggregated tuples in a stream. It 
uses given {@code comparator} for comparing
+ * two values in a stream.
+ *
+ */
+public class MaxWithComparator<T> extends ComparisonAggregator<T> {
+    private final Comparator<T> comparator;
+    
+    public MaxWithComparator(Comparator<T> comparator) {
+        this(null, comparator);
+    }
+
+    public MaxWithComparator(String inputFieldName, Comparator<T> comparator) {
+        super(inputFieldName);
+        this.comparator = comparator;
+    }
+
+    @Override
+    protected T compare(T value1, T value2) {
+        return comparator.compare(value1, value2) > 0 ? value1 : value2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java 
b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java
new file mode 100644
index 0000000..0757d7c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.operation.builtin;
+
+/**
+ * This aggregator computes the minimum of aggregated tuples in a stream. It 
assumes that the tuple has one value and
+ * it is an instance of {@code Comparable}.
+ *
+ */
+public class Min extends ComparisonAggregator<Comparable<Object>> {
+
+    public Min(String inputFieldName) {
+        super(inputFieldName);
+    }
+
+    @Override
+    protected Comparable<Object> compare(Comparable<Object> value1, 
Comparable<Object> value2) {
+        return value1.compareTo(value2) < 0 ? value1 : value2;
+    }
+
+    /**
+     * Returns an aggregator computes the maximum of aggregated tuples in a 
stream. It assumes that the tuple has one value and
+     * it is an instance of {@code Comparable}.
+     *
+     * @return
+     * @param inputFieldName
+     */
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java
 
b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java
new file mode 100644
index 0000000..d33e000
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.operation.builtin;
+
+import java.util.Comparator;
+
+/**
+ * This aggregator computes the minimum of aggregated tuples in a stream. It 
uses given @{code comparator} for comparing
+ * two values in a stream.
+ *
+ */
+public class MinWithComparator<T> extends ComparisonAggregator<T> {
+    private final Comparator<T> comparator;
+
+    public MinWithComparator(String inputFieldName, Comparator<T> comparator) {
+        super(inputFieldName);
+        this.comparator = comparator;
+    }
+
+    public MinWithComparator(Comparator<T> comparator) {
+        this(null, comparator);
+    }
+
+    @Override
+    protected T compare(T value1, T value2) {
+        return comparator.compare(value1, value2) < 0 ? value1 : value2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java 
b/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java
new file mode 100644
index 0000000..a4a9a79
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.testing;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
+import org.apache.storm.tuple.Fields;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ *
+ */
+public class NumberGeneratorSpout implements IBatchSpout {
+    private final Fields fields;
+    private final int batchSize;
+    private final int maxNumber;
+    private final Map<Long, List<List<Object>>> batches = new HashMap<>();
+
+    public NumberGeneratorSpout(Fields fields, int batchSize, int maxNumber) {
+        this.fields = fields;
+        this.batchSize = batchSize;
+        this.maxNumber = maxNumber;
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context) {
+    }
+
+    @Override
+    public void emitBatch(long batchId, TridentCollector collector) {
+        List<List<Object>> values = null;
+        if(batches.containsKey(batchId)) {
+            values = batches.get(batchId);
+        } else {
+            values = new ArrayList<>();
+            for (int i = 0; i < batchSize; i++) {
+                values.add(Collections.singletonList((Object) 
ThreadLocalRandom.current().nextInt(0, maxNumber + 1)));
+            }
+            batches.put(batchId, values);
+        }
+        for (List<Object> value : values) {
+            collector.emit(value);
+        }
+    }
+
+    @Override
+    public void ack(long batchId) {
+        batches.remove(batchId);
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Config conf = new Config();
+        conf.setMaxTaskParallelism(1);
+        return conf;
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return fields;
+    }
+}

Reply via email to