Addressed review comments

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bc263cba
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bc263cba
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bc263cba

Branch: refs/heads/master
Commit: bc263cba67283b0c1ebe95be49e137fad86b3978
Parents: 5295a90
Author: Satish Duggana <[email protected]>
Authored: Mon Feb 8 16:49:46 2016 +0530
Committer: Satish Duggana <[email protected]>
Committed: Tue Feb 9 10:31:18 2016 +0530

----------------------------------------------------------------------
 .../spout/RandomNumberGeneratorSpout.java       |  95 +++++++++
 .../trident/TridentMinMaxOfDevicesTopology.java | 201 ++++++++++++++++++
 .../TridentMinMaxOfVehiclesTopology.java        | 180 ++++++++++++++++
 .../TridentMinMaxOperationsTopology.java        | 208 -------------------
 .../jvm/org/apache/storm/trident/Stream.java    |  24 ++-
 .../operation/builtin/ComparisonAggregator.java |  23 +-
 .../storm/trident/operation/builtin/Max.java    |   6 -
 .../operation/builtin/MaxWithComparator.java    |   7 +
 .../storm/trident/operation/builtin/Min.java    |   8 -
 .../operation/builtin/MinWithComparator.java    |   7 +
 .../trident/testing/NumberGeneratorSpout.java   |  92 --------
 11 files changed, 525 insertions(+), 326 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomNumberGeneratorSpout.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomNumberGeneratorSpout.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomNumberGeneratorSpout.java
new file mode 100644
index 0000000..1d1b082
--- /dev/null
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomNumberGeneratorSpout.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.starter.spout;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
+import org.apache.storm.tuple.Fields;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * This spout generates random whole numbers between 0 and the given {@code maxNumber} 
(inclusive), one number for each of the given {@code fields}.
+ *
+ */
+public class RandomNumberGeneratorSpout implements IBatchSpout {
+    private final Fields fields;
+    private final int batchSize;
+    private final int maxNumber;
+    private final Map<Long, List<List<Object>>> batches = new HashMap<>();
+
+    public RandomNumberGeneratorSpout(Fields fields, int batchSize, int 
maxNumber) {
+        this.fields = fields;
+        this.batchSize = batchSize;
+        this.maxNumber = maxNumber;
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context) {
+    }
+
+    @Override
+    public void emitBatch(long batchId, TridentCollector collector) {
+        List<List<Object>> values = null;
+        if(batches.containsKey(batchId)) {
+            values = batches.get(batchId);
+        } else {
+            values = new ArrayList<>();
+            for (int i = 0; i < batchSize; i++) {
+                List<Object> numbers = new ArrayList<>();
+                for (int x=0; x<fields.size(); x++) {
+                    numbers.add(ThreadLocalRandom.current().nextInt(0, 
maxNumber + 1));
+                }
+                values.add(numbers);
+            }
+            batches.put(batchId, values);
+        }
+        for (List<Object> value : values) {
+            collector.emit(value);
+        }
+    }
+
+    @Override
+    public void ack(long batchId) {
+        batches.remove(batchId);
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Config conf = new Config();
+        conf.setMaxTaskParallelism(1);
+        return conf;
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return fields;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
new file mode 100644
index 0000000..d985436
--- /dev/null
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.starter.spout.RandomNumberGeneratorSpout;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * This class demonstrates different usages of
+ * * {@link Stream#minBy(String)}
+ * * {@link Stream#maxBy(String)}
+ * operations on trident {@link Stream}.
+ */
+public class TridentMinMaxOfDevicesTopology {
+
+    /**
+     * Creates a topology with device-id and count (which are whole numbers) 
as tuple fields in a stream and it finally
+     * generates result stream based on min and max with device-id and count 
values.
+     */
+    public static StormTopology buildDevicesTopology() {
+        String deviceID = "device-id";
+        String count = "count";
+        Fields allFields = new Fields(deviceID, count);
+
+        RandomNumberGeneratorSpout spout = new 
RandomNumberGeneratorSpout(allFields, 10, 1000);
+
+        TridentTopology topology = new TridentTopology();
+        Stream devicesStream = topology.newStream("devicegen-spout", spout).
+                each(allFields, new Debug("##### devices"));
+
+        devicesStream.minBy(deviceID).
+                each(allFields, new Debug("#### device with min id"));
+
+        devicesStream.maxBy(count).
+                each(allFields, new Debug("#### device with max count"));
+
+        return topology.build();
+    }
+
+    /**
+     * Creates a topology which demonstrates min/max operations on tuples of 
stream which contain vehicle and driver fields
+     * with values {@link TridentMinMaxOfDevicesTopology.Vehicle} and {@link 
TridentMinMaxOfDevicesTopology.Driver} respectively.
+     */
+    public static StormTopology buildVehiclesTopology() {
+        Fields driverField = new Fields(Driver.FIELD_NAME);
+        Fields vehicleField = new Fields(Vehicle.FIELD_NAME);
+        Fields allFields = new Fields(Vehicle.FIELD_NAME, Driver.FIELD_NAME);
+
+        FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, 
Vehicle.generateVehicles(20));
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+        Stream vehiclesStream = topology.newStream("spout1", spout).
+                each(allFields, new Debug("##### vehicles"));
+
+        Stream slowVehiclesStream =
+                vehiclesStream
+                        .min(new SpeedComparator())
+                        .each(vehicleField, new Debug("#### slowest vehicle"));
+
+        Stream slowDriversStream =
+                slowVehiclesStream
+                        .project(driverField)
+                        .each(driverField, new Debug("##### slowest driver"));
+
+        vehiclesStream
+                .max(new SpeedComparator())
+                .each(vehicleField, new Debug("#### fastest vehicle"))
+                .project(driverField)
+                .each(driverField, new Debug("##### fastest driver"));
+
+        vehiclesStream
+                .max(new EfficiencyComparator()).
+                each(vehicleField, new Debug("#### efficient vehicle"));
+
+        return topology.build();
+    }
+
+    public static void main(String[] args) throws Exception {
+
+        StormTopology topology = buildDevicesTopology();
+        Config conf = new Config();
+        conf.setMaxSpoutPending(20);
+        if (args.length == 0) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("devices-topology", conf, topology);
+            Utils.sleep(60 * 1000);
+            cluster.shutdown();
+            System.exit(0);
+        } else {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopologyWithProgressBar("devices-topology", 
conf, topology);
+        }
+    }
+
+    static class SpeedComparator implements Comparator<TridentTuple>, 
Serializable {
+
+        @Override
+        public int compare(TridentTuple tuple1, TridentTuple tuple2) {
+            Vehicle vehicle1 = (Vehicle) 
tuple1.getValueByField(Vehicle.FIELD_NAME);
+            Vehicle vehicle2 = (Vehicle) 
tuple2.getValueByField(Vehicle.FIELD_NAME);
+            return Integer.compare(vehicle1.maxSpeed, vehicle2.maxSpeed);
+        }
+    }
+
+    static class EfficiencyComparator implements Comparator<TridentTuple>, 
Serializable {
+
+        @Override
+        public int compare(TridentTuple tuple1, TridentTuple tuple2) {
+            Vehicle vehicle1 = (Vehicle) 
tuple1.getValueByField(Vehicle.FIELD_NAME);
+            Vehicle vehicle2 = (Vehicle) 
tuple2.getValueByField(Vehicle.FIELD_NAME);
+            return Double.compare(vehicle1.efficiency, vehicle2.efficiency);
+        }
+
+    }
+
+    static class Driver implements Serializable {
+        static final String FIELD_NAME = "driver";
+        final String name;
+        final int id;
+
+        Driver(String name, int id) {
+            this.name = name;
+            this.id = id;
+        }
+
+        @Override
+        public String toString() {
+            return "Driver{" +
+                    "name='" + name + '\'' +
+                    ", id=" + id +
+                    '}';
+        }
+    }
+
+    static class Vehicle implements Serializable {
+        static final String FIELD_NAME = "vehicle";
+        final String name;
+        final int maxSpeed;
+        final double efficiency;
+
+        public Vehicle(String name, int maxSpeed, double efficiency) {
+            this.name = name;
+            this.maxSpeed = maxSpeed;
+            this.efficiency = efficiency;
+        }
+
+        @Override
+        public String toString() {
+            return "Vehicle{" +
+                    "name='" + name + '\'' +
+                    ", maxSpeed=" + maxSpeed +
+                    ", efficiency=" + efficiency +
+                    '}';
+        }
+
+        public static List<Object>[] generateVehicles(int count) {
+            List<Object>[] vehicles = new List[count];
+            for (int i = 0; i < count; i++) {
+                int id = i - 1;
+                vehicles[i] =
+                        (new Values(
+                                new Vehicle("Vehicle-" + id, 
ThreadLocalRandom.current().nextInt(0, 100), 
ThreadLocalRandom.current().nextDouble(1, 5)),
+                                new Driver("Driver-" + id, id)
+                        ));
+            }
+            return vehicles;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
new file mode 100644
index 0000000..192b198
--- /dev/null
+++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * This class demonstrates different usages of
+ * * {@link Stream#minBy(String, Comparator)}
+ * * {@link Stream#min(Comparator)}
+ * * {@link Stream#maxBy(String, Comparator)}
+ * * {@link Stream#max(Comparator)}
+ * operations on trident {@link Stream}.
+ */
+public class TridentMinMaxOfVehiclesTopology {
+
+    /**
+     * Creates a topology which demonstrates min/max operations on tuples of 
stream which contain vehicle and driver fields
+     * with values {@link TridentMinMaxOfVehiclesTopology.Vehicle} and {@link 
TridentMinMaxOfVehiclesTopology.Driver} respectively.
+     */
+    public static StormTopology buildVehiclesTopology() {
+        Fields driverField = new Fields(Driver.FIELD_NAME);
+        Fields vehicleField = new Fields(Vehicle.FIELD_NAME);
+        Fields allFields = new Fields(Vehicle.FIELD_NAME, Driver.FIELD_NAME);
+
+        FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, 
Vehicle.generateVehicles(20));
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+        Stream vehiclesStream = topology.newStream("spout1", spout).
+                each(allFields, new Debug("##### vehicles"));
+
+        Stream slowVehiclesStream =
+                vehiclesStream
+                        .min(new SpeedComparator())
+                        .each(vehicleField, new Debug("#### slowest vehicle"));
+
+        Stream slowDriversStream =
+                slowVehiclesStream
+                        .project(driverField)
+                        .each(driverField, new Debug("##### slowest driver"));
+
+        vehiclesStream
+                .max(new SpeedComparator())
+                .each(vehicleField, new Debug("#### fastest vehicle"))
+                .project(driverField)
+                .each(driverField, new Debug("##### fastest driver"));
+
+        vehiclesStream
+                .minBy(Vehicle.FIELD_NAME, new EfficiencyComparator()).
+                each(vehicleField, new Debug("#### least efficient vehicle"));
+
+        vehiclesStream
+                .maxBy(Vehicle.FIELD_NAME, new EfficiencyComparator()).
+                each(vehicleField, new Debug("#### most efficient vehicle"));
+
+        return topology.build();
+    }
+
+    public static void main(String[] args) throws Exception {
+
+        StormTopology topology = buildVehiclesTopology();
+        Config conf = new Config();
+        conf.setMaxSpoutPending(20);
+        if (args.length == 0) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("vehicles-topology", conf, topology);
+            Utils.sleep(60 * 1000);
+            cluster.shutdown();
+            System.exit(0);
+        } else {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopologyWithProgressBar("vehicles-topology", 
conf, topology);
+        }
+    }
+
+    static class SpeedComparator implements Comparator<TridentTuple>, 
Serializable {
+
+        @Override
+        public int compare(TridentTuple tuple1, TridentTuple tuple2) {
+            Vehicle vehicle1 = (Vehicle) 
tuple1.getValueByField(Vehicle.FIELD_NAME);
+            Vehicle vehicle2 = (Vehicle) 
tuple2.getValueByField(Vehicle.FIELD_NAME);
+            return Integer.compare(vehicle1.maxSpeed, vehicle2.maxSpeed);
+        }
+    }
+
+    static class EfficiencyComparator implements Comparator<Vehicle>, 
Serializable {
+
+        @Override
+        public int compare(Vehicle vehicle1, Vehicle vehicle2) {
+            return Double.compare(vehicle1.efficiency, vehicle2.efficiency);
+        }
+
+    }
+
+    static class Driver implements Serializable {
+        static final String FIELD_NAME = "driver";
+        final String name;
+        final int id;
+
+        Driver(String name, int id) {
+            this.name = name;
+            this.id = id;
+        }
+
+        @Override
+        public String toString() {
+            return "Driver{" +
+                    "name='" + name + '\'' +
+                    ", id=" + id +
+                    '}';
+        }
+    }
+
+    static class Vehicle implements Serializable {
+        static final String FIELD_NAME = "vehicle";
+        final String name;
+        final int maxSpeed;
+        final double efficiency;
+
+        public Vehicle(String name, int maxSpeed, double efficiency) {
+            this.name = name;
+            this.maxSpeed = maxSpeed;
+            this.efficiency = efficiency;
+        }
+
+        @Override
+        public String toString() {
+            return "Vehicle{" +
+                    "name='" + name + '\'' +
+                    ", maxSpeed=" + maxSpeed +
+                    ", efficiency=" + efficiency +
+                    '}';
+        }
+
+        public static List<Object>[] generateVehicles(int count) {
+            List<Object>[] vehicles = new List[count];
+            for (int i = 0; i < count; i++) {
+                int id = i - 1;
+                vehicles[i] =
+                        (new Values(
+                                new Vehicle("Vehicle-" + id, 
ThreadLocalRandom.current().nextInt(0, 100), 
ThreadLocalRandom.current().nextDouble(1, 5)),
+                                new Driver("Driver-" + id, id)
+                        ));
+            }
+            return vehicles;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java
 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java
deleted file mode 100644
index dedaaff..0000000
--- 
a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.starter.trident;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.BaseFunction;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.operation.builtin.Debug;
-import org.apache.storm.trident.testing.FixedBatchSpout;
-import org.apache.storm.trident.testing.NumberGeneratorSpout;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-
-import java.io.Serializable;
-import java.util.Comparator;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-
-/**
- * This class contains different usages of minBy, maxBy, min and max 
operations on trident streams.
- *
- */
-public class TridentMinMaxOperationsTopology {
-    public static class Split extends BaseFunction {
-        @Override
-        public void execute(TridentTuple tuple, TridentCollector collector) {
-            String sentence = tuple.getString(0);
-            for (String word : sentence.split(" ")) {
-                collector.emit(new Values(word));
-            }
-        }
-    }
-
-    public static StormTopology buildIdsTopology() {
-        NumberGeneratorSpout spout = new NumberGeneratorSpout(new 
Fields("id"), 10, 1000);
-
-        TridentTopology topology = new TridentTopology();
-        Stream wordsStream = topology.newStream("numgen-spout", spout).
-                each(new Fields("id"), new Debug("##### ids"));
-
-        wordsStream.minBy("id").
-                each(new Fields("id"), new Debug("#### min-id"));
-
-        wordsStream.maxBy("id").
-                each(new Fields("id"), new Debug("#### max-id"));
-
-        return topology.build();
-    }
-
-    public static StormTopology buildWordsTopology() {
-        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, 
new Values("the cow jumped over the moon"),
-                new Values("the man went to the store and bought some candy"), 
new Values("four score and seven years ago"),
-                new Values("how many apples can you eat"), new Values("to be 
or not to be the person"));
-        spout.setCycle(true);
-
-        TridentTopology topology = new TridentTopology();
-        Stream wordsStream = topology.newStream("spout1", 
spout).parallelismHint(16).
-                each(new Fields("sentence"), new Split(), new Fields("word")).
-                each(new Fields("word"), new Debug("##### words"));
-
-        wordsStream.minBy("word").
-                each(new Fields("word"), new Debug("#### lowest word"));
-
-        wordsStream.maxBy("word").
-                each(new Fields("word"), new Debug("#### highest word"));
-
-        return topology.build();
-    }
-
-    public static StormTopology buildVehiclesTopology() {
-
-        FixedBatchSpout spout = new FixedBatchSpout(new Fields("vehicle", 
"driver"), 10, Vehicle.generateVehicles(20));
-        spout.setCycle(true);
-
-        TridentTopology topology = new TridentTopology();
-        Stream vehiclesStream = topology.newStream("spout1", spout).
-                each(new Fields("vehicle"), new Debug("##### vehicles"));
-
-        vehiclesStream.min(new SpeedComparator())
-                .each(new Fields("vehicle"), new Debug("#### slowest vehicle"))
-                .project(new Fields("driver")). each(new Fields("driver"), new 
Debug("##### slowest driver"));
-
-        vehiclesStream.max(new SpeedComparator())
-                .each(new Fields("vehicle"), new Debug("#### fastest vehicle"))
-                .project(new Fields("driver")). each(new Fields("driver"), new 
Debug("##### fastest driver"));
-
-        vehiclesStream.max(new EfficiencyComparator()).
-                each(new Fields("vehicle"), new Debug("#### efficient 
vehicle"));
-
-        return topology.build();
-    }
-
-    public static void main(String[] args) throws Exception {
-        Config conf = new Config();
-        conf.setMaxSpoutPending(20);
-        StormTopology[] topologies = {buildWordsTopology(), 
buildIdsTopology(), buildVehiclesTopology()};
-        if (args.length == 0) {
-            for (StormTopology topology : topologies) {
-                LocalCluster cluster = new LocalCluster();
-                cluster.submitTopology("min-max-topology", conf, topology);
-                Utils.sleep(60*1000);
-                cluster.shutdown();
-            }
-            System.exit(0);
-        } else {
-            conf.setNumWorkers(3);
-            int ct=1;
-            for (StormTopology topology : topologies) {
-                StormSubmitter.submitTopologyWithProgressBar(args[0]+"-"+ct++, 
conf, topology);
-            }
-        }
-    }
-
-    static class SpeedComparator implements Comparator<TridentTuple>, 
Serializable {
-
-        @Override
-        public int compare(TridentTuple tuple1, TridentTuple tuple2) {
-            Vehicle vehicle1 = (Vehicle) tuple1.getValueByField("vehicle");
-            Vehicle vehicle2 = (Vehicle) tuple2.getValueByField("vehicle");
-            return Integer.compare(vehicle1.maxSpeed, vehicle2.maxSpeed);
-        }
-    }
-
-    static class EfficiencyComparator implements Comparator<TridentTuple>, 
Serializable {
-
-        @Override
-        public int compare(TridentTuple tuple1, TridentTuple tuple2) {
-            Vehicle vehicle1 = (Vehicle) tuple1.getValueByField("vehicle");
-            Vehicle vehicle2 = (Vehicle) tuple2.getValueByField("vehicle");
-            return Double.compare(vehicle1.efficiency, vehicle2.efficiency);
-        }
-
-    }
-
-    static class Driver implements Serializable {
-        final String name;
-        final int id;
-
-        Driver(String name, int id) {
-            this.name = name;
-            this.id = id;
-        }
-
-        @Override
-        public String toString() {
-            return "Driver{" +
-                    "name='" + name + '\'' +
-                    ", id=" + id +
-                    '}';
-        }
-    }
-
-    static class Vehicle implements Serializable {
-        final String name;
-        final int maxSpeed;
-        final double efficiency;
-
-        public Vehicle(String name, int maxSpeed, double efficiency) {
-            this.name = name;
-            this.maxSpeed = maxSpeed;
-            this.efficiency = efficiency;
-        }
-
-        @Override
-        public String toString() {
-            return "Vehicle{" +
-                    "name='" + name + '\'' +
-                    ", maxSpeed=" + maxSpeed +
-                    ", efficiency=" + efficiency +
-                    '}';
-        }
-
-        public static List<Object>[] generateVehicles(int count) {
-            List<Object>[] vehicles = new List[count];
-            for(int i=0; i<count; i++) {
-                int id = i-1;
-                vehicles[i] =
-                        (new Values(
-                                new Vehicle("Vehicle-"+id, 
ThreadLocalRandom.current().nextInt(0, 100), 
ThreadLocalRandom.current().nextDouble(1, 5)),
-                                new Driver("Driver-"+id, id)
-                        ));
-            }
-            return vehicles;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java 
b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index fa62b72..d313678 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -448,10 +448,11 @@ public class Stream implements IAggregatableStream {
 
     /**
      * This aggregator operation computes the minimum of tuples by the given 
{@code inputFieldName} and it is
-     * assumed that its value is an instance of {@code Comparable}.
+     * assumed that its value is an instance of {@code Comparable}. If the 
value of tuple with field {@code inputFieldName} is not an
+     * instance of {@code Comparable} then it throws {@code ClassCastException}.
      *
      * @param inputFieldName input field name
-     * @return
+     * @return the new stream with this operation.
      */
     public Stream minBy(String inputFieldName) {
         Aggregator<ComparisonAggregator.State> min = new Min(inputFieldName);
@@ -460,12 +461,13 @@ public class Stream implements IAggregatableStream {
 
     /**
      * This aggregator operation computes the minimum of tuples by the given 
{@code inputFieldName} in a stream by
-     * using the given {@code comparator}.
+     * using the given {@code comparator}. If the value of tuple with field 
{@code inputFieldName} is not an
+     * instance of {@code T} then it throws {@code ClassCastException}.
      *
      * @param inputFieldName input field name
      * @param comparator comparator used in for finding minimum of two tuple 
values of {@code inputFieldName}.
      * @param <T> type of tuple's given input field value.
-     * @return
+     * @return the new stream with this operation.
      */
     public <T> Stream minBy(String inputFieldName, Comparator<T> comparator) {
         Aggregator<ComparisonAggregator.State> min = new 
MinWithComparator<>(inputFieldName, comparator);
@@ -477,7 +479,7 @@ public class Stream implements IAggregatableStream {
      * {@code TridentTuple}s.
      *
      * @param comparator comparator used in for finding minimum of two tuple values.
-     * @return
+     * @return the new stream with this operation.
      */
     public Stream min(Comparator<TridentTuple> comparator) {
         Aggregator<ComparisonAggregator.State> min = new MinWithComparator<>(comparator);
@@ -486,10 +488,11 @@ public class Stream implements IAggregatableStream {
 
     /**
      * This aggregator operation computes the maximum of tuples by the given {@code inputFieldName} and it is
-     * assumed that its value is an instance of {@code Comparable}.
+     * assumed that its value is an instance of {@code Comparable}. If the value of tuple with field {@code inputFieldName} is not an
+     * instance of {@code Comparable} then it throws {@code ClassCastException}
      *
      * @param inputFieldName input field name
-     * @return
+     * @return the new stream with this operation.
      */
     public Stream maxBy(String inputFieldName) {
         Aggregator<ComparisonAggregator.State> max = new Max(inputFieldName);
@@ -498,12 +501,13 @@ public class Stream implements IAggregatableStream {
 
     /**
      * This aggregator operation computes the maximum of tuples by the given {@code inputFieldName} in a stream by
-     * using the given {@code comparator}.
+     * using the given {@code comparator}. If the value of tuple with field {@code inputFieldName} is not an
+     * instance of {@code T} then it throws {@code ClassCastException}
      *
      * @param inputFieldName input field name
      * @param comparator comparator used in for finding maximum of two tuple values of {@code inputFieldName}.
      * @param <T> type of tuple's given input field value.
-     * @return
+     * @return the new stream with this operation.
      */
     public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator) {
         Aggregator<ComparisonAggregator.State> max = new MaxWithComparator<>(inputFieldName, comparator);
@@ -515,7 +519,7 @@ public class Stream implements IAggregatableStream {
      * {@code TridentTuple}s.
      *
      * @param comparator comparator used in for finding maximum of two tuple values.
-     * @return
+     * @return the new stream with this operation.
      */
     public Stream max(Comparator<TridentTuple> comparator) {
         Aggregator<ComparisonAggregator.State> max = new MaxWithComparator<>(comparator);

http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java
index 0109bb5..82b657a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java
@@ -21,12 +21,16 @@ package org.apache.storm.trident.operation.builtin;
 import org.apache.storm.trident.operation.BaseAggregator;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Abstract {@code Aggregator} for comparing two values in a stream.
  *
  */
 public abstract class ComparisonAggregator<T> extends BaseAggregator<ComparisonAggregator.State> {
+    private static final Logger log = LoggerFactory.getLogger(ComparisonAggregator.class);
+    private Object batchId;
 
     public static class State {
         TridentTuple previousTuple;
@@ -42,6 +46,8 @@ public abstract class ComparisonAggregator<T> extends BaseAggregator<ComparisonA
 
     @Override
     public State init(Object batchId, TridentCollector collector) {
+        this.batchId = batchId;
+        log.debug("Started comparison aggregation for batch: [{}] in operation [{}]", batchId, this);
         return new State();
     }
 
@@ -50,6 +56,8 @@ public abstract class ComparisonAggregator<T> extends BaseAggregator<ComparisonA
         T value1 = valueFromTuple(state.previousTuple);
         T value2 = valueFromTuple(tuple);
 
+        log.debug("Aggregated tuple value in state [{}], and received tuple value [{}] in operation [{}]", value1, value2, this);
+
         if(value2 == null) {
             return;
         }
@@ -62,11 +70,22 @@ public abstract class ComparisonAggregator<T> extends BaseAggregator<ComparisonA
 
     protected T valueFromTuple(TridentTuple tuple) {
         // when there is no input field then the whole tuple is considered for comparison.
-        return (T) (inputFieldName != null && tuple != null ? tuple.getValueByField(inputFieldName) : tuple);
+        Object value = null;
+        if (inputFieldName != null && tuple != null) {
+            value =  tuple.getValueByField(inputFieldName);
+        } else {
+            value = tuple;
+        }
+
+        log.debug("value from tuple is [{}] with input field [{}] and tuple [{}]", value, inputFieldName, tuple);
+
+        return (T) value;
     }
 
     @Override
     public void complete(State state, TridentCollector collector) {
-        collector.emit(state.previousTuple.getValues());
+        log.debug("Completed comparison aggregation for batch [{}] with resultant tuple: [{}] in operation [{}]", batchId, state.previousTuple, this);
+
+        collector.emit(state.previousTuple != null ? state.previousTuple.getValues() : null);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java
index 5385dfb..f1221b0 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java
@@ -34,10 +34,4 @@ public class Max extends ComparisonAggregator<Comparable<Object>> {
         return value1.compareTo(value2) > 0 ? value1 : value2;
     }
 
-    /**
-     * Returns an aggregator computes the maximum of aggregated tuples in a stream. It assumes that the tuple has one value and
-     * it is an instance of {@code Comparable}.
-     *
-     * @return
-     */
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java
index 172aa58..0e8ae90 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java
@@ -41,4 +41,11 @@ public class MaxWithComparator<T> extends ComparisonAggregator<T> {
     protected T compare(T value1, T value2) {
         return comparator.compare(value1, value2) > 0 ? value1 : value2;
     }
+
+    @Override
+    public String toString() {
+        return "MaxWithComparator{" +
+                "comparator=" + comparator +
+                '}';
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java
index 0757d7c..010a919 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java
@@ -33,12 +33,4 @@ public class Min extends ComparisonAggregator<Comparable<Object>> {
     protected Comparable<Object> compare(Comparable<Object> value1, Comparable<Object> value2) {
         return value1.compareTo(value2) < 0 ? value1 : value2;
     }
-
-    /**
-     * Returns an aggregator computes the maximum of aggregated tuples in a stream. It assumes that the tuple has one value and
-     * it is an instance of {@code Comparable}.
-     *
-     * @return
-     * @param inputFieldName
-     */
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java
index d33e000..64144cb 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java
@@ -41,4 +41,11 @@ public class MinWithComparator<T> extends ComparisonAggregator<T> {
     protected T compare(T value1, T value2) {
         return comparator.compare(value1, value2) < 0 ? value1 : value2;
     }
+
+    @Override
+    public String toString() {
+        return "MinWithComparator{" +
+                "comparator=" + comparator +
+                '}';
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java b/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java
deleted file mode 100644
index a4a9a79..0000000
--- a/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.trident.testing;
-
-import org.apache.storm.Config;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IBatchSpout;
-import org.apache.storm.tuple.Fields;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-
-/**
- *
- */
-public class NumberGeneratorSpout implements IBatchSpout {
-    private final Fields fields;
-    private final int batchSize;
-    private final int maxNumber;
-    private final Map<Long, List<List<Object>>> batches = new HashMap<>();
-
-    public NumberGeneratorSpout(Fields fields, int batchSize, int maxNumber) {
-        this.fields = fields;
-        this.batchSize = batchSize;
-        this.maxNumber = maxNumber;
-    }
-
-    @Override
-    public void open(Map conf, TopologyContext context) {
-    }
-
-    @Override
-    public void emitBatch(long batchId, TridentCollector collector) {
-        List<List<Object>> values = null;
-        if(batches.containsKey(batchId)) {
-            values = batches.get(batchId);
-        } else {
-            values = new ArrayList<>();
-            for (int i = 0; i < batchSize; i++) {
-                values.add(Collections.singletonList((Object) ThreadLocalRandom.current().nextInt(0, maxNumber + 1)));
-            }
-            batches.put(batchId, values);
-        }
-        for (List<Object> value : values) {
-            collector.emit(value);
-        }
-    }
-
-    @Override
-    public void ack(long batchId) {
-        batches.remove(batchId);
-    }
-
-    @Override
-    public void close() {
-
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        Config conf = new Config();
-        conf.setMaxTaskParallelism(1);
-        return conf;
-    }
-
-    @Override
-    public Fields getOutputFields() {
-        return fields;
-    }
-}

Reply via email to