Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 9f1a1bb9d -> 66a1d58b8
MLHR-1860 Fix bug; also some cleanup Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/2649ec94 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/2649ec94 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/2649ec94 Branch: refs/heads/devel-3 Commit: 2649ec9409c2cfe5ee11e04f5a6a0f6898eb9591 Parents: ae5f1ed Author: Munagala V. Ramanath <[email protected]> Authored: Sun Sep 27 13:31:34 2015 -0700 Committer: Munagala V. Ramanath <[email protected]> Committed: Sun Sep 27 13:31:34 2015 -0700 ---------------------------------------------------------------------- .../demos/wordcount/ApplicationWithQuerySupport.java | 15 +++------------ .../datatorrent/demos/wordcount/FileWordCount.java | 13 ++++++------- .../datatorrent/demos/wordcount/WindowWordCount.java | 3 --- 3 files changed, 9 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2649ec94/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java index 9b0b8d8..a168615 100644 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java @@ -16,8 +16,6 @@ package com.datatorrent.demos.wordcount; import java.net.URI; -import java.util.List; -import java.util.Map; import org.apache.commons.lang.StringUtils; @@ -27,7 +25,6 @@ import org.slf4j.LoggerFactory; import com.datatorrent.api.annotation.ApplicationAnnotation; import 
com.datatorrent.api.StreamingApplication; import com.datatorrent.api.DAG; -import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.Operator; import com.datatorrent.lib.appdata.schemas.SchemaUtils; @@ -35,7 +32,6 @@ import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap; import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery; import com.datatorrent.lib.io.PubSubWebSocketAppDataResult; import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.lib.stream.DevNull; import org.apache.hadoop.conf.Configuration; @@ -44,11 +40,7 @@ public class ApplicationWithQuerySupport implements StreamingApplication { private static final Logger LOG = LoggerFactory.getLogger(ApplicationWithQuerySupport.class); - public static final String - SNAPSHOT_SCHEMA = "WordDataSchema.json", - APP_NAME = "TopNWordsWithQueries"; - - private final Locality locality = null; + public static final String SNAPSHOT_SCHEMA = "WordDataSchema.json"; @Override public void populateDAG(DAG dag, Configuration conf) @@ -84,9 +76,8 @@ public class ApplicationWithQuerySupport implements StreamingApplication snapshotServerFile.setSnapshotSchemaJSON(snapshotServerJSON); snapshotServerGlobal.setSnapshotSchemaJSON(snapshotServerJSON); - PubSubWebSocketAppDataQuery - wsQueryFile = new PubSubWebSocketAppDataQuery(), - wsQueryGlobal = new PubSubWebSocketAppDataQuery(); + PubSubWebSocketAppDataQuery wsQueryFile = new PubSubWebSocketAppDataQuery(); + PubSubWebSocketAppDataQuery wsQueryGlobal = new PubSubWebSocketAppDataQuery(); wsQueryFile.setUri(uri); wsQueryGlobal.setUri(uri); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2649ec94/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java 
index 615fa5c..7b0ec0e 100644 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java @@ -198,11 +198,6 @@ public class FileWordCount extends BaseOperator LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, topN = {}", fileName, wordMapFile.size(), topN); - // have some words; need the file name to emit topN - if (null == fileName) { // should never happen - throw new RuntimeException("No fileName at endWindow"); - } - // get topN list for this file and, if we have EOF, emit to fileOutput port // get topN global list and emit to global output port @@ -215,8 +210,12 @@ public class FileWordCount extends BaseOperator LOG.info("FileWordCount: resultPerFile.size = {}", resultPerFile.size()); outputPerFile.emit(resultPerFile); - if (eof) { - // got EOF, so compute final topN list from wordMapFile into fileFinalList and emit it + if (eof) { // got EOF earlier + if (null == fileName) { // need file name to emit topN pairs to file writer + throw new RuntimeException("EOF but no fileName at endWindow"); + } + + // so compute final topN list from wordMapFile into fileFinalList and emit it getTopNList(wordMapFile); resultFileFinal.put(fileName, fileFinalList); fileOutput.emit(resultFileFinal); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2649ec94/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java index 93f4f2f..4d5895d 100644 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java @@ -16,8 +16,6 @@ package 
com.datatorrent.demos.wordcount; import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -27,7 +25,6 @@ import org.slf4j.LoggerFactory; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.common.util.BaseOperator; // Computes word frequency counts per window and emits them at each endWindow. The output is a
