http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java index 6a6777e..c8a0e51 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java @@ -57,7 +57,7 @@ import static org.apache.apex.malhar.stream.api.Option.Options.name; public class WindowedWordCount implements StreamingApplication { static final int WINDOW_SIZE = 1; // Default window duration in minutes - + /** * A input operator that reads from and output a file line by line to downstream with a time gap between * every two lines. @@ -65,23 +65,23 @@ public class WindowedWordCount implements StreamingApplication public static class TextInput extends BaseOperator implements InputOperator { private static boolean done = false; - + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); - + private transient BufferedReader reader; - + public static boolean isDone() { return done; } - + @Override public void setup(Context.OperatorContext context) { done = false; initReader(); } - + private void initReader() { try { @@ -91,13 +91,13 @@ public class WindowedWordCount implements StreamingApplication throw Throwables.propagate(ex); } } - + @Override public void teardown() { IOUtils.closeQuietly(reader); } - + @Override public void emitTuples() { @@ -118,16 +118,16 @@ public class WindowedWordCount implements StreamingApplication } } } - + public static class Collector extends BaseOperator { private static Map<KeyValPair<Long, String>, Long> result = new HashMap<>(); - + public static Map<KeyValPair<Long, String>, Long> getResult() { return result; } - + public final transient DefaultInputPort<PojoEvent> input = new DefaultInputPort<PojoEvent>() { @Override @@ -137,7 +137,7 @@ public class WindowedWordCount implements StreamingApplication } }; } - + /** * A Pojo Tuple class used for outputting result to JDBC. */ @@ -146,44 +146,44 @@ public class WindowedWordCount implements StreamingApplication private String word; private long count; private long timestamp; - + @Override public String toString() { return "PojoEvent (word=" + getWord() + ", count=" + getCount() + ", timestamp=" + getTimestamp() + ")"; } - + public String getWord() { return word; } - + public void setWord(String word) { this.word = word; } - + public long getCount() { return count; } - + public void setCount(long count) { this.count = count; } - + public long getTimestamp() { return timestamp; } - + public void setTimestamp(long timestamp) { this.timestamp = timestamp; } } - + /** * A map function that wrap the input string with a random generated timestamp. */ @@ -191,12 +191,12 @@ public class WindowedWordCount implements StreamingApplication { private static final Duration RAND_RANGE = Duration.standardMinutes(10); private final Long minTimestamp; - + AddTimestampFn() { this.minTimestamp = System.currentTimeMillis(); } - + @Override public Tuple.TimestampedTuple<String> f(String input) { @@ -207,7 +207,7 @@ public class WindowedWordCount implements StreamingApplication return new Tuple.TimestampedTuple<>(randomTimestamp, input); } } - + /** A MapFunction that converts a Word and Count into a PojoEvent. */ public static class FormatAsTableRowFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, PojoEvent> { @@ -221,7 +221,7 @@ public class WindowedWordCount implements StreamingApplication return row; } } - + /** * Populate dag with High-Level API. * @param dag @@ -232,10 +232,10 @@ public class WindowedWordCount implements StreamingApplication { TextInput input = new TextInput(); Collector collector = new Collector(); - + // Create stream from the TextInput operator. ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(input, input.output, name("input")) - + // Extract all the words from the input line of text. .flatMap(new Function.FlatMapFunction<String, String>() { @@ -245,18 +245,18 @@ public class WindowedWordCount implements StreamingApplication return Arrays.asList(input.split("[\\p{Punct}\\s]+")); } }, name("ExtractWords")) - + // Wrap the word with a randomly generated timestamp. .map(new AddTimestampFn(), name("AddTimestampFn")); - - + + // apply window and trigger option. // TODO: change trigger option to atWaterMark when available. WindowedStream<Tuple.TimestampedTuple<String>> windowedWords = stream .window(new WindowOption.TimeWindows(Duration.standardMinutes(WINDOW_SIZE)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)); - - + + WindowedStream<PojoEvent> wordCounts = // Perform a countByKey transformation to count the appearance of each word in every time window. windowedWords.countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>() @@ -268,10 +268,10 @@ public class WindowedWordCount implements StreamingApplication new KeyValPair<String, Long>(input.getValue(), 1L)); } }, name("count words")) - + // Format the output and print out the result. .map(new FormatAsTableRowFn(), name("FormatAsTableRowFn")).print(); - + wordCounts.endWith(collector, collector.input, name("Collector")).populateDag(dag); } }
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java index 29c8cf9..00c40e7 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java @@ -79,12 +79,12 @@ public class AutoComplete implements StreamingApplication public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); private transient BufferedReader reader; - + public static boolean isDone() { return done; } - + @Override public void setup(OperatorContext context) { @@ -128,16 +128,16 @@ public class AutoComplete implements StreamingApplication } } } - + public static class Collector extends BaseOperator { private static Map<String, List<CompletionCandidate>> result = new HashMap<>(); - + public static Map<String, List<CompletionCandidate>> getResult() { return result; } - + public final transient DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> input = new DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>() { @Override @@ -193,7 +193,7 @@ public class AutoComplete implements StreamingApplication @Override public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple) { - // TODO: Should be removed after Auto-wrapping is supported. + // TODO: Should be removed after Auto-wrapping is supported. return new Tuple.WindowedTuple<>(Window.GLOBAL_WINDOW, tuple); } }); @@ -271,7 +271,8 @@ public class AutoComplete implements StreamingApplication { return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L)); } - }, name("countByKey")).map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>() + }, name("countByKey")) + .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>() { @Override public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) @@ -300,7 +301,7 @@ public class AutoComplete implements StreamingApplication ApexStream<String> tags = StreamFactory.fromInput(input, input.output, name("tweetSampler")) .flatMap(new ExtractHashtags()); - + tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) .addCompositeStreams(ComputeTopCompletions.top(10, true)).endWith(collector, collector.input, name("collector")) .populateDag(dag); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java index 8a7113e..5531b5e 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java @@ -45,7 +45,7 @@ public class CompletionCandidate implements Comparable<CompletionCandidate> // Empty constructor required for Kryo. public CompletionCandidate() { - + } @Override http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java index 2a4c003..e7eb90c 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java @@ -24,18 +24,18 @@ package org.apache.apex.malhar.stream.sample.complete; public class PojoEvent extends Object { private String stringValue; - + @Override public String toString() { return "PojoEvent [stringValue=" + getStringValue() + "]"; } - + public void setStringValue(String newString) { this.stringValue = newString; } - + public String getStringValue() { return this.stringValue; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java index 2ffdc82..845901a 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java @@ -45,17 +45,17 @@ public class StreamingWordExtract implements StreamingApplication { private static int wordCount = 0; // A counter to count number of words have been extracted. private static int entriesMapped = 0; // A counter to count number of entries have been mapped. - + public int getWordCount() { return wordCount; } - + public int getEntriesMapped() { return entriesMapped; } - + /** * A MapFunction that tokenizes lines of text into individual words. */ @@ -69,8 +69,8 @@ public class StreamingWordExtract implements StreamingApplication return result; } } - - + + /** * A MapFunction that uppercases a word. */ @@ -82,8 +82,8 @@ public class StreamingWordExtract implements StreamingApplication return input.toUpperCase(); } } - - + + /** * A filter function to filter out empty strings. */ @@ -95,14 +95,14 @@ public class StreamingWordExtract implements StreamingApplication return !input.isEmpty(); } } - - + + /** * A map function to map the result string to a pojo entry. */ public static class PojoMapper implements Function.MapFunction<String, Object> { - + @Override public Object f(String input) { @@ -112,7 +112,7 @@ public class StreamingWordExtract implements StreamingApplication return pojo; } } - + /** * Add field infos to the {@link JdbcPOJOInsertOutputOperator}. */ @@ -122,7 +122,7 @@ public class StreamingWordExtract implements StreamingApplication fieldInfos.add(new JdbcFieldInfo("STRINGVALUE", "stringValue", JdbcFieldInfo.SupportType.STRING, VARCHAR)); return fieldInfos; } - + /** * Populate dag with High-Level API. * @param dag @@ -136,25 +136,25 @@ public class StreamingWordExtract implements StreamingApplication JdbcTransactionalStore outputStore = new JdbcTransactionalStore(); jdbcOutput.setStore(outputStore); jdbcOutput.setTablename("TestTable"); - + // Create a stream reading from a folder. ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/data"); // Extract all the words from the input line of text. stream.flatMap(new ExtractWords()) - + // Filter out the empty strings. .filter(new EmptyStringFilter()) - + // Change every word to uppercase. .map(new Uppercase()) - + // Map the resulted word to a Pojo entry. .map(new PojoMapper()) - + // Output the entries to JdbcOutput and insert them into a table. .endWith(jdbcOutput, jdbcOutput.input, Option.Options.name("jdbcOutput")); - + stream.populateDag(dag); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java index f2e70b1..d7d62fe 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java @@ -63,23 +63,23 @@ public class TopWikipediaSessions implements StreamingApplication { private String[] names = new String[]{"user1", "user2", "user3", "user4"}; public transient DefaultOutputPort<KeyValPair<String, Long>> output = new DefaultOutputPort<>(); - + private static final Duration RAND_RANGE = Duration.standardDays(365); private Long minTimestamp; private long sleepTime; private static int tupleCount = 0; - + public static int getTupleCount() { return tupleCount; } - + private String randomName(String[] names) { int index = new Random().nextInt(names.length); return names[index]; } - + @Override public void setup(Context.OperatorContext context) { @@ -88,7 +88,7 @@ public class TopWikipediaSessions implements StreamingApplication minTimestamp = System.currentTimeMillis(); sleepTime = context.getValue(Context.OperatorContext.SPIN_MILLIS); } - + @Override public void emitTuples() { @@ -103,17 +103,17 @@ public class TopWikipediaSessions implements StreamingApplication } } } - + public static class Collector extends BaseOperator { private final int resultSize = 5; private static List<List<TempWrapper>> result = new ArrayList<>(); - + public static List<List<TempWrapper>> getResult() { return result; } - + public final transient DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>>() { @Override @@ -126,8 +126,8 @@ public class TopWikipediaSessions implements StreamingApplication } }; } - - + + /** * Convert the upstream (user, time) combination to a timestamped tuple of user. */ @@ -138,13 +138,13 @@ public class TopWikipediaSessions implements StreamingApplication { long timestamp = input.getValue(); String userName = input.getKey(); - + // Sets the implicit timestamp field to be used in windowing. return new Tuple.TimestampedTuple<>(timestamp, userName); - + } } - + /** * Computes the number of edits in each user session. A session is defined as * a string of edits where each is separated from the next by less than an hour. @@ -156,10 +156,10 @@ public class TopWikipediaSessions implements StreamingApplication public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> compose(ApexStream<Tuple.TimestampedTuple<String>> inputStream) { return inputStream - + // Chuck the stream into session windows. .window(new WindowOption.SessionWindows(Duration.standardHours(1)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) - + // Count the number of edits for a user within one session. .countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>() { @@ -171,7 +171,7 @@ public class TopWikipediaSessions implements StreamingApplication }, name("ComputeSessions")); } } - + /** * A comparator class used for comparing two TempWrapper objects. */ @@ -183,7 +183,7 @@ public class TopWikipediaSessions implements StreamingApplication return Long.compare(o1.getValue().getValue(), o2.getValue().getValue()); } } - + /** * A function to extract timestamp from a TempWrapper object. */ @@ -196,7 +196,7 @@ public class TopWikipediaSessions implements StreamingApplication return input.getTimestamp(); } } - + /** * A temporary wrapper to wrap a KeyValPair and a timestamp together to represent a timestamped tuple, the reason * for this is that we cannot resolve a type conflict when calling accumulate(). After the issue resolved, we can @@ -206,39 +206,39 @@ public class TopWikipediaSessions implements StreamingApplication { private KeyValPair<String, Long> value; private Long timestamp; - + public TempWrapper() { - + } - + public TempWrapper(KeyValPair<String, Long> value, Long timestamp) { this.value = value; this.timestamp = timestamp; } - + @Override public String toString() { return this.value + " - " + this.timestamp; } - + public Long getTimestamp() { return timestamp; } - + public void setTimestamp(Long timestamp) { this.timestamp = timestamp; } - + public KeyValPair<String, Long> getValue() { return value; } - + public void setValue(KeyValPair<String, Long> value) { this.value = value; @@ -251,16 +251,16 @@ public class TopWikipediaSessions implements StreamingApplication private static class TopPerMonth extends CompositeStreamTransform<ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>> { - + @Override public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> inputStream) { TopN<TempWrapper> topN = new TopN<>(); topN.setN(10); topN.setComparator(new Comp()); - + return inputStream - + // Map the input WindowedTuple to a TempWrapper object. .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, TempWrapper>() { @@ -270,15 +270,15 @@ public class TopWikipediaSessions implements StreamingApplication return new TempWrapper(input.getValue(), input.getWindows().get(0).getBeginTimestamp()); } }, name("TempWrapper")) - + // Apply window and trigger option again, this time chuck the stream into fixed time windows. .window(new WindowOption.TimeWindows(Duration.standardDays(30)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(5))) - + // Compute the top 10 user-sessions with most number of edits. .accumulate(topN, name("TopN")).with("timestampExtractor", new TimestampExtractor()); } } - + /** * A map function that combine the user and his/her edit session together to a string and use that string as a key * with number of edits in that session as value to create a new key value pair to send to downstream. @@ -293,7 +293,7 @@ public class TopWikipediaSessions implements StreamingApplication input.getValue().getValue())); } } - + /** * A flapmap function that turns the result into readable format. */ @@ -311,7 +311,7 @@ public class TopWikipediaSessions implements StreamingApplication return result; } } - + /** * A composite transform that compute the top wikipedia sessions. */ @@ -327,7 +327,7 @@ public class TopWikipediaSessions implements StreamingApplication .addCompositeStreams(new TopPerMonth()); } } - + @Override public void populateDAG(DAG dag, Configuration conf) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java index 26a2823..3045238 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java @@ -63,7 +63,7 @@ public class TrafficRoutes implements StreamingApplication static Map<String, String> sdStations = buildStationInfo(); static final int WINDOW_DURATION = 3; // Default sliding window duration in minutes static final int WINDOW_SLIDE_EVERY = 1; // Default window 'slide every' setting in minutes - + /** * This class holds information about a station reading's average speed. */ @@ -75,54 +75,54 @@ public class TrafficRoutes implements StreamingApplication Double avgSpeed; @Nullable Long timestamp; - + public StationSpeed() {} - + public StationSpeed(String stationId, Double avgSpeed, Long timestamp) { this.stationId = stationId; this.avgSpeed = avgSpeed; this.timestamp = timestamp; } - + public void setAvgSpeed(@Nullable Double avgSpeed) { this.avgSpeed = avgSpeed; } - + public void setStationId(@Nullable String stationId) { this.stationId = stationId; } - + public void setTimestamp(@Nullable Long timestamp) { this.timestamp = timestamp; } - + @Nullable public Long getTimestamp() { return timestamp; } - + public String getStationId() { return this.stationId; } - + public Double getAvgSpeed() { return this.avgSpeed; } - + @Override public int compareTo(StationSpeed other) { return Long.compare(this.timestamp, other.timestamp); } } - + /** * This class holds information about a route's speed/slowdown. */ @@ -134,63 +134,63 @@ public class TrafficRoutes implements StreamingApplication Double avgSpeed; @Nullable Boolean slowdownEvent; - + public RouteInfo() { - + } - + public RouteInfo(String route, Double avgSpeed, Boolean slowdownEvent) { this.route = route; this.avgSpeed = avgSpeed; this.slowdownEvent = slowdownEvent; } - + public String getRoute() { return this.route; } - + public Double getAvgSpeed() { return this.avgSpeed; } - + public Boolean getSlowdownEvent() { return this.slowdownEvent; } } - + /** * Extract the timestamp field from the input string, and wrap the input string in a {@link Tuple.TimestampedTuple} * with the extracted timestamp. */ static class ExtractTimestamps implements Function.MapFunction<String, Tuple.TimestampedTuple<String>> { - + @Override public Tuple.TimestampedTuple<String> f(String input) { String[] items = input.split(","); String timestamp = tryParseTimestamp(items); - + return new Tuple.TimestampedTuple<>(Long.parseLong(timestamp), input); } } - + /** * Filter out readings for the stations along predefined 'routes', and output * (station, speed info) keyed on route. */ static class ExtractStationSpeedFn implements Function.FlatMapFunction<Tuple.TimestampedTuple<String>, KeyValPair<String, StationSpeed>> { - + @Override public Iterable<KeyValPair<String, StationSpeed>> f(Tuple.TimestampedTuple<String> input) { - + ArrayList<KeyValPair<String, StationSpeed>> result = new ArrayList<>(); String[] items = input.getValue().split(","); String stationType = tryParseStationType(items); @@ -210,7 +210,7 @@ public class TrafficRoutes implements StreamingApplication return result; } } - + /** * For a given route, track average speed for the window. Calculate whether * traffic is currently slowing down, via a predefined threshold. If a supermajority of @@ -261,7 +261,7 @@ public class TrafficRoutes implements StreamingApplication return result; } } - + /** * Output Pojo class for outputting result to JDBC. */ @@ -271,11 +271,11 @@ public class TrafficRoutes implements StreamingApplication private Boolean slowdownEvent; private String key; private Long timestamp; - + public OutputPojo() { } - + public OutputPojo(Double avgSpeed, Boolean slowdownEvent, String key, Long timestamp) { this.avgSpeed = avgSpeed; @@ -283,64 +283,64 @@ public class TrafficRoutes implements StreamingApplication this.key = key; this.timestamp = timestamp; } - + @Override public String toString() { return key + " + " + avgSpeed + " + " + slowdownEvent + " + " + timestamp; } - + public void setTimestamp(Long timestamp) { this.timestamp = timestamp; } - + public Long getTimestamp() { return timestamp; } - + public void setAvgSpeed(Double avgSpeed) { this.avgSpeed = avgSpeed; } - + public Double getAvgSpeed() { return avgSpeed; } - + public void setKey(String key) { this.key = key; } - + public String getKey() { return key; } - + public void setSlowdownEvent(Boolean slowdownEvent) { this.slowdownEvent = slowdownEvent; } - + public Boolean getSlowdownEvent() { return slowdownEvent; } - + } - + public static class Collector extends BaseOperator { private static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> result = new HashMap<>(); - + public static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> getResult() { return result; } - + public final transient DefaultInputPort<OutputPojo> input = new DefaultInputPort<OutputPojo>() { @Override @@ -350,7 +350,7 @@ public class TrafficRoutes implements StreamingApplication } }; } - + /** * Format the results of the slowdown calculations to a OutputPojo. */ @@ -364,8 +364,8 @@ public class TrafficRoutes implements StreamingApplication return row; } } - - + + /** * This composite transformation extracts speed info from traffic station readings. * It groups the readings by 'route' and analyzes traffic slowdown for that route. @@ -389,19 +389,19 @@ public class TrafficRoutes implements StreamingApplication return new Tuple.TimestampedTuple<>(input.getValue().getTimestamp(), input); } }, name("GroupByKey")); - + // Analyze 'slowdown' over the route readings. WindowedStream<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> stats = timeGroup .flatMap(new GatherStats(), name("GatherStats")); - + // Format the results for writing to JDBC table. WindowedStream<OutputPojo> results = stats.map(new FormatStatsFn(), name("FormatStatsFn")); - + return results; } } - - + + private static Double tryParseAvgSpeed(String[] inputItems) { try { @@ -412,27 +412,27 @@ public class TrafficRoutes implements StreamingApplication return null; } } - + private static String tryParseStationType(String[] inputItems) { return tryParseString(inputItems, 2); } - + private static String tryParseStationId(String[] inputItems) { return tryParseString(inputItems, 1); } - + private static String tryParseTimestamp(String[] inputItems) { return tryParseString(inputItems, 0); } - + private static String tryParseString(String[] inputItems, int index) { return inputItems.length >= index ? inputItems[index] : null; } - + /** * Define some small hard-wired San Diego 'routes' to track based on sensor station ID. */ @@ -444,33 +444,33 @@ public class TrafficRoutes implements StreamingApplication stations.put("1108702", "SDRoute2"); return stations; } - + /** * A dummy generator to generate some traffic information. */ public static class InfoGen extends BaseOperator implements InputOperator { public transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); - + private String[] stationTypes = new String[]{"ML", "BL", "GL"}; private int[] stationIDs = new int[]{1108413, 1108699, 1108702}; private double ave = 55.0; private long timestamp; private static final Duration RAND_RANGE = Duration.standardMinutes(10); private static int tupleCount = 0; - + public static int getTupleCount() { return tupleCount; } - + @Override public void setup(Context.OperatorContext context) { tupleCount = 0; timestamp = System.currentTimeMillis(); } - + @Override public void emitTuples() { @@ -481,7 +481,7 @@ public class TrafficRoutes implements StreamingApplication try { output.emit(time + "," + stationID + "," + stationType + "," + speed); tupleCount++; - + Thread.sleep(50); } catch (Exception e) { // Ignore it @@ -490,29 +490,29 @@ public class TrafficRoutes implements StreamingApplication } } } - + @Override public void populateDAG(DAG dag, Configuration conf) { InfoGen infoGen = new InfoGen(); Collector collector = new Collector(); - + // Create a stream from the input operator. ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(infoGen, infoGen.output, name("infoGen")) - + // Extract the timestamp from the input and wrap it into a TimestampedTuple. .map(new ExtractTimestamps(), name("ExtractTimestamps")); - + stream // Extract the average speed of a station. .flatMap(new ExtractStationSpeedFn(), name("ExtractStationSpeedFn")) - + // Apply window and trigger option. .window(new WindowOption.SlidingTimeWindows(Duration.standardMinutes(WINDOW_DURATION), Duration.standardMinutes(WINDOW_SLIDE_EVERY)), new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(5000)).accumulatingFiredPanes()) - + // Apply TrackSpeed composite transformation to compute the route information. .addCompositeStreams(new TrackSpeed()) - + // print the result to console. .print() .endWith(collector, collector.input, name("Collector")) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java index ecad622..a4fdf24 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java @@ -200,7 +200,8 @@ public class TwitterAutoComplete implements StreamingApplication { return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L)); } - }, name("Hashtag Count")).map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>() + }, name("Hashtag Count")) + .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>() { @Override public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java index d88a8dc..7c16521 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java @@ -94,7 +94,7 @@ public class CombinePerKeyExamples implements StreamingApplication return new SampleBean(input.getValue().getKey(), input.getValue().getValue()); } } - + /** * A reduce function to concat two strings together. */ @@ -106,7 +106,7 @@ public class CombinePerKeyExamples implements StreamingApplication return input1 + ", " + input2; } } - + /** * Reads the public 'Shakespeare' data, and for each word in the dataset * over a given length, generates a string containing the list of play names @@ -114,17 +114,17 @@ public class CombinePerKeyExamples implements StreamingApplication */ private static class PlaysForWord extends CompositeStreamTransform<ApexStream<SampleBean>, WindowedStream<SampleBean>> { - + @Override public WindowedStream<SampleBean> compose(ApexStream<SampleBean> inputStream) { return inputStream // Extract words from the input SampleBeam stream. .map(new ExtractLargeWordsFn(), name("ExtractLargeWordsFn")) - + // Apply window and trigger option to the streams. .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) - + // Apply reduceByKey transformation to concat the names of all the plays that a word has appeared in together. .reduceByKey(new Concat(), new Function.ToKeyValue<KeyValPair<String,String>, String, String>() { @@ -134,13 +134,13 @@ public class CombinePerKeyExamples implements StreamingApplication return new Tuple.PlainTuple<KeyValPair<String, String>>(input); } }, name("Concat")) - + // Format the output back to a SampleBeam object. .map(new FormatShakespeareOutputFn(), name("FormatShakespeareOutputFn")); } } - - + + /** * A Java Beam class that contains information about a word appears in a corpus written by Shakespeare. */ @@ -157,13 +157,13 @@ public class CombinePerKeyExamples implements StreamingApplication this.word = word; this.corpus = corpus; } - + @Override public String toString() { return this.word + " : " + this.corpus; } - + private String word; private String corpus; @@ -188,7 +188,7 @@ public class CombinePerKeyExamples implements StreamingApplication return corpus; } } - + /** * A dummy info generator to generate {@link SampleBean} objects to mimic reading from real 'Shakespeare' * data. @@ -200,19 +200,19 @@ public class CombinePerKeyExamples implements StreamingApplication private String[] words = new String[]{"A", "B", "C", "D", "E", "F", "G"}; private String[] corpuses = new String[]{"1", "2", "3", "4", "5", "6", "7", "8"}; private static int i; - + public static int getI() { return i; } - + @Override public void setup(Context.OperatorContext context) { super.setup(context); i = 0; } - + @Override public void emitTuples() { @@ -229,20 +229,20 @@ public class CombinePerKeyExamples implements StreamingApplication } i++; } - + } } - + public static class Collector extends BaseOperator { static List<SampleBean> result; - + @Override public void setup(Context.OperatorContext context) { result = new ArrayList<>(); } - + public final transient DefaultInputPort<SampleBean> input = new DefaultInputPort<SampleBean>() { @Override @@ -252,7 +252,7 @@ public class CombinePerKeyExamples implements StreamingApplication } }; } - + /** * Populate dag using High-Level API. * @param dag @@ -268,6 +268,6 @@ public class CombinePerKeyExamples implements StreamingApplication .print() .endWith(collector, collector.input, name("Collector")) .populateDag(dag); - + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java index 2930010..0cd7c58 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java @@ -47,22 +47,22 @@ import static org.apache.apex.malhar.stream.api.Option.Options.name; @ApplicationAnnotation(name = "DeDupExample") public class DeDupExample implements StreamingApplication { - + public static class Collector extends BaseOperator { private static Tuple.WindowedTuple<List<String>> result; private static boolean done = false; - + public static Tuple.WindowedTuple<List<String>> getResult() { return result; } - + public static boolean isDone() { return done; } - + @Override public void setup(Context.OperatorContext context) { @@ -70,7 +70,7 @@ public class DeDupExample implements StreamingApplication result = new Tuple.WindowedTuple<>(); done = false; } - + public transient DefaultInputPort<Tuple.WindowedTuple<List<String>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<String>>>() { @Override @@ -83,15 +83,15 @@ public class DeDupExample implements StreamingApplication } }; } - + @Override public void populateDAG(DAG dag, Configuration conf) { Collector collector = new Collector(); - + // Create a stream that reads from files in a local folder and output lines one by one to downstream. ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/wordcount", name("textInput")) - + // Extract all the words from the input line of text. .flatMap(new Function.FlatMapFunction<String, String>() { @@ -101,7 +101,7 @@ public class DeDupExample implements StreamingApplication return Arrays.asList(input.split("[\\p{Punct}\\s]+")); } }, name("ExtractWords")) - + // Change the words to lower case, also shutdown the app when the word "bye" is detected. .map(new Function.MapFunction<String, String>() { @@ -111,14 +111,14 @@ public class DeDupExample implements StreamingApplication return input.toLowerCase(); } }, name("ToLowerCase")); - + // Apply window and trigger option. stream.window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(1))) - + // Remove the duplicate words and print out the result. .accumulate(new RemoveDuplicates<String>(), name("RemoveDuplicates")).print().endWith(collector, collector.input) - + .populateDag(dag); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java index 3643eab..1ba2a90 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java @@ -27,48 +27,48 @@ public class InputPojo extends Object private int day; private int year; private double meanTemp; - + @Override public String toString() { return "PojoEvent [month=" + getMonth() + ", day=" + getDay() + ", year=" + getYear() + ", meanTemp=" + getMeanTemp() + "]"; } - + public void setMonth(int month) { this.month = month; } - + public int getMonth() { return this.month; } - + public void setDay(int day) { this.day = day; } - + public int getDay() { return day; } - + public void setYear(int year) { this.year = year; } - + public int getYear() { return year; } - + public void setMeanTemp(double meanTemp) { this.meanTemp = meanTemp; } - + public double getMeanTemp() { return meanTemp; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java index 02980e4..4538aef 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java @@ -55,7 +55,7 @@ import static org.apache.apex.malhar.stream.api.Option.Options.name; @ApplicationAnnotation(name = "MaxPerKeyExamples") public class MaxPerKeyExamples implements StreamingApplication { - + /** * A map function to extract the mean temperature from {@link InputPojo}. */ @@ -69,8 +69,8 @@ public class MaxPerKeyExamples implements StreamingApplication return new KeyValPair<Integer, Double>(month, meanTemp); } } - - + + /** * A map function to format output to {@link OutputPojo}. */ @@ -85,7 +85,7 @@ public class MaxPerKeyExamples implements StreamingApplication return row; } } - + /** * A composite transformation to perform three tasks: * 1. extract the month and its mean temperature from input pojo. @@ -99,7 +99,7 @@ public class MaxPerKeyExamples implements StreamingApplication { // InputPojo... => <month, meanTemp> ... WindowedStream<KeyValPair<Integer, Double>> temps = rows.map(new ExtractTempFn(), name("ExtractTempFn")); - + // month, meanTemp... => <month, max mean temp>... WindowedStream<Tuple.WindowedTuple<KeyValPair<Integer, Double>>> tempMaxes = temps.accumulateByKey(new Max<Double>(), @@ -111,14 +111,14 @@ public class MaxPerKeyExamples implements StreamingApplication return new Tuple.WindowedTuple<KeyValPair<Integer, Double>>(Window.GLOBAL_WINDOW, input); } }, name("MaxPerMonth")); - + // <month, max>... => OutputPojo... WindowedStream<OutputPojo> results = tempMaxes.map(new FormatMaxesFn(), name("FormatMaxesFn")); - + return results; } } - + /** * Method to set field info for {@link JdbcPOJOInputOperator}. * @return @@ -132,7 +132,7 @@ public class MaxPerKeyExamples implements StreamingApplication fieldInfos.add(new FieldInfo("MEANTEMP", "meanTemp", FieldInfo.SupportType.DOUBLE)); return fieldInfos; } - + /** * Method to set field info for {@link JdbcPOJOInsertOutputOperator}. * @return @@ -144,8 +144,8 @@ public class MaxPerKeyExamples implements StreamingApplication fieldInfos.add(new JdbcFieldInfo("MEANTEMP", "meanTemp", JdbcFieldInfo.SupportType.DOUBLE, DOUBLE)); return fieldInfos; } - - + + /** * Populate the dag using High-Level API. * @param dag @@ -156,21 +156,21 @@ public class MaxPerKeyExamples implements StreamingApplication { JdbcPOJOInputOperator jdbcInput = new JdbcPOJOInputOperator(); jdbcInput.setFieldInfos(addInputFieldInfos()); - + JdbcStore store = new JdbcStore(); jdbcInput.setStore(store); - + JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator(); jdbcOutput.setFieldInfos(addOutputFieldInfos()); JdbcTransactionalStore outputStore = new JdbcTransactionalStore(); jdbcOutput.setStore(outputStore); - + // Create stream that reads from a Jdbc Input. ApexStream<Object> stream = StreamFactory.fromInput(jdbcInput, jdbcInput.outputPort, name("jdbcInput")) - + // Apply window and trigger option to the stream. .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) - + // Because Jdbc Input sends out a stream of Object, need to cast them to InputPojo. .map(new Function.MapFunction<Object, InputPojo>() { @@ -180,10 +180,10 @@ public class MaxPerKeyExamples implements StreamingApplication return (InputPojo)input; } }, name("ObjectToInputPojo")) - + // Plug in the composite transformation to the stream to calculate the maximum temperature for each month. .addCompositeStreams(new MaxMeanTemp()) - + // Cast the resulted OutputPojo to Object for Jdbc Output to consume. .map(new Function.MapFunction<OutputPojo, Object>() { @@ -193,11 +193,11 @@ public class MaxPerKeyExamples implements StreamingApplication return (Object)input; } }, name("OutputPojoToObject")) - + // Output the result to Jdbc Output. .endWith(jdbcOutput, jdbcOutput.input, name("jdbcOutput")); - + stream.populateDag(dag); - + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java index db2a09e..59831b7 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java @@ -25,28 +25,28 @@ public class OutputPojo { private int month; private double meanTemp; - + @Override public String toString() { return "PojoEvent [month=" + getMonth() + ", meanTemp=" + getMeanTemp() + "]"; } - + public void setMonth(int month) { this.month = month; } - + public int getMonth() { return this.month; } - + public void setMeanTemp(double meanTemp) { this.meanTemp = meanTemp; } - + public double getMeanTemp() { return meanTemp; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java index bf23e3a..dd09352 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java @@ -189,7 +189,7 @@ public class TriggerExample // At 11:03:00 (processing time) the system watermark may have advanced to 10:54:00. As a // result, when the data record with event time 10:05:00 arrives at 11:03:00, it is considered // late, and dropped. - + WindowedStream<SampleBean> defaultTriggerResults = inputStream .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)), new TriggerOption().discardingFiredPanes()) @@ -306,7 +306,7 @@ public class TriggerExample @Override public WindowedStream<SampleBean> compose(WindowedStream<String> inputStream) { - + WindowedStream<KeyValPair<String, Iterable<Integer>>> flowPerFreeway = inputStream .groupByKey(new ExtractFlowInfo()); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java index 101953f..d32da72 100644 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java @@ -37,11 +37,11 @@ public class MinimalWordCountTest { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); - + MinimalWordCount app = new MinimalWordCount(); lma.prepareDAG(app, conf); - + LocalMode.Controller lc = lma.getController(); ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() { @@ -51,9 +51,9 @@ public class MinimalWordCountTest return MinimalWordCount.Collector.isDone(); } }); - + lc.run(10000); - + Assert.assertTrue(MinimalWordCount.Collector.result.get("error") == 7); Assert.assertTrue(MinimalWordCount.Collector.result.get("word") == 119); Assert.assertTrue(MinimalWordCount.Collector.result.get("bye") == 1); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java index 952356f..f6270d4 100644 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java @@ -56,16 +56,16 @@ public class WindowedWordCountTest return WindowedWordCount.TextInput.isDone(); } }); - + lc.run(60000); - + Assert.assertEquals(127, countSum(WindowedWordCount.Collector.getResult())); Assert.assertEquals(28, countSumWord(WindowedWordCount.Collector.getResult(), "word2")); Assert.assertEquals(7, countSumWord(WindowedWordCount.Collector.getResult(), "error")); Assert.assertEquals(21, countSumWord(WindowedWordCount.Collector.getResult(), "word9")); Assert.assertEquals(1, countSumWord(WindowedWordCount.Collector.getResult(), "bye")); } - + public long countSum(Map<KeyValPair<Long, String>, Long> map) { long sum = 0; @@ -74,7 +74,7 @@ public class WindowedWordCountTest } return sum; } - + public long countSumWord(Map<KeyValPair<Long, String>, Long> map, String word) { long sum = 0; @@ -85,6 +85,6 @@ public class WindowedWordCountTest } return sum; } - + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java index dc236f9..26bb13e 100644 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java @@ -41,7 +41,7 @@ public class AutoCompleteTest Configuration conf = new Configuration(false); lma.prepareDAG(new AutoComplete(), conf); LocalMode.Controller lc = lma.getController(); - + ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() { @Override @@ -50,9 +50,9 @@ public class AutoCompleteTest return AutoComplete.TweetsInput.isDone(); } }); - + lc.run(200000); - + Assert.assertTrue(AutoComplete.Collector.getResult().containsKey("chi")); Assert.assertTrue(AutoComplete.Collector.getResult().containsKey("china")); Assert.assertEquals(2, AutoComplete.Collector.getResult().get("china").get(0).getCount()); @@ -61,6 +61,6 @@ public class AutoCompleteTest Assert.assertEquals(3, AutoComplete.Collector.getResult().get("f").size()); Assert.assertTrue(AutoComplete.Collector.getResult().get("f").get(0).getCount() >= AutoComplete.Collector.getResult().get("f").get(1).getCount()); Assert.assertTrue(AutoComplete.Collector.getResult().get("f").get(1).getCount() >= AutoComplete.Collector.getResult().get("f").get(2).getCount()); - + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java index bf9b030..dc9cdec 100644 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java @@ -54,10 +54,10 @@ public class StreamingWordExtractTest { try { Class.forName(DB_DRIVER).newInstance(); - + Connection con = DriverManager.getConnection(DB_URL,USER_NAME,PSW); Statement stmt = con.createStatement(); - + String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( " + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, " + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, " @@ -66,16 +66,16 @@ public class StreamingWordExtractTest + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") " + ")"; stmt.executeUpdate(createMetaTable); - + String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + "(STRINGVALUE VARCHAR(255))"; stmt.executeUpdate(createTable); - + } catch (Throwable e) { throw Throwables.propagate(e); } } - + @After public void cleanTable() { @@ -88,7 +88,7 @@ public class StreamingWordExtractTest throw new RuntimeException(e); } } - + public void setConfig(Configuration conf) { conf.set("dt.operator.jdbcOutput.prop.store.userName", USER_NAME); @@ -99,14 +99,14 @@ public class StreamingWordExtractTest conf.set("dt.operator.jdbcOutput.prop.store.databaseUrl", DB_URL); conf.set("dt.operator.jdbcOutput.prop.tablename", TABLE_NAME); } - + public int getNumOfEventsInStore() { Connection con; try { con = DriverManager.getConnection(DB_URL,USER_NAME,PSW); Statement stmt = con.createStatement(); - + String countQuery = "SELECT count(*) from " + TABLE_NAME; ResultSet resultSet = stmt.executeQuery(countQuery); resultSet.next(); @@ -115,7 +115,7 @@ public class StreamingWordExtractTest throw new RuntimeException("fetching count", e); } } - + @Test public void StreamingWordExtractTest() throws Exception { @@ -125,7 +125,7 @@ public class StreamingWordExtractTest StreamingWordExtract app = new StreamingWordExtract(); lma.prepareDAG(app, conf); LocalMode.Controller lc = lma.getController(); - + ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() { @Override @@ -134,11 +134,11 @@ public class StreamingWordExtractTest return getNumOfEventsInStore() == 36; } }); - + lc.run(10000); - + Assert.assertEquals(app.getWordCount(), getNumOfEventsInStore()); Assert.assertEquals(app.getEntriesMapped(), getNumOfEventsInStore()); } - + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java index f8ec086..c0dbaf4 100644 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java @@ -41,7 +41,7 @@ public class TopWikipediaSessionsTest Configuration conf = new Configuration(false); lma.prepareDAG(new TopWikipediaSessions(), conf); LocalMode.Controller lc = lma.getController(); - + ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() { @Override @@ -50,14 +50,14 @@ public class TopWikipediaSessionsTest return TopWikipediaSessions.SessionGen.getTupleCount() >= 250; } }); - + lc.run(30000); - + for (int i = 0; i < TopWikipediaSessions.Collector.getResult().size(); i++) { Assert.assertTrue(isInOrder(TopWikipediaSessions.Collector.getResult().get(i))); } } - + public boolean isInOrder(List<TopWikipediaSessions.TempWrapper> input) { if (input.size() == 0 || input.size() == 1) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java index e363ca7..c532898 100644 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java @@ -43,7 +43,7 @@ public class TrafficRoutesTest Configuration conf = new Configuration(false); lma.prepareDAG(new TrafficRoutes(), conf); LocalMode.Controller lc = lma.getController(); - + ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() { @Override @@ -52,9 +52,9 @@ public class TrafficRoutesTest return TrafficRoutes.InfoGen.getTupleCount() >= 100; } }); - + lc.run(60000); - + Assert.assertTrue(!TrafficRoutes.Collector.getResult().isEmpty()); for (Map.Entry<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> entry : TrafficRoutes.Collector.getResult().entrySet()) { Assert.assertTrue(entry.getValue().getKey() <= 75); @@ -62,5 +62,5 @@ public class TrafficRoutesTest Assert.assertTrue(entry.getKey().getValue().equals("SDRoute1") || entry.getKey().getValue().equals("SDRoute2")); } } - + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java index 5858013..b130808 100644 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java @@ -35,11 +35,11 @@ public class CombinePerKeyExamplesTest { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); - + CombinePerKeyExamples app = new CombinePerKeyExamples(); - + lma.prepareDAG(app, conf); - + LocalMode.Controller lc = lma.getController(); ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() { @@ -50,7 +50,7 @@ public class CombinePerKeyExamplesTest } }); lc.run(100000); - + Assert.assertTrue(CombinePerKeyExamples.Collector.result.get(CombinePerKeyExamples.Collector.result.size() - 1).getCorpus().contains("1, 2, 3, 4, 5, 6, 7, 8")); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java index ed4ddb4..a175cd7 100644 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java @@ -38,7 +38,7 @@ public class DeDupExampleTest { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); - + DeDupExample app = new DeDupExample(); lma.prepareDAG(app, conf); LocalMode.Controller lc = lma.getController(); @@ -51,9 +51,9 @@ public class DeDupExampleTest } }); lc.run(50000); - + Assert.assertEquals(9, DeDupExample.Collector.getResult().getValue().size()); - + } - + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java index 51981de..ec28b40 100644 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java @@ -46,7 +46,7 @@ import com.datatorrent.stram.StramLocalCluster; */ public class MaxPerKeyExamplesTest { - + private static final String INPUT_TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.cookbook.InputPojo"; private static final String OUTPUT_TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.cookbook.OutputPojo"; private static final String DB_DRIVER = "org.h2.Driver"; @@ -56,18 +56,18 @@ public class MaxPerKeyExamplesTest private static final String USER_NAME = "root"; private static final String PSW = "password"; private static final String QUERY = "SELECT * FROM " + INPUT_TABLE + ";"; - + private static final double[] MEANTEMPS = {85.3, 75.4}; - + @BeforeClass public static void setup() { try { Class.forName(DB_DRIVER).newInstance(); - + Connection con = DriverManager.getConnection(DB_URL,USER_NAME,PSW); Statement stmt = con.createStatement(); - + String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( " + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, " + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, " @@ -76,53 +76,53 @@ public class MaxPerKeyExamplesTest + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") " + ")"; stmt.executeUpdate(createMetaTable); - + String createInputTable = "CREATE TABLE IF NOT EXISTS " + INPUT_TABLE + "(MONTH INT(2) not NULL, DAY INT(2), YEAR INT(4), MEANTEMP DOUBLE(10) )"; stmt.executeUpdate(createInputTable); - + String createOutputTable = "CREATE TABLE IF NOT EXISTS " + OUTPUT_TABLE + "(MONTH INT(2) not NULL, MEANTEMP DOUBLE(10) )"; stmt.executeUpdate(createOutputTable); - + String cleanTable = "truncate table " + INPUT_TABLE; stmt.executeUpdate(cleanTable); - + stmt = con.createStatement(); - + String sql = "INSERT INTO " + INPUT_TABLE + " VALUES (6, 21, 2014, 85.3)"; stmt.executeUpdate(sql); sql = "INSERT INTO " + INPUT_TABLE + " VALUES (7, 20, 2014, 75.4)"; stmt.executeUpdate(sql); sql = "INSERT INTO " + INPUT_TABLE + " VALUES (6, 18, 2014, 45.3)"; stmt.executeUpdate(sql); - + } catch (Throwable e) { throw Throwables.propagate(e); } } - + @AfterClass public static void cleanup() { try { Class.forName(DB_DRIVER).newInstance(); - + Connection con = DriverManager.getConnection(DB_URL, USER_NAME, PSW); Statement stmt = con.createStatement(); - + String dropInputTable = "DROP TABLE " + INPUT_TABLE; stmt.executeUpdate(dropInputTable); - + String dropOutputTable = "DROP TABLE " + OUTPUT_TABLE; stmt.executeUpdate(dropOutputTable); - + } catch (Throwable e) { throw Throwables.propagate(e); } - + } - + public void setConfig(Configuration conf) { conf.set("dt.operator.jdbcInput.prop.store.userName", USER_NAME); @@ -133,7 +133,7 @@ public class MaxPerKeyExamplesTest conf.set("dt.operator.jdbcInput.prop.store.databaseUrl", DB_URL); conf.set("dt.operator.jdbcInput.prop.tableName", INPUT_TABLE); conf.set("dt.operator.jdbcInput.prop.query", QUERY); - + conf.set("dt.operator.jdbcOutput.prop.store.userName", USER_NAME); conf.set("dt.operator.jdbcOutput.prop.store.password", PSW); conf.set("dt.operator.jdbcOutput.prop.store.databaseDriver", DB_DRIVER); @@ -142,14 +142,14 @@ public class MaxPerKeyExamplesTest conf.set("dt.operator.jdbcOutput.prop.store.databaseUrl", DB_URL); conf.set("dt.operator.jdbcOutput.prop.tablename", OUTPUT_TABLE); } - + public int getNumEntries() { Connection con; try { con = DriverManager.getConnection(DB_URL,USER_NAME,PSW); Statement stmt = con.createStatement(); - + String countQuery = "SELECT count(DISTINCT (MONTH, MEANTEMP)) from " + OUTPUT_TABLE; ResultSet resultSet = stmt.executeQuery(countQuery); resultSet.next(); @@ -158,7 +158,7 @@ public class MaxPerKeyExamplesTest throw new RuntimeException("fetching count", e); } } - + public Map<Integer, Double> getMaxMeanTemp() { Map<Integer, Double> result = new HashMap<>(); @@ -166,30 +166,30 @@ public class MaxPerKeyExamplesTest try { con = DriverManager.getConnection(DB_URL,USER_NAME,PSW); Statement stmt = con.createStatement(); - + String countQuery = "SELECT DISTINCT * from " + OUTPUT_TABLE; ResultSet resultSet = stmt.executeQuery(countQuery); while (resultSet.next()) { result.put(resultSet.getInt("MONTH"), resultSet.getDouble("MEANTEMP")); - + } return result; } catch (SQLException e) { throw new RuntimeException("fetching count", e); } } - + @Test public void MaxPerKeyExampleTest() throws Exception { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); setConfig(conf); - + MaxPerKeyExamples app = new MaxPerKeyExamples(); - + lma.prepareDAG(app, conf); - + LocalMode.Controller lc = lma.getController(); ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() { @@ -199,9 +199,9 @@ public class MaxPerKeyExamplesTest return getNumEntries() == 2; } }); - + lc.run(5000); - + double[] result = new double[2]; result[0] = getMaxMeanTemp().get(6); result[1] = getMaxMeanTemp().get(7); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java ---------------------------------------------------------------------- diff --git a/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java b/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java index 5f93206..7fbdfd1 100644 --- a/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java +++ b/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java @@ -18,23 +18,24 @@ */ package com.datatorrent.demos.iteration; -import com.datatorrent.api.Context; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.DAG; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.common.util.BaseOperator; import com.datatorrent.common.util.DefaultDelayOperator; import com.datatorrent.lib.testbench.RandomEventGenerator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.PrintStream; /** * Iteration demo : <br> @@ -64,10 +65,10 @@ import java.io.PrintStream; * * @since 3.4.0 */ -@ApplicationAnnotation(name="IterationDemo") +@ApplicationAnnotation(name = "IterationDemo") public class Application implements StreamingApplication { - private final static Logger LOG = LoggerFactory.getLogger(Application.class); + private static final Logger LOG = LoggerFactory.getLogger(Application.class); private String extraOutputFileName; // for unit test public static class FibonacciOperator extends BaseOperator @@ -117,7 +118,7 @@ public class Application implements StreamingApplication public void process(Object t) { String s = t.toString(); - System.out.println(s); + LOG.info(s); if (extraOutputStream != null) { extraOutputStream.println(s); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java b/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java index 7804fcd..9fb89ac 100644 --- a/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java +++ b/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java @@ -18,17 +18,16 @@ */ package com.datatorrent.demos.iteration; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; -import org.apache.hadoop.conf.Configuration; import org.junit.Assert; import org.junit.Test; -import com.datatorrent.api.LocalMode; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.LocalMode; /** * @@ -61,7 +60,8 @@ public class ApplicationTest if (file.length() > 50) { break; } - } while (System.currentTimeMillis() - startTime < timeout); + } + while (System.currentTimeMillis() - startTime < timeout); lc.shutdown(); try (BufferedReader br = new BufferedReader(new FileReader(outputFileName))) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java index 32aac35..55b299f 100644 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java +++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java @@ -18,22 +18,23 @@ */ package com.datatorrent.demos.machinedata; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.DAG; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator; import com.datatorrent.demos.machinedata.data.MachineKey; import com.datatorrent.demos.machinedata.operator.MachineInfoAveragingOperator; import com.datatorrent.demos.machinedata.operator.MachineInfoAveragingPrerequisitesOperator; -import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator; import com.datatorrent.lib.io.SmtpOutputOperator; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * <p> * Resource monitor application. http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java index 77b39b5..75c2a02 100644 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java +++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java @@ -18,18 +18,13 @@ */ package com.datatorrent.demos.machinedata; -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.Context; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; import com.datatorrent.demos.machinedata.data.MachineInfo; import com.datatorrent.demos.machinedata.data.MachineKey; -import java.util.*; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * <p> * Information tuple generator with randomness. @@ -42,9 +37,10 @@ public class DimensionGenerator extends BaseOperator { public transient DefaultOutputPort<MachineInfo> outputInline = new DefaultOutputPort<>(); public transient DefaultOutputPort<MachineInfo> output = new DefaultOutputPort<>(); - private int threshold=90; + private int threshold = 90; - public final transient DefaultInputPort<MachineInfo> inputPort = new DefaultInputPort<MachineInfo>() { + public final transient DefaultInputPort<MachineInfo> inputPort = new DefaultInputPort<MachineInfo>() + { @Override public void process(MachineInfo tuple) @@ -113,9 +109,9 @@ public class DimensionGenerator extends BaseOperator int hdd = tuple.getHdd(); MachineInfo machineInfo = new MachineInfo(); machineInfo.setMachineKey(machineKey); - machineInfo.setCpu((cpu < threshold)?cpu:threshold); - machineInfo.setRam((ram < threshold)?ram:threshold); - machineInfo.setHdd((hdd < threshold)?hdd:threshold); + machineInfo.setCpu((cpu < threshold) ? cpu : threshold); + machineInfo.setRam((ram < threshold) ? ram : threshold); + machineInfo.setHdd((hdd < threshold) ? hdd : threshold); outputInline.emit(machineInfo); output.emit(machineInfo); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java index 560df52..85ec954 100644 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java +++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java @@ -18,20 +18,23 @@ */ package com.datatorrent.demos.machinedata; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.Context; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.demos.machinedata.data.MachineInfo; -import com.datatorrent.demos.machinedata.data.MachineKey; - import java.text.DateFormat; import java.text.SimpleDateFormat; -import java.util.*; +import java.util.Calendar; +import java.util.Date; +import java.util.Random; +import java.util.TimeZone; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.demos.machinedata.data.MachineInfo; +import com.datatorrent.demos.machinedata.data.MachineKey; + /** * <p> * Information tuple generator with randomness. @@ -74,6 +77,7 @@ public class InputReceiver extends BaseOperator implements InputOperator dayDateFormat.setTimeZone(tz); } + @Override public void setup(Context.OperatorContext context) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java index 722a77e..2b3bb1c 100644 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java +++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java @@ -305,25 +305,29 @@ public class MachineKey if (!(obj instanceof MachineKey)) { return false; } - MachineKey mkey = (MachineKey) obj; + MachineKey mkey = (MachineKey)obj; return checkStringEqual(this.timeKey, mkey.timeKey) && checkStringEqual(this.day, mkey.day) && checkIntEqual(this.customer, mkey.customer) && checkIntEqual(this.product, mkey.product) && checkIntEqual(this.os, mkey.os) && checkIntEqual(this.software1, mkey.software1) && checkIntEqual(this.software2, mkey.software2) && checkIntEqual(this.software3, mkey.software3) && checkIntEqual(this.deviceId, mkey.deviceId); } private boolean checkIntEqual(Integer a, Integer b) { - if ((a == null) && (b == null)) + if ((a == null) && (b == null)) { return true; - if ((a != null) && a.equals(b)) + } + if ((a != null) && a.equals(b)) { return true; + } return false; } private boolean checkStringEqual(String a, String b) { - if ((a == null) && (b == null)) + if ((a == null) && (b == null)) { return true; - if ((a != null) && a.equals(b)) + } + if ((a != null) && a.equals(b)) { return true; + } return false; } @@ -331,20 +335,27 @@ public class MachineKey public String toString() { StringBuilder sb = new StringBuilder(timeKey); - if (customer != null) + if (customer != null) { sb.append("|0:").append(customer); - if (product != null) + } + if (product != null) { sb.append("|1:").append(product); - if (os != null) + } + if (os != null) { sb.append("|2:").append(os); - if (software1 != null) + } + if (software1 != null) { sb.append("|3:").append(software1); - if (software2 != null) + } + if (software2 != null) { sb.append("|4:").append(software2); - if (software3 != null) + } + if (software3 != null) { sb.append("|5:").append(software3); - if (deviceId != null) + } + if (deviceId != null) { sb.append("|6:").append(deviceId); + } return sb.toString(); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java index a0b2ecf..d474c5c 100644 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java +++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java @@ -18,45 +18,49 @@ */ package com.datatorrent.demos.machinedata.data; -import com.google.common.collect.Maps; - import java.util.Map; +import com.google.common.collect.Maps; + /** * This class captures the resources whose usage is collected for each device * <p>ResourceType class.</p> * * @since 0.3.5 */ -public enum ResourceType { +public enum ResourceType +{ - CPU("cpu"), RAM("ram"), HDD("hdd"); + CPU("cpu"), RAM("ram"), HDD("hdd"); - private static Map<String, ResourceType> descToResource = Maps.newHashMap(); + private static Map<String, ResourceType> descToResource = Maps.newHashMap(); - static { - for (ResourceType type : ResourceType.values()) { - descToResource.put(type.desc, type); - } + static { + for (ResourceType type : ResourceType.values()) { + descToResource.put(type.desc, type); } + } - private String desc; + private String desc; - private ResourceType(String desc) { - this.desc = desc; - } + private ResourceType(String desc) + { + this.desc = desc; + } - @Override - public String toString() { - return desc; - } + @Override + public String toString() + { + return desc; + } - /** - * This method returns ResourceType for the given description - * @param desc the description - * @return - */ - public static ResourceType getResourceTypeOf(String desc) { - return descToResource.get(desc); - } + /** + * This method returns ResourceType for the given description + * @param desc the description + * @return + */ + public static ResourceType getResourceTypeOf(String desc) + { + return descToResource.get(desc); + } }
