http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java index d140f77..8f68dab 100644 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java +++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java @@ -20,8 +20,6 @@ package com.datatorrent.demos.machinedata.operator; import java.io.Serializable; import java.math.BigDecimal; -import java.text.DateFormat; -import java.text.SimpleDateFormat; import java.util.Collections; import java.util.List; import java.util.Map; @@ -33,14 +31,16 @@ import com.google.common.base.Objects; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.datatorrent.lib.codec.KryoSerializableStreamCodec; -import com.datatorrent.lib.util.KeyValPair; - -import com.datatorrent.api.*; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.StreamCodec; import com.datatorrent.common.util.BaseOperator; - -import com.datatorrent.demos.machinedata.data.*; +import com.datatorrent.demos.machinedata.data.MachineInfo; +import com.datatorrent.demos.machinedata.data.MachineKey; +import com.datatorrent.demos.machinedata.data.ResourceType; import com.datatorrent.demos.machinedata.util.DataTable; +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.lib.util.KeyValPair; /** * <p> @@ -179,13 +179,12 @@ public class CalculatorOperator extends BaseOperator { double val = (kthPercentile * sorted.size()) / 100.0; - if (val == (int) val) { + if (val == (int)val) { // Whole number - int idx = (int) val - 1; + int idx = (int)val - 1; return (sorted.get(idx) + sorted.get(idx + 1)) / 2.0; - } - else { - int idx = (int) Math.round(val) - 1; + } else { + int idx = (int)Math.round(val) - 1; return sorted.get(idx); } }
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java index 29f700f..bbfd547 100644 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java +++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java @@ -18,18 +18,6 @@ */ package com.datatorrent.demos.machinedata.operator; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; - -import com.datatorrent.demos.machinedata.data.MachineInfo; -import com.datatorrent.demos.machinedata.data.MachineKey; -import com.datatorrent.demos.machinedata.data.AverageData; -import com.datatorrent.lib.util.KeyHashValPair; -import com.datatorrent.lib.util.KeyValPair; - -import com.google.common.collect.Maps; - import java.math.BigDecimal; import java.text.DateFormat; import java.text.SimpleDateFormat; @@ -38,6 +26,18 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; +import com.google.common.collect.Maps; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + +import com.datatorrent.demos.machinedata.data.AverageData; +import com.datatorrent.demos.machinedata.data.MachineInfo; +import com.datatorrent.demos.machinedata.data.MachineKey; +import com.datatorrent.lib.util.KeyHashValPair; +import com.datatorrent.lib.util.KeyValPair; + /** * This class calculates the average for various resources across different devices for a given key * <p>MachineInfoAveragingOperator class.</p> @@ -184,7 +184,7 @@ public class MachineInfoAveragingOperator extends BaseOperator { StringBuilder sb = new StringBuilder(); if (key instanceof MachineKey) { - MachineKey mkey = (MachineKey) key; + MachineKey mkey = (MachineKey)key; Integer customer = mkey.getCustomer(); if (customer != null) { sb.append("customer: " + customer + "\n"); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java index 15e6f07..cb5fa5a 100644 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java +++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java @@ -18,19 +18,18 @@ */ package com.datatorrent.demos.machinedata.operator; -import com.datatorrent.common.util.BaseOperator; +import java.util.HashMap; +import java.util.Map; + import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.demos.machinedata.data.MachineKey; -import com.datatorrent.demos.machinedata.data.MachineInfo; +import com.datatorrent.common.util.BaseOperator; import com.datatorrent.demos.machinedata.data.AverageData; +import com.datatorrent.demos.machinedata.data.MachineInfo; +import com.datatorrent.demos.machinedata.data.MachineKey; import com.datatorrent.lib.util.KeyHashValPair; - -import java.util.HashMap; -import java.util.Map; - /** * This class calculates the partial sum and count for tuples generated by upstream Operator * <p> MachineInfoAveragingPrerequisitesOperator class. </p> @@ -51,8 +50,6 @@ public class MachineInfoAveragingPrerequisitesOperator extends BaseOperator MachineInfoAveragingUnifier unifier = new MachineInfoAveragingUnifier(); return unifier; } - - ; }; public transient DefaultInputPort<MachineInfo> inputPort = new DefaultInputPort<MachineInfo>() @@ -66,8 +63,7 @@ public class MachineInfoAveragingPrerequisitesOperator extends BaseOperator if (averageData == null) { averageData = new AverageData(tuple.getCpu(), tuple.getHdd(), tuple.getRam(), 1); sums.put(key, averageData); - } - else { + } else { averageData.setCpu(averageData.getCpu() + tuple.getCpu()); averageData.setRam(averageData.getRam() + tuple.getRam()); averageData.setHdd(averageData.getHdd() + tuple.getHdd()); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java index 40995b2..e0b67f3 100644 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java +++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java @@ -21,8 +21,8 @@ package com.datatorrent.demos.machinedata.operator; import java.util.HashMap; import java.util.Map; -import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.Operator.Unifier; import com.datatorrent.demos.machinedata.data.AverageData; @@ -80,8 +80,7 @@ public class MachineInfoAveragingUnifier implements Unifier<KeyHashValPair<Machi AverageData tupleValue = arg0.getValue(); if (averageData == null) { sums.put(tupleKey, tupleValue); - } - else { + } else { averageData.setCpu(averageData.getCpu() + tupleValue.getCpu()); averageData.setRam(averageData.getRam() + tupleValue.getRam()); averageData.setHdd(averageData.getHdd() + tupleValue.getHdd()); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java index 88ee35d..6c4256a 100644 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java +++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java @@ -18,7 +18,11 @@ */ package com.datatorrent.demos.machinedata.util; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * Generate combinations of elements for the given array of elements. @@ -27,66 +31,71 @@ import java.util.*; * * @since 0.3.5 */ -public class Combinatorics<T> { +public class Combinatorics<T> +{ - private T[] values; - private int size = -1; - private List<T> result; - private Map<Integer, List<T>> resultMap = new HashMap<Integer, List<T>>(); - private int resultMapSize = 0; + private T[] values; + private int size = -1; + private List<T> result; + private Map<Integer, List<T>> resultMap = new HashMap<Integer, List<T>>(); + private int resultMapSize = 0; - /** - * Generates all possible combinations with all the sizes. - * - * @param values - */ - public Combinatorics(T[] values) { - this.values = values; - this.size = -1; - this.result = new ArrayList<>(); - } + /** + * Generates all possible combinations with all the sizes. + * + * @param values + */ + public Combinatorics(T[] values) + { + this.values = values; + this.size = -1; + this.result = new ArrayList<>(); + } - /** - * Generates all possible combinations with the given size. - * - * @param values - * @param size - */ - public Combinatorics(T[] values, int size) { - this.values = values; - this.size = size; - this.result = new ArrayList<>(); - } + /** + * Generates all possible combinations with the given size. + * + * @param values + * @param size + */ + public Combinatorics(T[] values, int size) + { + this.values = values; + this.size = size; + this.result = new ArrayList<>(); + } - public Map<Integer, List<T>> generate() { + public Map<Integer, List<T>> generate() + { - if (size == -1) { - size = values.length; - for (int i = 1; i <= size; i++) { - int[] tmp = new int[i]; - Arrays.fill(tmp, -1); - generateCombinations(0, 0, tmp); - } - } else { - int[] tmp = new int[size]; - Arrays.fill(tmp, -1); - generateCombinations(0, 0, tmp); - } - return resultMap; + if (size == -1) { + size = values.length; + for (int i = 1; i <= size; i++) { + int[] tmp = new int[i]; + Arrays.fill(tmp, -1); + generateCombinations(0, 0, tmp); + } + } else { + int[] tmp = new int[size]; + Arrays.fill(tmp, -1); + generateCombinations(0, 0, tmp); } + return resultMap; + } - public void generateCombinations(int start, int depth, int[] tmp) { - if (depth == tmp.length) { - for (int j = 0; j < depth; j++) { - result.add(values[tmp[j]]); - } - resultMap.put(++resultMapSize, result); - result = new ArrayList<>(); - return; - } - for (int i = start; i < values.length; i++) { - tmp[depth] = i; - generateCombinations(i + 1, depth + 1, tmp); - } + public void generateCombinations(int start, int depth, int[] tmp) + { + if (depth == tmp.length) { + for (int j = 0; j < depth; j++) { + result.add(values[tmp[j]]); + } + resultMap.put(++resultMapSize, result); + result = new ArrayList<>(); + return; + } + for (int i = start; i < values.length; i++) { + tmp[depth] = i; + generateCombinations(i + 1, depth + 1, tmp); } + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java index 8820400..f8f2d33 100644 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java +++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java @@ -30,36 +30,46 @@ import com.google.common.collect.Maps; * * @since 0.3.5 */ -public class DataTable<R,C,E> { +public class DataTable<R,C,E> +{ - //machineKey, [cpu,ram,hdd] -> value - private final Map<R,Map<C,E>> table= Maps.newHashMap(); + //machineKey, [cpu,ram,hdd] -> value + private final Map<R,Map<C,E>> table = Maps.newHashMap(); - public boolean containsRow(R rowKey){ - return table.containsKey(rowKey); - } + public boolean containsRow(R rowKey) + { + return table.containsKey(rowKey); + } - public void put(R rowKey,C colKey, E entry){ - if(!containsRow(rowKey)){ - table.put(rowKey, Maps.<C,E>newHashMap()); - } - table.get(rowKey).put(colKey, entry); - } + public void put(R rowKey,C colKey, E entry) + { + if (!containsRow(rowKey)) { + table.put(rowKey, Maps.<C,E>newHashMap()); + } + table.get(rowKey).put(colKey, entry); + } - @Nullable public E get(R rowKey, C colKey){ - if(!containsRow(rowKey)) return null; - return table.get(rowKey).get(colKey); - } + @Nullable + public E get(R rowKey, C colKey) + { + if (!containsRow(rowKey)) { + return null; + } + return table.get(rowKey).get(colKey); + } - public Set<R> rowKeySet(){ - return table.keySet(); - } + public Set<R> rowKeySet() + { + return table.keySet(); + } - public void clear(){ - table.clear(); - } + public void clear() + { + table.clear(); + } - public Map<R,Map<C,E>> getTable(){ - return table; - } + public Map<R,Map<C,E>> getTable() + { + return table; + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java b/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java index 1a26bd1..0e397be 100644 --- a/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java +++ b/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java @@ -18,6 +18,20 @@ */ package com.datatorrent.demos.machinedata; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.List; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ImmutableList; + import com.datatorrent.demos.machinedata.data.MachineInfo; import com.datatorrent.demos.machinedata.data.MachineKey; import com.datatorrent.demos.machinedata.data.ResourceType; @@ -26,20 +40,6 @@ import com.datatorrent.lib.testbench.CollectorTestSink; import com.datatorrent.lib.util.KeyValPair; import com.datatorrent.lib.util.TimeBucketKey; -import com.google.common.collect.ImmutableList; - -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.Calendar; -import java.util.Date; -import java.util.List; -import java.util.Map; - /** * @since 0.3.5 */ @@ -94,7 +94,7 @@ public class CalculatorOperatorTest Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size()); for (Object o : sortSink.collectedTuples) { LOG.debug(o.toString()); - KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>) o; + KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o; Assert.assertEquals("emitted value for 'cpu' was ", 2.0, keyValPair.getValue().get(ResourceType.CPU), 0); Assert.assertEquals("emitted value for 'hdd' was ", 1.0, keyValPair.getValue().get(ResourceType.HDD), 0); Assert.assertEquals("emitted value for 'ram' was ", 1.0, keyValPair.getValue().get(ResourceType.RAM), 0); @@ -132,7 +132,7 @@ public class CalculatorOperatorTest Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size()); for (Object o : sortSink.collectedTuples) { LOG.debug(o.toString()); - KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>) o; + KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o; Assert.assertEquals("emitted value for 'cpu' was ", getSD(ImmutableList.of(1, 2, 3)), keyValPair.getValue().get(ResourceType.CPU), 0); Assert.assertEquals("emitted value for 'hdd' was ", getSD(ImmutableList.of(1, 1, 1)), keyValPair.getValue().get(ResourceType.HDD), 0); Assert.assertEquals("emitted value for 'ram' was ", getSD(ImmutableList.of(1, 1, 1)), keyValPair.getValue().get(ResourceType.RAM), 0); @@ -184,7 +184,7 @@ public class CalculatorOperatorTest Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size()); for (Object o : sortSink.collectedTuples) { LOG.debug(o.toString()); - KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>) o; + KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o; Assert.assertEquals("emitted value for 'cpu' was ", 3, keyValPair.getValue().get(ResourceType.CPU), 0); Assert.assertEquals("emitted value for 'hdd' was ", 1, keyValPair.getValue().get(ResourceType.HDD), 0); Assert.assertEquals("emitted value for 'ram' was ", 1, keyValPair.getValue().get(ResourceType.RAM), 0); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java ---------------------------------------------------------------------- diff --git a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java index 9d9f31b..30d7281 100644 --- a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java +++ b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java @@ -18,6 +18,18 @@ */ package com.datatorrent.demos.mobile; +import java.net.URI; +import java.util.Arrays; +import java.util.Map; +import java.util.Random; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang.mutable.MutableLong; +import org.apache.commons.lang3.Range; +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DAG; import com.datatorrent.api.StatsListener; @@ -28,16 +40,6 @@ import com.datatorrent.lib.io.PubSubWebSocketInputOperator; import com.datatorrent.lib.io.PubSubWebSocketOutputOperator; import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner; import com.datatorrent.lib.testbench.RandomEventGenerator; -import org.apache.commons.lang.mutable.MutableLong; -import org.apache.commons.lang3.Range; -import org.apache.hadoop.conf.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.URI; -import java.util.Arrays; -import java.util.Map; -import java.util.Random; /** * Mobile Demo Application: http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java ---------------------------------------------------------------------- diff --git a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java index 3b1e49d..8964d84 100644 --- a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java +++ b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java @@ -18,18 +18,20 @@ */ package com.datatorrent.demos.mobile; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.annotation.InputPortFieldAnnotation; -import com.google.common.collect.Maps; -import com.google.common.collect.Range; +import java.util.Map; +import java.util.Random; +import javax.validation.constraints.Min; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.validation.constraints.Min; -import java.util.Map; -import java.util.Random; +import com.google.common.collect.Maps; +import com.google.common.collect.Range; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.common.util.BaseOperator; /** * Generates mobile numbers that will be displayed in mobile demo just after launch.<br></br> @@ -99,7 +101,8 @@ public class PhoneEntryOperator extends BaseOperator public final transient DefaultOutputPort<Map<String, String>> seedPhones = new DefaultOutputPort<Map<String, String>>(); @Override - public void beginWindow(long windowId){ + public void beginWindow(long windowId) + { if (!seedGenerationDone) { Random random = new Random(); int maxPhone = (maxSeedPhoneNumber <= rangeUpperEndpoint && maxSeedPhoneNumber >= rangeLowerEndpoint) ? maxSeedPhoneNumber : rangeUpperEndpoint; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneMovementGenerator.java ---------------------------------------------------------------------- diff --git a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneMovementGenerator.java b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneMovementGenerator.java index 8db74cd..a46e6d4 100644 --- a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneMovementGenerator.java +++ b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneMovementGenerator.java @@ -25,20 +25,20 @@ import java.util.Set; import javax.validation.constraints.Min; -import org.apache.commons.lang.mutable.MutableLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang.mutable.MutableLong; + import com.google.common.base.Strings; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.annotation.InputPortFieldAnnotation; - +import com.datatorrent.common.util.BaseOperator; import com.datatorrent.lib.counters.BasicCounters; import com.datatorrent.lib.util.HighLow; @@ -73,8 +73,7 @@ public class PhoneMovementGenerator extends BaseOperator if (delta >= threshold) { if (state < 2) { xloc++; - } - else { + } else { xloc--; } if (xloc < 0) { @@ -85,8 +84,7 @@ public class PhoneMovementGenerator extends BaseOperator if (delta >= threshold) { if ((state == 1) || (state == 3)) { yloc++; - } - else { + } else { yloc--; } if (yloc < 0) { @@ -100,8 +98,7 @@ public class PhoneMovementGenerator extends BaseOperator HighLow<Integer> nloc = newgps.get(tuple); if (nloc == null) { newgps.put(tuple, new HighLow<Integer>(xloc, yloc)); - } - else { + } else { nloc.setHigh(xloc); nloc.setLow(yloc); } @@ -109,7 +106,7 @@ public class PhoneMovementGenerator extends BaseOperator } }; - @InputPortFieldAnnotation(optional=true) + @InputPortFieldAnnotation(optional = true) public final transient DefaultInputPort<Map<String,String>> phoneQuery = new DefaultInputPort<Map<String,String>>() { @Override @@ -120,19 +117,16 @@ public class PhoneMovementGenerator extends BaseOperator if (command != null) { if (command.equals(COMMAND_ADD)) { commandCounters.getCounter(CommandCounters.ADD).increment(); - String phoneStr= tuple.get(KEY_PHONE); + String phoneStr = tuple.get(KEY_PHONE); registerPhone(phoneStr); - } - else if (command.equals(COMMAND_ADD_RANGE)) { + } else if (command.equals(COMMAND_ADD_RANGE)) { commandCounters.getCounter(CommandCounters.ADD_RANGE).increment(); registerPhoneRange(tuple.get(KEY_START_PHONE), tuple.get(KEY_END_PHONE)); - } - else if (command.equals(COMMAND_DELETE)) { + } else if (command.equals(COMMAND_DELETE)) { commandCounters.getCounter(CommandCounters.DELETE).increment(); - String phoneStr= tuple.get(KEY_PHONE); + String phoneStr = tuple.get(KEY_PHONE); deregisterPhone(phoneStr); - } - else if (command.equals(COMMAND_CLEAR)) { + } else if (command.equals(COMMAND_CLEAR)) { commandCounters.getCounter(CommandCounters.CLEAR).increment(); clearPhones(); } @@ -181,7 +175,7 @@ public class PhoneMovementGenerator extends BaseOperator /** * Sets the range of phone numbers for which the GPS locations need to be generated. - * + * * @param i the range of phone numbers to set */ public void setRange(int i) @@ -190,7 +184,7 @@ public class PhoneMovementGenerator extends BaseOperator } /** - * @return the threshold + * @return the threshold */ @Min(0) public int getThreshold() @@ -200,7 +194,7 @@ public class PhoneMovementGenerator extends BaseOperator /** * Sets the threshold that decides how frequently the GPS locations are updated. - * + * * @param i the value that decides how frequently the GPS locations change. */ public void setThreshold(int i) @@ -217,8 +211,7 @@ public class PhoneMovementGenerator extends BaseOperator try { Integer phone = new Integer(phoneStr); registerSinglePhone(phone); - } - catch (NumberFormatException nfe) { + } catch (NumberFormatException nfe) { LOG.warn("Invalid no {}", phoneStr); } } @@ -239,8 +232,7 @@ public class PhoneMovementGenerator extends BaseOperator for (int i = startPhone; i <= endPhone; i++) { registerSinglePhone(i); } - } - catch (NumberFormatException nfe) { + } catch (NumberFormatException nfe) { LOG.warn("Invalid phone range <{},{}>", startPhoneStr, endPhoneStr); } } @@ -265,13 +257,13 @@ public class PhoneMovementGenerator extends BaseOperator LOG.debug("Removing query id {}", phone); emitPhoneRemoved(phone); } - } - catch (NumberFormatException nfe) { + } catch (NumberFormatException nfe) { LOG.warn("Invalid phone {}", phoneStr); } } - private void clearPhones() { + private void clearPhones() + { phoneRegister.clear(); LOG.info("Clearing phones"); } @@ -298,8 +290,7 @@ public class PhoneMovementGenerator extends BaseOperator HighLow<Integer> loc = gps.get(e.getKey()); if (loc == null) { gps.put(e.getKey(), e.getValue()); - } - else { + } else { loc.setHigh(e.getValue().getHigh()); loc.setLow(e.getValue().getLow()); } @@ -316,7 +307,8 @@ public class PhoneMovementGenerator extends BaseOperator context.setCounters(commandCounters); } - private void emitQueryResult(Integer phone) { + private void emitQueryResult(Integer phone) + { HighLow<Integer> loc = gps.get(phone); if (loc != null) { Map<String, String> queryResult = new HashMap<String, String>(); @@ -328,7 +320,7 @@ public class PhoneMovementGenerator extends BaseOperator private void emitPhoneRemoved(Integer phone) { - Map<String,String> removedResult= Maps.newHashMap(); + Map<String,String> removedResult = Maps.newHashMap(); removedResult.put(KEY_PHONE, String.valueOf(phone)); removedResult.put(KEY_REMOVED,"true"); locationQueryResult.emit(removedResult); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java b/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java index 72d6514..87e40bf 100644 --- a/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java +++ b/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java @@ -35,13 +35,13 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.LocalMode; + import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet; import com.datatorrent.lib.io.PubSubWebSocketInputOperator; import com.datatorrent.lib.io.PubSubWebSocketOutputOperator; import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.api.LocalMode; - public class ApplicationTest { private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class); @@ -65,7 +65,7 @@ public class ApplicationTest contextHandler.addServlet(sh, "/pubsub"); contextHandler.addServlet(sh, "/*"); server.start(); - Connector connector[] = server.getConnectors(); + Connector[] connector = server.getConnectors(); conf.set("dt.attr.GATEWAY_CONNECT_ADDRESS", "localhost:" + connector[0].getLocalPort()); URI uri = URI.create("ws://localhost:" + connector[0].getLocalPort() + "/pubsub"); @@ -111,7 +111,7 @@ public class ApplicationTest server.stop(); Assert.assertTrue("size of output is 5 ", sink.collectedTuples.size() == 5); for (Object obj : sink.collectedTuples) { - Assert.assertEquals("Expected phone number", "5559990", ((Map<String, String>) obj).get("phone")); + Assert.assertEquals("Expected phone number", "5559990", ((Map<String, String>)obj).get("phone")); } } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Application.java ---------------------------------------------------------------------- diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Application.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Application.java index 245d9c4..5625439 100644 --- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Application.java +++ b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Application.java @@ -20,20 +20,19 @@ package com.datatorrent.demos.mrmonitor; import org.apache.hadoop.conf.Configuration; -import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.lib.testbench.SeedEventGenerator; - import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.testbench.SeedEventGenerator; /** * Application * * @since 2.0.0 */ -@ApplicationAnnotation(name="MyFirstApplication") +@ApplicationAnnotation(name = "MyFirstApplication") public class Application implements StreamingApplication { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Constants.java ---------------------------------------------------------------------- diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Constants.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Constants.java index 2f3d651..7930405 100644 --- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Constants.java +++ b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Constants.java @@ -26,23 +26,23 @@ package com.datatorrent.demos.mrmonitor; public interface Constants { - public final static int MAX_NUMBER_OF_JOBS = 25; + public static final int MAX_NUMBER_OF_JOBS = 25; - public final static String REDUCE_TASK_TYPE = "REDUCE"; - public final static String MAP_TASK_TYPE = "MAP"; - public final static String TASK_TYPE = "type"; - public final static String TASK_ID = "id"; + public static final String REDUCE_TASK_TYPE = "REDUCE"; + public static final String MAP_TASK_TYPE = "MAP"; + public static final String TASK_TYPE = "type"; + public static final String TASK_ID = "id"; - public final static String LEAGACY_TASK_ID = "taskId"; - public final static int MAX_TASKS = 2000; + public static final String LEAGACY_TASK_ID = "taskId"; + public static final int MAX_TASKS = 2000; - public final static String QUERY_APP_ID = "app_id"; - public final static String QUERY_JOB_ID = "job_id"; - public final static String QUERY_HADOOP_VERSION = "hadoop_version"; - public final static String QUERY_API_VERSION = "api_version"; - public final static String QUERY_RM_PORT = "rm_port"; - public final static String QUERY_HS_PORT = "hs_port"; - public final static String QUERY_HOST_NAME = "hostname"; + public static final String QUERY_APP_ID = "app_id"; + public static final String QUERY_JOB_ID = "job_id"; + public static final String QUERY_HADOOP_VERSION = "hadoop_version"; + public static final String QUERY_API_VERSION = "api_version"; + public static final String QUERY_RM_PORT = "rm_port"; + public static final String QUERY_HS_PORT = "hs_port"; + public static final String QUERY_HOST_NAME = "hostname"; public static final String QUERY_KEY_COMMAND = "command"; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRJobStatusOperator.java ---------------------------------------------------------------------- diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRJobStatusOperator.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRJobStatusOperator.java index 88863e2..263a1a7 100644 --- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRJobStatusOperator.java +++ b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRJobStatusOperator.java @@ -18,7 +18,11 @@ */ package com.datatorrent.demos.mrmonitor; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; import org.codehaus.jettison.json.JSONArray; @@ -107,8 +111,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler outputJsonObject.put("id", mrStatusObj.getJobId()); outputJsonObject.put("removed", "true"); output.emit(outputJsonObject.toString()); - } - catch (JSONException e) { + } catch (JSONException e) { LOG.warn("Error creating JSON: {}", e.getMessage()); } return; @@ -123,8 +126,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler } if (mrStatusObj.getHadoopVersion() == 2) { getJsonForJob(mrStatusObj); - } - else if (mrStatusObj.getHadoopVersion() == 1) { + } else if (mrStatusObj.getHadoopVersion() == 1) { getJsonForLegacyJob(mrStatusObj); } mrStatusObj.setStatusHistoryCount(statusHistoryTime); @@ -204,8 +206,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler if (jsonObj != null) { if (statusObj.getMetricObject() == null) { statusObj.setMetricObject(new TaskObject(jsonObj)); - } - else if (!statusObj.getMetricObject().getJsonString().equalsIgnoreCase(jsonObj.toString())) { + } else if (!statusObj.getMetricObject().getJsonString().equalsIgnoreCase(jsonObj.toString())) { statusObj.getMetricObject().setJson(jsonObj); statusObj.getMetricObject().setModified(true); } @@ -252,8 +253,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler continue; } reduceTaskOject.put(taskObj.getString(Constants.TASK_ID), new TaskObject(taskObj)); - } - else { + } else { if (mapTaskOject.get(taskObj.getString(Constants.TASK_ID)) != null) { TaskObject tempTaskObj = mapTaskOject.get(taskObj.getString(Constants.TASK_ID)); if (tempTaskObj.getJsonString().equals(taskObj.toString())) { @@ -269,8 +269,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler } statusObj.setMapJsonObject(mapTaskOject); statusObj.setReduceJsonObject(reduceTaskOject); - } - catch (Exception e) { + } catch (Exception e) { LOG.info("exception: {}", e.getMessage()); } } @@ -324,12 +323,11 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler { try { JSONObject jobJson = statusObj.getJsonObject(); - int totalTasks = ((JSONObject) ((JSONObject) jobJson.get(type + "TaskSummary")).get("taskStats")).getInt("numTotalTasks"); + int totalTasks = ((JSONObject)((JSONObject)jobJson.get(type + "TaskSummary")).get("taskStats")).getInt("numTotalTasks"); Map<String, TaskObject> taskMap; if (type.equalsIgnoreCase("map")) { taskMap = statusObj.getMapJsonObject(); - } - else { + } else { taskMap = statusObj.getReduceJsonObject(); } @@ -371,12 +369,10 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler if (type.equalsIgnoreCase("map")) { statusObj.setMapJsonObject(taskMap); - } - else { + } else { statusObj.setReduceJsonObject(taskMap); } - } - catch (Exception e) { + } catch (Exception e) { LOG.info(e.getMessage()); } @@ -387,8 +383,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler { try { Thread.sleep(sleepTime);// - } - catch (InterruptedException ie) { + } catch (InterruptedException ie) { // If this thread was intrrupted by nother thread } if (!iterator.hasNext()) { @@ -399,8 +394,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler MRStatusObject obj = iterator.next(); if (obj.getHadoopVersion() == 2) { getJsonForJob(obj); - } - else if (obj.getHadoopVersion() == 1) { + } else if (obj.getHadoopVersion() == 1) { getJsonForLegacyJob(obj); } } @@ -465,8 +459,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler outputJsonObject.put("tasks", arr); reduceOutput.emit(outputJsonObject.toString()); obj.setRetrials(0); - } - catch (Exception e) { + } catch (Exception e) { LOG.warn("error creating json {}", e.getMessage()); } @@ -543,17 +536,14 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler if (!modified) { if (obj.getRetrials() >= maxRetrials) { delList.add(obj.getJobId()); - } - else { + } else { obj.setRetrials(obj.getRetrials() + 1); } - } - else { + } else { obj.setRetrials(0); } } - } - catch (Exception ex) { + } catch (Exception ex) { LOG.warn("error creating json {}", ex.getMessage()); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRMonitoringApplication.java ---------------------------------------------------------------------- diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRMonitoringApplication.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRMonitoringApplication.java index 5758ad1..037378a 100644 --- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRMonitoringApplication.java +++ b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRMonitoringApplication.java @@ -20,10 +20,11 @@ package com.datatorrent.demos.mrmonitor; import java.net.URI; -import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.DAG; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRStatusObject.java ---------------------------------------------------------------------- diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRStatusObject.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRStatusObject.java index f0471f3..481f3dc 100644 --- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRStatusObject.java +++ b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRStatusObject.java @@ -149,7 +149,8 @@ public class MRStatusObject virtualMemoryStatusHistory = new LinkedList<String>(); cpuStatusHistory = new LinkedList<String>(); statusScheduler = Executors.newScheduledThreadPool(1); - statusScheduler.scheduleAtFixedRate(new Runnable() { + statusScheduler.scheduleAtFixedRate(new Runnable() + { @Override public void run() { @@ -333,12 +334,15 @@ public class MRStatusObject @Override public boolean equals(Object that) { - if (this == that) + if (this == that) { return true; - if (!(that instanceof MRStatusObject)) + } + if (!(that instanceof MRStatusObject)) { return false; - if (this.hashCode() == that.hashCode()) + } + if (this.hashCode() == that.hashCode()) { return true; + } return false; } @@ -443,7 +447,7 @@ public class MRStatusObject /** * This returns the task information as json - * + * * @return */ public JSONObject getJson() @@ -453,7 +457,7 @@ public class MRStatusObject /** * This stores the task information as json - * + * * @param json */ public void setJson(JSONObject json) @@ -463,7 +467,7 @@ public class MRStatusObject /** * This returns if the json object has been modified - * + * * @return */ public boolean isModified() @@ -473,7 +477,7 @@ public class MRStatusObject /** * This sets if the json object is modified - * + * * @param modified */ public void setModified(boolean modified) @@ -483,7 +487,7 @@ public class MRStatusObject /** * This returns the string format of the json object - * + * * @return */ public String getJsonString() http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRUtil.java ---------------------------------------------------------------------- diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRUtil.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRUtil.java index cb10347..0d7f6af 100644 --- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRUtil.java +++ b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRUtil.java @@ -20,15 +20,16 @@ package com.datatorrent.demos.mrmonitor; import java.io.IOException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.http.client.ClientProtocolException; import org.apache.http.client.HttpClient; import org.apache.http.client.ResponseHandler; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.BasicResponseHandler; import org.apache.http.impl.client.DefaultHttpClient; -import org.codehaus.jettison.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * <p> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MapToMRObjectOperator.java ---------------------------------------------------------------------- diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MapToMRObjectOperator.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MapToMRObjectOperator.java index e37454f..5075163 100644 --- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MapToMRObjectOperator.java +++ b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MapToMRObjectOperator.java @@ -33,7 +33,8 @@ import com.datatorrent.api.Operator; public class MapToMRObjectOperator implements Operator { - public final transient DefaultInputPort<Map<String, String>> input = new DefaultInputPort<Map<String, String>>() { + public final transient DefaultInputPort<Map<String, String>> input = new DefaultInputPort<Map<String, String>>() + { @Override public void process(Map<String, String> tuple) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/test/java/com/datatorrent/demos/mrmonitor/MrMonitoringApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/mrmonitor/src/test/java/com/datatorrent/demos/mrmonitor/MrMonitoringApplicationTest.java b/demos/mrmonitor/src/test/java/com/datatorrent/demos/mrmonitor/MrMonitoringApplicationTest.java index 70cf840..ad8de02 100644 --- a/demos/mrmonitor/src/test/java/com/datatorrent/demos/mrmonitor/MrMonitoringApplicationTest.java +++ b/demos/mrmonitor/src/test/java/com/datatorrent/demos/mrmonitor/MrMonitoringApplicationTest.java @@ -28,9 +28,8 @@ import org.junit.Test; import org.apache.hadoop.conf.Configuration; -import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet; - import com.datatorrent.api.LocalMode; +import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet; /** * <p>MapReduceDebuggerApplicationTest class.</p> @@ -53,7 +52,7 @@ public class MrMonitoringApplicationTest contextHandler.addServlet(sh, "/pubsub"); contextHandler.addServlet(sh, "/*"); server.start(); - Connector connector[] = server.getConnectors(); + Connector[] connector = server.getConnectors(); conf.set("dt.attr.GATEWAY_CONNECT_ADDRESS", "localhost:" + connector[0].getLocalPort()); MRMonitoringApplication application = new MRMonitoringApplication(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/DateWritable.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/DateWritable.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/DateWritable.java index 8ecf76e..5dbd83f 100644 --- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/DateWritable.java +++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/DateWritable.java @@ -18,11 +18,11 @@ */ package com.datatorrent.demos.mroperator; -import java.text.SimpleDateFormat; -import java.util.Date; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; import org.apache.hadoop.io.WritableComparable; @@ -33,44 +33,48 @@ import org.apache.hadoop.io.WritableComparable; */ public class DateWritable implements WritableComparable<DateWritable> { - private final static SimpleDateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd' T 'HH:mm:ss.SSS" ); - private Date date; + private static final SimpleDateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd' T 'HH:mm:ss.SSS" ); + private Date date; + + public Date getDate() + { + return date; + } + + public void setDate( Date date ) + { + this.date = date; + } - public Date getDate() - { - return date; - } + public void readFields( DataInput in ) throws IOException + { + date = new Date( in.readLong() ); + } - public void setDate( Date date ) - { - this.date = date; - } + public void write( DataOutput out ) throws IOException + { + out.writeLong( date.getTime() ); + } - public void readFields( DataInput in ) throws IOException - { - date = new Date( in.readLong() ); - } + @Override + public boolean equals(Object o) + { + return toString().equals(o.toString()); + } - public void write( DataOutput out ) throws IOException - { - out.writeLong( date.getTime() ); - } + @Override + public int hashCode() + { + return toString().hashCode(); + } - @Override - public boolean equals(Object o){ - return toString().equals(o.toString()); - } - @Override - public int hashCode(){ - return toString().hashCode(); - } - public String toString() - { - return formatter.format( date); - } + public String toString() + { + return formatter.format( date); + } - public int compareTo( DateWritable other ) - { - return date.compareTo( other.getDate() ); - } + public int compareTo( DateWritable other ) + { + return date.compareTo( other.getDate() ); + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/HdfsKeyValOutputOperator.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/HdfsKeyValOutputOperator.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/HdfsKeyValOutputOperator.java index b6b9735..c4b9c49 100644 --- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/HdfsKeyValOutputOperator.java +++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/HdfsKeyValOutputOperator.java @@ -36,6 +36,6 @@ public class HdfsKeyValOutputOperator<K, V> extends AbstractSingleFileOutputOper @Override public byte[] getBytesForTuple(KeyHashValPair<K,V> t) { - return (t.toString()+"\n").getBytes(); + return (t.toString() + "\n").getBytes(); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/InvertedIndexApplication.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/InvertedIndexApplication.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/InvertedIndexApplication.java index dae07a2..076b8ac 100644 --- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/InvertedIndexApplication.java +++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/InvertedIndexApplication.java @@ -29,7 +29,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation; * * @since 0.9.0 */ -@ApplicationAnnotation(name="InvertedIndexDemo") +@ApplicationAnnotation(name = "InvertedIndexDemo") public class InvertedIndexApplication extends MapReduceApplication<LongWritable, Text, Text, Text> { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LineIndexer.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LineIndexer.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LineIndexer.java index aabea81..e963954 100644 --- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LineIndexer.java +++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LineIndexer.java @@ -41,18 +41,18 @@ import org.apache.hadoop.mapred.Reporter; * * @since 0.9.0 */ -public class LineIndexer { +public class LineIndexer +{ public static class LineIndexMapper extends MapReduceBase - implements Mapper<LongWritable, Text, Text, Text> { - - private final static Text word = new Text(); - private final static Text location = new Text(); + implements Mapper<LongWritable, Text, Text, Text> + { + private static final Text word = new Text(); + private static final Text location = new Text(); public void map(LongWritable key, Text val, - OutputCollector<Text, Text> output, Reporter reporter) - throws IOException { - + OutputCollector<Text, Text> output, Reporter reporter) throws IOException + { FileSplit fileSplit = (FileSplit)reporter.getInputSplit(); String fileName = fileSplit.getPath().getName(); location.set(fileName); @@ -69,18 +69,18 @@ public class LineIndexer { public static class LineIndexReducer extends MapReduceBase - implements Reducer<Text, Text, Text, Text> { - + implements Reducer<Text, Text, Text, Text> + { public void reduce(Text key, Iterator<Text> values, - OutputCollector<Text, Text> output, Reporter reporter) - throws IOException { - + OutputCollector<Text, Text> output, Reporter reporter) throws IOException + { boolean first = true; StringBuilder toReturn = new StringBuilder(); - while (values.hasNext()){ - if (!first) + while (values.hasNext()) { + if (!first) { toReturn.append(", "); - first=false; + } + first = false; toReturn.append(values.next().toString()); } @@ -93,7 +93,8 @@ public class LineIndexer { * The actual main() method for our program; this is the * "driver" for the MapReduce job. */ - public static void main(String[] args) { + public static void main(String[] args) + { JobClient client = new JobClient(); JobConf conf = new JobConf(LineIndexer.class); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogCountsPerHour.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogCountsPerHour.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogCountsPerHour.java index 793ad4d..69ee892 100644 --- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogCountsPerHour.java +++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogCountsPerHour.java @@ -18,201 +18,170 @@ */ package com.datatorrent.demos.mroperator; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; +import java.io.IOException; +import java.util.Calendar; +import java.util.Iterator; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextOutputFormat; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileOutputFormat; - -import java.io.IOException; -import java.util.Calendar; -import java.util.Iterator; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; /** * <p>LogCountsPerHour class.</p> * * @since 0.9.0 */ -public class LogCountsPerHour extends Configured implements Tool { +public class LogCountsPerHour extends Configured implements Tool +{ - public static class LogMapClass extends MapReduceBase - implements Mapper<LongWritable, Text, DateWritable, IntWritable> - { - private DateWritable date = new DateWritable(); - private final static IntWritable one = new IntWritable( 1 ); - - public void map( LongWritable key, // Offset into the file - Text value, - OutputCollector<DateWritable, IntWritable> output, - Reporter reporter) throws IOException - { - // Get the value as a String; it is of the format: - // 111.111.111.111 - - [16/Dec/2012:05:32:50 -0500] "GET / HTTP/1.1" 200 14791 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)" - String text = value.toString(); - - // Get the date and time - int openBracket = text.indexOf( '[' ); - int closeBracket = text.indexOf( ']' ); - if( openBracket != -1 && closeBracket != -1 ) - { - // Read the date - String dateString = text.substring( text.indexOf( '[' ) + 1, text.indexOf( ']' ) ); - - // Build a date object from a string of the form: 16/Dec/2012:05:32:50 -0500 - int index = 0; - int nextIndex = dateString.indexOf( '/' ); - int day = Integer.parseInt( dateString.substring(index, nextIndex) ); - - index = nextIndex; - nextIndex = dateString.indexOf( '/', index+1 ); - String month = dateString.substring( index+1, nextIndex ); - - index = nextIndex; - nextIndex = dateString.indexOf( ':', index ); - int year = Integer.parseInt(dateString.substring(index + 1, nextIndex)); - - index = nextIndex; - nextIndex = dateString.indexOf( ':', index+1 ); - int hour = Integer.parseInt(dateString.substring(index + 1, nextIndex)); - - // Build a calendar object for this date - Calendar calendar = Calendar.getInstance(); - calendar.set( Calendar.DATE, day ); - calendar.set( Calendar.YEAR, year ); - calendar.set( Calendar.HOUR, hour ); - calendar.set( Calendar.MINUTE, 0 ); - calendar.set( Calendar.SECOND, 0 ); - calendar.set( Calendar.MILLISECOND, 0 ); - - if( month.equalsIgnoreCase( "dec" ) ) - { - calendar.set( Calendar.MONTH, Calendar.DECEMBER ); - } - else if( month.equalsIgnoreCase( "nov" ) ) - { - calendar.set( Calendar.MONTH, Calendar.NOVEMBER ); - } - else if( month.equalsIgnoreCase( "oct" ) ) - { - calendar.set( Calendar.MONTH, Calendar.OCTOBER ); - } - else if( month.equalsIgnoreCase( "sep" ) ) - { - calendar.set( Calendar.MONTH, Calendar.SEPTEMBER ); - } - else if( month.equalsIgnoreCase( "aug" ) ) - { - calendar.set( Calendar.MONTH, Calendar.AUGUST ); - } - else if( month.equalsIgnoreCase( "jul" ) ) - { - calendar.set( Calendar.MONTH, Calendar.JULY ); - } - else if( month.equalsIgnoreCase( "jun" ) ) - { - calendar.set( Calendar.MONTH, Calendar.JUNE ); - } - else if( month.equalsIgnoreCase( "may" ) ) - { - calendar.set( Calendar.MONTH, Calendar.MAY ); - } - else if( month.equalsIgnoreCase( "apr" ) ) - { - calendar.set( Calendar.MONTH, Calendar.APRIL ); - } - else if( month.equalsIgnoreCase( "mar" ) ) - { - calendar.set( Calendar.MONTH, Calendar.MARCH ); - } - else if( month.equalsIgnoreCase( "feb" ) ) - { - calendar.set( Calendar.MONTH, Calendar.FEBRUARY ); - } - else if( month.equalsIgnoreCase( "jan" ) ) - { - calendar.set( Calendar.MONTH, Calendar.JANUARY ); - } - - - // Output the date as the key and 1 as the value - date.setDate( calendar.getTime() ); - output.collect(date, one); - } - } - } + public static class LogMapClass extends MapReduceBase + implements Mapper<LongWritable, Text, DateWritable, IntWritable> + { + private DateWritable date = new DateWritable(); + private static final IntWritable one = new IntWritable(1); - public static class LogReduce extends MapReduceBase - implements Reducer<DateWritable, IntWritable, DateWritable, IntWritable> + public void map(LongWritable key, Text value, OutputCollector<DateWritable, IntWritable> output, Reporter reporter) throws IOException { - public void reduce( DateWritable key, Iterator<IntWritable> values, - OutputCollector<DateWritable, IntWritable> output, - Reporter reporter) throws IOException - { - // Iterate over all of the values (counts of occurrences of this word) - int count = 0; - while( values.hasNext() ) - { - // Add the value to our count - count += values.next().get(); - } - - // Output the word with its count (wrapped in an IntWritable) - output.collect( key, new IntWritable( count ) ); + // Get the value as a String; it is of the format: + // 111.111.111.111 - - [16/Dec/2012:05:32:50 -0500] "GET / HTTP/1.1" 200 14791 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)" + String text = value.toString(); + + // Get the date and time + int openBracket = text.indexOf('['); + int closeBracket = text.indexOf(']'); + if (openBracket != -1 && closeBracket != -1) { + // Read the date + String dateString = text.substring(text.indexOf('[') + 1, text.indexOf(']')); + + // Build a date object from a string of the form: 16/Dec/2012:05:32:50 -0500 + int index = 0; + int nextIndex = dateString.indexOf('/'); + int day = Integer.parseInt(dateString.substring(index, nextIndex)); + + index = nextIndex; + nextIndex = dateString.indexOf('/', index + 1); + String month = dateString.substring(index + 1, nextIndex); + + index = nextIndex; + nextIndex = dateString.indexOf(':', index); + int year = Integer.parseInt(dateString.substring(index + 1, nextIndex)); + + index = nextIndex; + nextIndex = dateString.indexOf(':', index + 1); + int hour = Integer.parseInt(dateString.substring(index + 1, nextIndex)); + + // Build a calendar object for this date + Calendar calendar = Calendar.getInstance(); + calendar.set(Calendar.DATE, day); + calendar.set(Calendar.YEAR, year); + calendar.set(Calendar.HOUR, hour); + calendar.set(Calendar.MINUTE, 0); + calendar.set(Calendar.SECOND, 0); + calendar.set(Calendar.MILLISECOND, 0); + + if (month.equalsIgnoreCase("dec")) { + calendar.set(Calendar.MONTH, Calendar.DECEMBER); + } else if (month.equalsIgnoreCase("nov")) { + calendar.set(Calendar.MONTH, Calendar.NOVEMBER); + } else if (month.equalsIgnoreCase("oct")) { + calendar.set(Calendar.MONTH, Calendar.OCTOBER); + } else if (month.equalsIgnoreCase("sep")) { + calendar.set(Calendar.MONTH, Calendar.SEPTEMBER); + } else if (month.equalsIgnoreCase("aug")) { + calendar.set(Calendar.MONTH, Calendar.AUGUST); + } else if (month.equalsIgnoreCase("jul")) { + calendar.set(Calendar.MONTH, Calendar.JULY); + } else if (month.equalsIgnoreCase("jun")) { + calendar.set(Calendar.MONTH, Calendar.JUNE); + } else if (month.equalsIgnoreCase("may")) { + calendar.set(Calendar.MONTH, Calendar.MAY); + } else if (month.equalsIgnoreCase("apr")) { + calendar.set(Calendar.MONTH, Calendar.APRIL); + } else if (month.equalsIgnoreCase("mar")) { + calendar.set(Calendar.MONTH, Calendar.MARCH); + } else if (month.equalsIgnoreCase("feb")) { + calendar.set(Calendar.MONTH, Calendar.FEBRUARY); + } else if (month.equalsIgnoreCase("jan")) { + calendar.set(Calendar.MONTH, Calendar.JANUARY); } - } - public int run(String[] args) throws Exception - { - // Create a configuration - Configuration conf = getConf(); - - // Create a job from the default configuration that will use the WordCount class - JobConf job = new JobConf( conf, LogCountsPerHour.class ); - - // Define our input path as the first command line argument and our output path as the second - Path in = new Path( args[0] ); - Path out = new Path( args[1] ); - - // Create File Input/Output formats for these paths (in the job) - FileInputFormat.setInputPaths( job, in ); - FileOutputFormat.setOutputPath( job, out ); - - // Configure the job: name, mapper, reducer, and combiner - job.setJobName( "LogAveragePerHour" ); - job.setMapperClass( LogMapClass.class ); - job.setReducerClass( LogReduce.class ); - job.setCombinerClass( LogReduce.class ); - - // Configure the output - job.setOutputFormat( TextOutputFormat.class ); - job.setOutputKeyClass( DateWritable.class ); - job.setOutputValueClass( IntWritable.class ); - - // Run the job - JobClient.runJob(job); - return 0; + // Output the date as the key and 1 as the value + date.setDate(calendar.getTime()); + output.collect(date, one); + } } + } - public static void main(String[] args) throws Exception + public static class LogReduce extends MapReduceBase + implements Reducer<DateWritable, IntWritable, DateWritable, IntWritable> + { + public void reduce(DateWritable key, Iterator<IntWritable> values, OutputCollector<DateWritable, IntWritable> output, Reporter reporter) throws IOException { - // Start the LogCountsPerHour MapReduce application - int res = ToolRunner.run( new Configuration(), - new LogCountsPerHour(), - args ); - System.exit( res ); + // Iterate over all of the values (counts of occurrences of this word) + int count = 0; + while (values.hasNext()) { + // Add the value to our count + count += values.next().get(); + } + + // Output the word with its count (wrapped in an IntWritable) + output.collect(key, new IntWritable(count)); } + } + + + public int run(String[] args) throws Exception + { + // Create a configuration + Configuration conf = getConf(); + + // Create a job from the default configuration that will use the WordCount class + JobConf job = new JobConf(conf, LogCountsPerHour.class); + + // Define our input path as the first command line argument and our output path as the second + Path in = new Path(args[0]); + Path out = new Path(args[1]); + + // Create File Input/Output formats for these paths (in the job) + FileInputFormat.setInputPaths(job, in); + FileOutputFormat.setOutputPath(job, out); + + // Configure the job: name, mapper, reducer, and combiner + job.setJobName("LogAveragePerHour"); + job.setMapperClass(LogMapClass.class); + job.setReducerClass(LogReduce.class); + job.setCombinerClass(LogReduce.class); + + // Configure the output + job.setOutputFormat(TextOutputFormat.class); + job.setOutputKeyClass(DateWritable.class); + job.setOutputValueClass(IntWritable.class); + + // Run the job + JobClient.runJob(job); + return 0; + } + + public static void main(String[] args) throws Exception + { + // Start the LogCountsPerHour MapReduce application + int res = ToolRunner.run(new Configuration(), new LogCountsPerHour(), args); + System.exit(res); + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogsCountApplication.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogsCountApplication.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogsCountApplication.java index cbe5566..2d647ed 100644 --- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogsCountApplication.java +++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogsCountApplication.java @@ -30,7 +30,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation; * * @since 0.9.0 */ -@ApplicationAnnotation(name="LogsCountDemo") +@ApplicationAnnotation(name = "LogsCountDemo") public class LogsCountApplication extends MapReduceApplication<LongWritable, Text, DateWritable, IntWritable> { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java index b8023f5..509f6ae 100644 --- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java +++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java @@ -22,18 +22,35 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import javax.validation.constraints.Min; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.serializer.Deserializer; import org.apache.hadoop.io.serializer.SerializationFactory; import org.apache.hadoop.io.serializer.Serializer; -import org.apache.hadoop.mapred.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.KeyValueTextInputFormat; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.TextInputFormat; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultOutputPort; @@ -123,8 +140,7 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner< if (reader == null) { try { reader = inputFormat.getRecordReader(inputSplit, new JobConf(new Configuration()), reporter); - } - catch (IOException e) { + } catch (IOException e) { logger.info("error getting record reader {}", e.getMessage()); } } @@ -150,11 +166,10 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner< SerializationFactory serializationFactory = new SerializationFactory(conf); Deserializer keyDesiralizer = serializationFactory.getDeserializer(inputSplitClass); keyDesiralizer.open(new ByteArrayInputStream(outstream.toByteArray())); - inputSplit = (InputSplit) keyDesiralizer.deserialize(null); - ((ReporterImpl) reporter).setInputSplit(inputSplit); + inputSplit = (InputSplit)keyDesiralizer.deserialize(null); + ((ReporterImpl)reporter).setInputSplit(inputSplit); reader = inputFormat.getRecordReader(inputSplit, new JobConf(conf), reporter); - } - catch (Exception e) { + } catch (Exception e) { logger.info("failed to initialize inputformat obj {}", inputFormat); throw new RuntimeException(e); } @@ -172,8 +187,7 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner< if (mapClass != null) { try { mapObject = mapClass.newInstance(); - } - catch (Exception e) { + } catch (Exception e) { logger.info("can't instantiate object {}", e.getMessage()); } @@ -182,8 +196,7 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner< if (combineClass != null) { try { combineObject = combineClass.newInstance(); - } - catch (Exception e) { + } catch (Exception e) { logger.info("can't instantiate object {}", e.getMessage()); } combineObject.configure(jobConf); @@ -202,15 +215,14 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner< KeyHashValPair<K1, V1> keyValue = new KeyHashValPair<K1, V1>(key, val); mapObject.map(keyValue.getKey(), keyValue.getValue(), outputCollector, reporter); if (combineObject == null) { - List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>) outputCollector).getList(); + List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>)outputCollector).getList(); for (KeyHashValPair<K2, V2> e : list) { output.emit(e); } list.clear(); } } - } - catch (IOException ex) { + } catch (IOException ex) { logger.debug(ex.toString()); throw new RuntimeException(ex); } @@ -220,7 +232,7 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner< @Override public void endWindow() { - List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>) outputCollector).getList(); + List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>)outputCollector).getList(); if (combineObject != null) { Map<K2, List<V2>> cacheObject = new HashMap<K2, List<V2>>(); for (KeyHashValPair<K2, V2> tuple : list) { @@ -229,8 +241,7 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner< cacheList = new ArrayList<V2>(); cacheList.add(tuple.getValue()); cacheObject.put(tuple.getKey(), cacheList); - } - else { + } else { cacheList.add(tuple.getValue()); } } @@ -239,12 +250,11 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner< for (Map.Entry<K2, List<V2>> e : cacheObject.entrySet()) { try { combineObject.reduce(e.getKey(), e.getValue().iterator(), tempOutputCollector, reporter); - } - catch (IOException e1) { + } catch (IOException e1) { logger.info(e1.getMessage()); } } - list = ((OutputCollectorImpl<K2, V2>) tempOutputCollector).getList(); + list = ((OutputCollectorImpl<K2, V2>)tempOutputCollector).getList(); for (KeyHashValPair<K2, V2> e : list) { output.emit(e); } @@ -261,14 +271,13 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner< { FileInputFormat.setInputPaths(conf, new Path(path)); if (inputFormat == null) { - inputFormat = inputFormatClass.newInstance(); - String inputFormatClassName = inputFormatClass.getName(); - if (inputFormatClassName.equals("org.apache.hadoop.mapred.TextInputFormat")) { - ((TextInputFormat) inputFormat).configure(conf); - } - else if (inputFormatClassName.equals("org.apache.hadoop.mapred.KeyValueTextInputFormat")) { - ((KeyValueTextInputFormat) inputFormat).configure(conf); - } + inputFormat = inputFormatClass.newInstance(); + String inputFormatClassName = inputFormatClass.getName(); + if (inputFormatClassName.equals("org.apache.hadoop.mapred.TextInputFormat")) { + ((TextInputFormat)inputFormat).configure(conf); + } else if (inputFormatClassName.equals("org.apache.hadoop.mapred.KeyValueTextInputFormat")) { + ((KeyValueTextInputFormat)inputFormat).configure(conf); + } } return inputFormat.getSplits(conf, numSplits); // return null; @@ -296,8 +305,7 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner< InputSplit[] splits; try { splits = getSplits(new JobConf(conf), tempPartitionCount, template.getPartitionedInstance().getDirName()); - } - catch (Exception e1) { + } catch (Exception e1) { logger.info(" can't get splits {}", e1.getMessage()); throw new RuntimeException(e1); } @@ -316,8 +324,7 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner< keySerializer.open(opr.getOutstream()); keySerializer.serialize(splits[size - 1]); opr.setInputSplitClass(splits[size - 1].getClass()); - } - catch (IOException e) { + } catch (IOException e) { logger.info("error while serializing {}", e.getMessage()); } size--; @@ -333,8 +340,7 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner< keySerializer.open(opr.getOutstream()); keySerializer.serialize(splits[size - 1]); opr.setInputSplitClass(splits[size - 1].getClass()); - } - catch (IOException e) { + } catch (IOException e) { logger.info("error while serializing {}", e.getMessage()); } size--; @@ -342,8 +348,7 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner< } try { keySerializer.close(); - } - catch (IOException e) { + } catch (IOException e) { throw new RuntimeException(e); } return operList; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/NewWordCountApplication.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/NewWordCountApplication.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/NewWordCountApplication.java index 45f9005..b0ea7d8 100644 --- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/NewWordCountApplication.java +++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/NewWordCountApplication.java @@ -30,15 +30,15 @@ import com.datatorrent.api.annotation.ApplicationAnnotation; * * @since 0.9.0 */ -@ApplicationAnnotation(name="WordCountDemo") -public class NewWordCountApplication extends MapReduceApplication<LongWritable, Text, Text, IntWritable> { - - public void NewWordCountApplication() { - setMapClass(WordCount.Map.class); - setReduceClass(WordCount.Reduce.class); - setCombineClass(WordCount.Reduce.class); - setInputFormat(TextInputFormat.class); - - } +@ApplicationAnnotation(name = "WordCountDemo") +public class NewWordCountApplication extends MapReduceApplication<LongWritable, Text, Text, IntWritable> +{ + public void NewWordCountApplication() + { + setMapClass(WordCount.Map.class); + setReduceClass(WordCount.Reduce.class); + setCombineClass(WordCount.Reduce.class); + setInputFormat(TextInputFormat.class); + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/OutputCollectorImpl.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/OutputCollectorImpl.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/OutputCollectorImpl.java index b380553..6c81724 100644 --- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/OutputCollectorImpl.java +++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/OutputCollectorImpl.java @@ -24,14 +24,14 @@ import java.io.PipedOutputStream; import java.util.ArrayList; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.serializer.Deserializer; import org.apache.hadoop.io.serializer.SerializationFactory; import org.apache.hadoop.io.serializer.Serializer; import org.apache.hadoop.mapred.OutputCollector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.datatorrent.lib.util.KeyHashValPair; /** @@ -40,50 +40,55 @@ import com.datatorrent.lib.util.KeyHashValPair; * @since 0.9.0 */ @SuppressWarnings("unchecked") -public class OutputCollectorImpl<K extends Object, V extends Object> implements OutputCollector<K, V> { - private static final Logger logger = LoggerFactory.getLogger(OutputCollectorImpl.class); +public class OutputCollectorImpl<K extends Object, V extends Object> implements OutputCollector<K, V> +{ + private static final Logger logger = LoggerFactory.getLogger(OutputCollectorImpl.class); - private List<KeyHashValPair<K, V>> list = new ArrayList<KeyHashValPair<K, V>>(); + private List<KeyHashValPair<K, V>> list = new ArrayList<KeyHashValPair<K, V>>(); - public List<KeyHashValPair<K, V>> getList() { - return list; - } + public List<KeyHashValPair<K, V>> getList() + { + return list; + } - private transient SerializationFactory serializationFactory; - private transient Configuration conf = null; + private transient SerializationFactory serializationFactory; + private transient Configuration conf = null; - public OutputCollectorImpl() { - conf = new Configuration(); - serializationFactory = new SerializationFactory(conf); + public OutputCollectorImpl() + { + conf = new Configuration(); + serializationFactory = new SerializationFactory(conf); - } + } - private <T> T cloneObj(T t) throws IOException { - Serializer<T> keySerializer; - Class<T> keyClass; - PipedInputStream pis = new PipedInputStream(); - PipedOutputStream pos = new PipedOutputStream(pis); - keyClass = (Class<T>) t.getClass(); - keySerializer = serializationFactory.getSerializer(keyClass); - keySerializer.open(pos); - keySerializer.serialize(t); - Deserializer<T> keyDesiralizer = serializationFactory.getDeserializer(keyClass); - keyDesiralizer.open(pis); - T clonedArg0 = keyDesiralizer.deserialize(null); - pos.close(); - pis.close(); - keySerializer.close(); - keyDesiralizer.close(); - return clonedArg0; + private <T> T cloneObj(T t) throws IOException + { + Serializer<T> keySerializer; + Class<T> keyClass; + PipedInputStream pis = new PipedInputStream(); + PipedOutputStream pos = new PipedOutputStream(pis); + keyClass = (Class<T>)t.getClass(); + keySerializer = serializationFactory.getSerializer(keyClass); + keySerializer.open(pos); + keySerializer.serialize(t); + Deserializer<T> keyDesiralizer = serializationFactory.getDeserializer(keyClass); + keyDesiralizer.open(pis); + T clonedArg0 = keyDesiralizer.deserialize(null); + pos.close(); + pis.close(); + keySerializer.close(); + keyDesiralizer.close(); + return clonedArg0; - } + } - @Override - public void collect(K arg0, V arg1) throws IOException { - if (conf == null) { - conf = new Configuration(); - serializationFactory = new SerializationFactory(conf); - } - list.add(new KeyHashValPair<K, V>(cloneObj(arg0), cloneObj(arg1))); - } + @Override + public void collect(K arg0, V arg1) throws IOException + { + if (conf == null) { + conf = new Configuration(); + serializationFactory = new SerializationFactory(conf); + } + list.add(new KeyHashValPair<K, V>(cloneObj(arg0), cloneObj(arg1))); + } }
