Addressed review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bc263cba Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bc263cba Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bc263cba Branch: refs/heads/master Commit: bc263cba67283b0c1ebe95be49e137fad86b3978 Parents: 5295a90 Author: Satish Duggana <[email protected]> Authored: Mon Feb 8 16:49:46 2016 +0530 Committer: Satish Duggana <[email protected]> Committed: Tue Feb 9 10:31:18 2016 +0530 ---------------------------------------------------------------------- .../spout/RandomNumberGeneratorSpout.java | 95 +++++++++ .../trident/TridentMinMaxOfDevicesTopology.java | 201 ++++++++++++++++++ .../TridentMinMaxOfVehiclesTopology.java | 180 ++++++++++++++++ .../TridentMinMaxOperationsTopology.java | 208 ------------------- .../jvm/org/apache/storm/trident/Stream.java | 24 ++- .../operation/builtin/ComparisonAggregator.java | 23 +- .../storm/trident/operation/builtin/Max.java | 6 - .../operation/builtin/MaxWithComparator.java | 7 + .../storm/trident/operation/builtin/Min.java | 8 - .../operation/builtin/MinWithComparator.java | 7 + .../trident/testing/NumberGeneratorSpout.java | 92 -------- 11 files changed, 525 insertions(+), 326 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomNumberGeneratorSpout.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomNumberGeneratorSpout.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomNumberGeneratorSpout.java new file mode 100644 index 0000000..1d1b082 --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomNumberGeneratorSpout.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.starter.spout; + +import org.apache.storm.Config; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.spout.IBatchSpout; +import org.apache.storm.tuple.Fields; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + +/** + * This spout generates random whole numbers with given {@code maxNumber} value as maximum with the given {@code fields}. 
+ * + */ +public class RandomNumberGeneratorSpout implements IBatchSpout { + private final Fields fields; + private final int batchSize; + private final int maxNumber; + private final Map<Long, List<List<Object>>> batches = new HashMap<>(); + + public RandomNumberGeneratorSpout(Fields fields, int batchSize, int maxNumber) { + this.fields = fields; + this.batchSize = batchSize; + this.maxNumber = maxNumber; + } + + @Override + public void open(Map conf, TopologyContext context) { + } + + @Override + public void emitBatch(long batchId, TridentCollector collector) { + List<List<Object>> values = null; + if(batches.containsKey(batchId)) { + values = batches.get(batchId); + } else { + values = new ArrayList<>(); + for (int i = 0; i < batchSize; i++) { + List<Object> numbers = new ArrayList<>(); + for (int x=0; x<fields.size(); x++) { + numbers.add(ThreadLocalRandom.current().nextInt(0, maxNumber + 1)); + } + values.add(numbers); + } + batches.put(batchId, values); + } + for (List<Object> value : values) { + collector.emit(value); + } + } + + @Override + public void ack(long batchId) { + batches.remove(batchId); + } + + @Override + public void close() { + + } + + @Override + public Map<String, Object> getComponentConfiguration() { + Config conf = new Config(); + conf.setMaxTaskParallelism(1); + return conf; + } + + @Override + public Fields getOutputFields() { + return fields; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java new file mode 100644 index 0000000..d985436 --- /dev/null +++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.starter.trident; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.starter.spout.RandomNumberGeneratorSpout; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.operation.builtin.Debug; +import org.apache.storm.trident.testing.FixedBatchSpout; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +/** + * This class demonstrates different usages of + * * {@link Stream#minBy(String)} + * * {@link Stream#maxBy(String)} + * operations on trident {@link Stream}. 
+ */ +public class TridentMinMaxOfDevicesTopology { + + /** + * Creates a topology with device-id and count (which are whole numbers) as tuple fields in a stream and it finally + * generates result stream based on min and max with device-id and count values. + */ + public static StormTopology buildDevicesTopology() { + String deviceID = "device-id"; + String count = "count"; + Fields allFields = new Fields(deviceID, count); + + RandomNumberGeneratorSpout spout = new RandomNumberGeneratorSpout(allFields, 10, 1000); + + TridentTopology topology = new TridentTopology(); + Stream devicesStream = topology.newStream("devicegen-spout", spout). + each(allFields, new Debug("##### devices")); + + devicesStream.minBy(deviceID). + each(allFields, new Debug("#### device with min id")); + + devicesStream.maxBy(count). + each(allFields, new Debug("#### device with max count")); + + return topology.build(); + } + + /** + * Creates a topology which demonstrates min/max operations on tuples of stream which contain vehicle and driver fields + * with values {@link TridentMinMaxOfDevicesTopology.Vehicle} and {@link TridentMinMaxOfDevicesTopology.Driver} respectively. + */ + public static StormTopology buildVehiclesTopology() { + Fields driverField = new Fields(Driver.FIELD_NAME); + Fields vehicleField = new Fields(Vehicle.FIELD_NAME); + Fields allFields = new Fields(Vehicle.FIELD_NAME, Driver.FIELD_NAME); + + FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, Vehicle.generateVehicles(20)); + spout.setCycle(true); + + TridentTopology topology = new TridentTopology(); + Stream vehiclesStream = topology.newStream("spout1", spout). 
+ each(allFields, new Debug("##### vehicles")); + + Stream slowVehiclesStream = + vehiclesStream + .min(new SpeedComparator()) + .each(vehicleField, new Debug("#### slowest vehicle")); + + Stream slowDriversStream = + slowVehiclesStream + .project(driverField) + .each(driverField, new Debug("##### slowest driver")); + + vehiclesStream + .max(new SpeedComparator()) + .each(vehicleField, new Debug("#### fastest vehicle")) + .project(driverField) + .each(driverField, new Debug("##### fastest driver")); + + vehiclesStream + .max(new EfficiencyComparator()). + each(vehicleField, new Debug("#### efficient vehicle")); + + return topology.build(); + } + + public static void main(String[] args) throws Exception { + + StormTopology topology = buildDevicesTopology(); + Config conf = new Config(); + conf.setMaxSpoutPending(20); + if (args.length == 0) { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("devices-topology", conf, topology); + Utils.sleep(60 * 1000); + cluster.shutdown(); + System.exit(0); + } else { + conf.setNumWorkers(3); + StormSubmitter.submitTopologyWithProgressBar("devices-topology", conf, topology); + } + } + + static class SpeedComparator implements Comparator<TridentTuple>, Serializable { + + @Override + public int compare(TridentTuple tuple1, TridentTuple tuple2) { + Vehicle vehicle1 = (Vehicle) tuple1.getValueByField(Vehicle.FIELD_NAME); + Vehicle vehicle2 = (Vehicle) tuple2.getValueByField(Vehicle.FIELD_NAME); + return Integer.compare(vehicle1.maxSpeed, vehicle2.maxSpeed); + } + } + + static class EfficiencyComparator implements Comparator<TridentTuple>, Serializable { + + @Override + public int compare(TridentTuple tuple1, TridentTuple tuple2) { + Vehicle vehicle1 = (Vehicle) tuple1.getValueByField(Vehicle.FIELD_NAME); + Vehicle vehicle2 = (Vehicle) tuple2.getValueByField(Vehicle.FIELD_NAME); + return Double.compare(vehicle1.efficiency, vehicle2.efficiency); + } + + } + + static class Driver implements Serializable { + static 
final String FIELD_NAME = "driver"; + final String name; + final int id; + + Driver(String name, int id) { + this.name = name; + this.id = id; + } + + @Override + public String toString() { + return "Driver{" + + "name='" + name + '\'' + + ", id=" + id + + '}'; + } + } + + static class Vehicle implements Serializable { + static final String FIELD_NAME = "vehicle"; + final String name; + final int maxSpeed; + final double efficiency; + + public Vehicle(String name, int maxSpeed, double efficiency) { + this.name = name; + this.maxSpeed = maxSpeed; + this.efficiency = efficiency; + } + + @Override + public String toString() { + return "Vehicle{" + + "name='" + name + '\'' + + ", maxSpeed=" + maxSpeed + + ", efficiency=" + efficiency + + '}'; + } + + public static List<Object>[] generateVehicles(int count) { + List<Object>[] vehicles = new List[count]; + for (int i = 0; i < count; i++) { + int id = i - 1; + vehicles[i] = + (new Values( + new Vehicle("Vehicle-" + id, ThreadLocalRandom.current().nextInt(0, 100), ThreadLocalRandom.current().nextDouble(1, 5)), + new Driver("Driver-" + id, id) + )); + } + return vehicles; + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java new file mode 100644 index 0000000..192b198 --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.starter.trident; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.operation.builtin.Debug; +import org.apache.storm.trident.testing.FixedBatchSpout; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +/** + * This class demonstrates different usages of + * * {@link Stream#minBy(String, Comparator)} + * * {@link Stream#min(Comparator)} + * * {@link Stream#maxBy(String, Comparator)} + * * {@link Stream#max(Comparator)} + * operations on trident {@link Stream}. + */ +public class TridentMinMaxOfVehiclesTopology { + + /** + * Creates a topology which demonstrates min/max operations on tuples of stream which contain vehicle and driver fields + * with values {@link TridentMinMaxOfVehiclesTopology.Vehicle} and {@link TridentMinMaxOfVehiclesTopology.Driver} respectively. 
+ */ + public static StormTopology buildVehiclesTopology() { + Fields driverField = new Fields(Driver.FIELD_NAME); + Fields vehicleField = new Fields(Vehicle.FIELD_NAME); + Fields allFields = new Fields(Vehicle.FIELD_NAME, Driver.FIELD_NAME); + + FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, Vehicle.generateVehicles(20)); + spout.setCycle(true); + + TridentTopology topology = new TridentTopology(); + Stream vehiclesStream = topology.newStream("spout1", spout). + each(allFields, new Debug("##### vehicles")); + + Stream slowVehiclesStream = + vehiclesStream + .min(new SpeedComparator()) + .each(vehicleField, new Debug("#### slowest vehicle")); + + Stream slowDriversStream = + slowVehiclesStream + .project(driverField) + .each(driverField, new Debug("##### slowest driver")); + + vehiclesStream + .max(new SpeedComparator()) + .each(vehicleField, new Debug("#### fastest vehicle")) + .project(driverField) + .each(driverField, new Debug("##### fastest driver")); + + vehiclesStream + .minBy(Vehicle.FIELD_NAME, new EfficiencyComparator()). + each(vehicleField, new Debug("#### least efficient vehicle")); + + vehiclesStream + .maxBy(Vehicle.FIELD_NAME, new EfficiencyComparator()). 
+ each(vehicleField, new Debug("#### most efficient vehicle")); + + return topology.build(); + } + + public static void main(String[] args) throws Exception { + + StormTopology topology = buildVehiclesTopology(); + Config conf = new Config(); + conf.setMaxSpoutPending(20); + if (args.length == 0) { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("vehicles-topology", conf, topology); + Utils.sleep(60 * 1000); + cluster.shutdown(); + System.exit(0); + } else { + conf.setNumWorkers(3); + StormSubmitter.submitTopologyWithProgressBar("vehicles-topology", conf, topology); + } + } + + static class SpeedComparator implements Comparator<TridentTuple>, Serializable { + + @Override + public int compare(TridentTuple tuple1, TridentTuple tuple2) { + Vehicle vehicle1 = (Vehicle) tuple1.getValueByField(Vehicle.FIELD_NAME); + Vehicle vehicle2 = (Vehicle) tuple2.getValueByField(Vehicle.FIELD_NAME); + return Integer.compare(vehicle1.maxSpeed, vehicle2.maxSpeed); + } + } + + static class EfficiencyComparator implements Comparator<Vehicle>, Serializable { + + @Override + public int compare(Vehicle vehicle1, Vehicle vehicle2) { + return Double.compare(vehicle1.efficiency, vehicle2.efficiency); + } + + } + + static class Driver implements Serializable { + static final String FIELD_NAME = "driver"; + final String name; + final int id; + + Driver(String name, int id) { + this.name = name; + this.id = id; + } + + @Override + public String toString() { + return "Driver{" + + "name='" + name + '\'' + + ", id=" + id + + '}'; + } + } + + static class Vehicle implements Serializable { + static final String FIELD_NAME = "vehicle"; + final String name; + final int maxSpeed; + final double efficiency; + + public Vehicle(String name, int maxSpeed, double efficiency) { + this.name = name; + this.maxSpeed = maxSpeed; + this.efficiency = efficiency; + } + + @Override + public String toString() { + return "Vehicle{" + + "name='" + name + '\'' + + ", maxSpeed=" + maxSpeed + + ", 
efficiency=" + efficiency + + '}'; + } + + public static List<Object>[] generateVehicles(int count) { + List<Object>[] vehicles = new List[count]; + for (int i = 0; i < count; i++) { + int id = i - 1; + vehicles[i] = + (new Values( + new Vehicle("Vehicle-" + id, ThreadLocalRandom.current().nextInt(0, 100), ThreadLocalRandom.current().nextDouble(1, 5)), + new Driver("Driver-" + id, id) + )); + } + return vehicles; + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java deleted file mode 100644 index dedaaff..0000000 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java +++ /dev/null @@ -1,208 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.storm.starter.trident; - -import org.apache.storm.Config; -import org.apache.storm.LocalCluster; -import org.apache.storm.StormSubmitter; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.trident.Stream; -import org.apache.storm.trident.TridentTopology; -import org.apache.storm.trident.operation.BaseFunction; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.operation.builtin.Debug; -import org.apache.storm.trident.testing.FixedBatchSpout; -import org.apache.storm.trident.testing.NumberGeneratorSpout; -import org.apache.storm.trident.tuple.TridentTuple; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; -import org.apache.storm.utils.Utils; - -import java.io.Serializable; -import java.util.Comparator; -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; - -/** - * This class contains different usages of minBy, maxBy, min and max operations on trident streams. - * - */ -public class TridentMinMaxOperationsTopology { - public static class Split extends BaseFunction { - @Override - public void execute(TridentTuple tuple, TridentCollector collector) { - String sentence = tuple.getString(0); - for (String word : sentence.split(" ")) { - collector.emit(new Values(word)); - } - } - } - - public static StormTopology buildIdsTopology() { - NumberGeneratorSpout spout = new NumberGeneratorSpout(new Fields("id"), 10, 1000); - - TridentTopology topology = new TridentTopology(); - Stream wordsStream = topology.newStream("numgen-spout", spout). - each(new Fields("id"), new Debug("##### ids")); - - wordsStream.minBy("id"). - each(new Fields("id"), new Debug("#### min-id")); - - wordsStream.maxBy("id"). 
- each(new Fields("id"), new Debug("#### max-id")); - - return topology.build(); - } - - public static StormTopology buildWordsTopology() { - FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), - new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), - new Values("how many apples can you eat"), new Values("to be or not to be the person")); - spout.setCycle(true); - - TridentTopology topology = new TridentTopology(); - Stream wordsStream = topology.newStream("spout1", spout).parallelismHint(16). - each(new Fields("sentence"), new Split(), new Fields("word")). - each(new Fields("word"), new Debug("##### words")); - - wordsStream.minBy("word"). - each(new Fields("word"), new Debug("#### lowest word")); - - wordsStream.maxBy("word"). - each(new Fields("word"), new Debug("#### highest word")); - - return topology.build(); - } - - public static StormTopology buildVehiclesTopology() { - - FixedBatchSpout spout = new FixedBatchSpout(new Fields("vehicle", "driver"), 10, Vehicle.generateVehicles(20)); - spout.setCycle(true); - - TridentTopology topology = new TridentTopology(); - Stream vehiclesStream = topology.newStream("spout1", spout). - each(new Fields("vehicle"), new Debug("##### vehicles")); - - vehiclesStream.min(new SpeedComparator()) - .each(new Fields("vehicle"), new Debug("#### slowest vehicle")) - .project(new Fields("driver")). each(new Fields("driver"), new Debug("##### slowest driver")); - - vehiclesStream.max(new SpeedComparator()) - .each(new Fields("vehicle"), new Debug("#### fastest vehicle")) - .project(new Fields("driver")). each(new Fields("driver"), new Debug("##### fastest driver")); - - vehiclesStream.max(new EfficiencyComparator()). 
- each(new Fields("vehicle"), new Debug("#### efficient vehicle")); - - return topology.build(); - } - - public static void main(String[] args) throws Exception { - Config conf = new Config(); - conf.setMaxSpoutPending(20); - StormTopology[] topologies = {buildWordsTopology(), buildIdsTopology(), buildVehiclesTopology()}; - if (args.length == 0) { - for (StormTopology topology : topologies) { - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("min-max-topology", conf, topology); - Utils.sleep(60*1000); - cluster.shutdown(); - } - System.exit(0); - } else { - conf.setNumWorkers(3); - int ct=1; - for (StormTopology topology : topologies) { - StormSubmitter.submitTopologyWithProgressBar(args[0]+"-"+ct++, conf, topology); - } - } - } - - static class SpeedComparator implements Comparator<TridentTuple>, Serializable { - - @Override - public int compare(TridentTuple tuple1, TridentTuple tuple2) { - Vehicle vehicle1 = (Vehicle) tuple1.getValueByField("vehicle"); - Vehicle vehicle2 = (Vehicle) tuple2.getValueByField("vehicle"); - return Integer.compare(vehicle1.maxSpeed, vehicle2.maxSpeed); - } - } - - static class EfficiencyComparator implements Comparator<TridentTuple>, Serializable { - - @Override - public int compare(TridentTuple tuple1, TridentTuple tuple2) { - Vehicle vehicle1 = (Vehicle) tuple1.getValueByField("vehicle"); - Vehicle vehicle2 = (Vehicle) tuple2.getValueByField("vehicle"); - return Double.compare(vehicle1.efficiency, vehicle2.efficiency); - } - - } - - static class Driver implements Serializable { - final String name; - final int id; - - Driver(String name, int id) { - this.name = name; - this.id = id; - } - - @Override - public String toString() { - return "Driver{" + - "name='" + name + '\'' + - ", id=" + id + - '}'; - } - } - - static class Vehicle implements Serializable { - final String name; - final int maxSpeed; - final double efficiency; - - public Vehicle(String name, int maxSpeed, double efficiency) { - this.name = name; - 
- this.maxSpeed = maxSpeed; - this.efficiency = efficiency; - } - - @Override - public String toString() { - return "Vehicle{" + - "name='" + name + '\'' + - ", maxSpeed=" + maxSpeed + - ", efficiency=" + efficiency + - '}'; - } - - public static List<Object>[] generateVehicles(int count) { - List<Object>[] vehicles = new List[count]; - for(int i=0; i<count; i++) { - int id = i-1; - vehicles[i] = - (new Values( - new Vehicle("Vehicle-"+id, ThreadLocalRandom.current().nextInt(0, 100), ThreadLocalRandom.current().nextDouble(1, 5)), - new Driver("Driver-"+id, id) - )); - } - return vehicles; - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/Stream.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java index fa62b72..d313678 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java +++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java @@ -448,10 +448,11 @@ public class Stream implements IAggregatableStream { /** * This aggregator operation computes the minimum of tuples by the given {@code inputFieldName} and it is - * assumed that its value is an instance of {@code Comparable}. + * assumed that its value is an instance of {@code Comparable}. If the tuple's value for field {@code inputFieldName} is not an + * instance of {@code Comparable}, then it throws {@code ClassCastException}. * * @param inputFieldName input field name - * @return + * @return the new stream with this operation. */ public Stream minBy(String inputFieldName) { Aggregator<ComparisonAggregator.State> min = new Min(inputFieldName); @@ -460,12 +461,13 @@ public class Stream implements IAggregatableStream { /** * This aggregator operation computes the minimum of tuples by the given {@code inputFieldName} in a stream by - * using the given {@code comparator}. 
+ * using the given {@code comparator}. If the tuple's value for field {@code inputFieldName} is not an + * instance of {@code T}, then it throws {@code ClassCastException}. * * @param inputFieldName input field name * @param comparator comparator used in for finding minimum of two tuple values of {@code inputFieldName}. * @param <T> type of tuple's given input field value. - * @return + * @return the new stream with this operation. */ public <T> Stream minBy(String inputFieldName, Comparator<T> comparator) { Aggregator<ComparisonAggregator.State> min = new MinWithComparator<>(inputFieldName, comparator); @@ -477,7 +479,7 @@ public class Stream implements IAggregatableStream { * {@code TridentTuple}s. * * @param comparator comparator used in for finding minimum of two tuple values. - * @return + * @return the new stream with this operation. */ public Stream min(Comparator<TridentTuple> comparator) { Aggregator<ComparisonAggregator.State> min = new MinWithComparator<>(comparator); @@ -486,10 +488,11 @@ public class Stream implements IAggregatableStream { /** * This aggregator operation computes the maximum of tuples by the given {@code inputFieldName} and it is - * assumed that its value is an instance of {@code Comparable}. + * assumed that its value is an instance of {@code Comparable}. If the tuple's value for field {@code inputFieldName} is not an + * instance of {@code Comparable}, then it throws {@code ClassCastException}. * * @param inputFieldName input field name - * @return + * @return the new stream with this operation. */ public Stream maxBy(String inputFieldName) { Aggregator<ComparisonAggregator.State> max = new Max(inputFieldName); @@ -498,12 +501,13 @@ public class Stream implements IAggregatableStream { /** * This aggregator operation computes the maximum of tuples by the given {@code inputFieldName} in a stream by - * using the given {@code comparator}. + * using the given {@code comparator}. 
If the tuple's value for field {@code inputFieldName} is not an + * instance of {@code T}, then it throws {@code ClassCastException}. * * @param inputFieldName input field name * @param comparator comparator used in for finding maximum of two tuple values of {@code inputFieldName}. * @param <T> type of tuple's given input field value. - * @return + * @return the new stream with this operation. */ public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator) { Aggregator<ComparisonAggregator.State> max = new MaxWithComparator<>(inputFieldName, comparator); @@ -515,7 +519,7 @@ public class Stream implements IAggregatableStream { * {@code TridentTuple}s. * * @param comparator comparator used in for finding maximum of two tuple values. - * @return + * @return the new stream with this operation. */ public Stream max(Comparator<TridentTuple> comparator) { Aggregator<ComparisonAggregator.State> max = new MaxWithComparator<>(comparator); http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java index 0109bb5..82b657a 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java +++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java @@ -21,12 +21,16 @@ package org.apache.storm.trident.operation.builtin; import org.apache.storm.trident.operation.BaseAggregator; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.tuple.TridentTuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Abstract {@code Aggregator} for comparing two values in a stream. 
* */ public abstract class ComparisonAggregator<T> extends BaseAggregator<ComparisonAggregator.State> { + private static final Logger log = LoggerFactory.getLogger(ComparisonAggregator.class); + private Object batchId; public static class State { TridentTuple previousTuple; @@ -42,6 +46,8 @@ public abstract class ComparisonAggregator<T> extends BaseAggregator<ComparisonA @Override public State init(Object batchId, TridentCollector collector) { + this.batchId = batchId; + log.debug("Started comparison aggregation for batch: [{}] in operation [{}]", batchId, this); return new State(); } @@ -50,6 +56,8 @@ public abstract class ComparisonAggregator<T> extends BaseAggregator<ComparisonA T value1 = valueFromTuple(state.previousTuple); T value2 = valueFromTuple(tuple); + log.debug("Aggregated tuple value in state [{}], and received tuple value [{}] in operation [{}]", value1, value2, this); + if(value2 == null) { return; } @@ -62,11 +70,22 @@ public abstract class ComparisonAggregator<T> extends BaseAggregator<ComparisonA protected T valueFromTuple(TridentTuple tuple) { // when there is no input field then the whole tuple is considered for comparison. - return (T) (inputFieldName != null && tuple != null ? tuple.getValueByField(inputFieldName) : tuple); + Object value = null; + if (inputFieldName != null && tuple != null) { + value = tuple.getValueByField(inputFieldName); + } else { + value = tuple; + } + + log.debug("value from tuple is [{}] with input field [{}] and tuple [{}]", value, inputFieldName, tuple); + + return (T) value; } @Override public void complete(State state, TridentCollector collector) { - collector.emit(state.previousTuple.getValues()); + log.debug("Completed comparison aggregation for batch [{}] with resultant tuple: [{}] in operation [{}]", batchId, state.previousTuple, this); + + collector.emit(state.previousTuple != null ? 
state.previousTuple.getValues() : null); } } http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java index 5385dfb..f1221b0 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java +++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java @@ -34,10 +34,4 @@ public class Max extends ComparisonAggregator<Comparable<Object>> { return value1.compareTo(value2) > 0 ? value1 : value2; } - /** - * Returns an aggregator computes the maximum of aggregated tuples in a stream. It assumes that the tuple has one value and - * it is an instance of {@code Comparable}. - * - * @return - */ } http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java index 172aa58..0e8ae90 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java +++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java @@ -41,4 +41,11 @@ public class MaxWithComparator<T> extends ComparisonAggregator<T> { protected T compare(T value1, T value2) { return comparator.compare(value1, value2) > 0 ? 
value1 : value2; } + + @Override + public String toString() { + return "MaxWithComparator{" + + "comparator=" + comparator + + '}'; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java index 0757d7c..010a919 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java +++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java @@ -33,12 +33,4 @@ public class Min extends ComparisonAggregator<Comparable<Object>> { protected Comparable<Object> compare(Comparable<Object> value1, Comparable<Object> value2) { return value1.compareTo(value2) < 0 ? value1 : value2; } - - /** - * Returns an aggregator computes the maximum of aggregated tuples in a stream. It assumes that the tuple has one value and - * it is an instance of {@code Comparable}. - * - * @return - * @param inputFieldName - */ } http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java index d33e000..64144cb 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java +++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java @@ -41,4 +41,11 @@ public class MinWithComparator<T> extends ComparisonAggregator<T> { protected T compare(T value1, T value2) { return comparator.compare(value1, value2) < 0 ? 
value1 : value2; } + + @Override + public String toString() { + return "MinWithComparator{" + + "comparator=" + comparator + + '}'; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java b/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java deleted file mode 100644 index a4a9a79..0000000 --- a/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.storm.trident.testing; - -import org.apache.storm.Config; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.spout.IBatchSpout; -import org.apache.storm.tuple.Fields; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.ThreadLocalRandom; - -/** - * - */ -public class NumberGeneratorSpout implements IBatchSpout { - private final Fields fields; - private final int batchSize; - private final int maxNumber; - private final Map<Long, List<List<Object>>> batches = new HashMap<>(); - - public NumberGeneratorSpout(Fields fields, int batchSize, int maxNumber) { - this.fields = fields; - this.batchSize = batchSize; - this.maxNumber = maxNumber; - } - - @Override - public void open(Map conf, TopologyContext context) { - } - - @Override - public void emitBatch(long batchId, TridentCollector collector) { - List<List<Object>> values = null; - if(batches.containsKey(batchId)) { - values = batches.get(batchId); - } else { - values = new ArrayList<>(); - for (int i = 0; i < batchSize; i++) { - values.add(Collections.singletonList((Object) ThreadLocalRandom.current().nextInt(0, maxNumber + 1))); - } - batches.put(batchId, values); - } - for (List<Object> value : values) { - collector.emit(value); - } - } - - @Override - public void ack(long batchId) { - batches.remove(batchId); - } - - @Override - public void close() { - - } - - @Override - public Map<String, Object> getComponentConfiguration() { - Config conf = new Config(); - conf.setMaxTaskParallelism(1); - return conf; - } - - @Override - public Fields getOutputFields() { - return fields; - } -}
