Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 d710af9b1 -> ae5f1ede5
MLHR-1855 TopN word counts with visualization support Fixed per review comments Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/0a5914ce Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/0a5914ce Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/0a5914ce Branch: refs/heads/devel-3 Commit: 0a5914ce8066159fc71633e8dc4db12abd23d143 Parents: 9194a72 Author: Munagala V. Ramanath <[email protected]> Authored: Mon Sep 21 06:17:41 2015 -0700 Committer: Munagala V. Ramanath <[email protected]> Committed: Thu Sep 24 13:45:36 2015 -0700 ---------------------------------------------------------------------- demos/wordcount/pom.xml | 8 + .../wordcount/ApplicationWithQuerySupport.java | 122 ++++++++ .../demos/wordcount/FileWordCount.java | 288 +++++++++++++++++++ .../datatorrent/demos/wordcount/LineReader.java | 95 ++++++ .../com/datatorrent/demos/wordcount/WCPair.java | 34 +++ .../demos/wordcount/WindowWordCount.java | 82 ++++++ .../demos/wordcount/WordCountWriter.java | 90 ++++++ .../datatorrent/demos/wordcount/WordReader.java | 69 +++++ .../src/main/resources/META-INF/properties.xml | 88 +++++- .../src/main/resources/WordDataSchema.json | 4 + 10 files changed, 869 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/pom.xml ---------------------------------------------------------------------- diff --git a/demos/wordcount/pom.xml b/demos/wordcount/pom.xml index a3834ba..6535c66 100644 --- a/demos/wordcount/pom.xml +++ b/demos/wordcount/pom.xml @@ -21,4 +21,12 @@ <skipTests>true</skipTests> </properties> + <dependencies> + <dependency> + <groupId>it.unimi.dsi</groupId> + <artifactId>fastutil</artifactId> + <version>6.6.4</version> + </dependency> + </dependencies> + </project> 
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java new file mode 100644 index 0000000..9b0b8d8 --- /dev/null +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java @@ -0,0 +1,122 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datatorrent.demos.wordcount; + +import java.net.URI; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.Operator; + +import com.datatorrent.lib.appdata.schemas.SchemaUtils; +import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap; +import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery; +import com.datatorrent.lib.io.PubSubWebSocketAppDataResult; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.stream.DevNull; + +import org.apache.hadoop.conf.Configuration; + +@ApplicationAnnotation(name="TopNWordsWithQueries") +public class ApplicationWithQuerySupport implements StreamingApplication +{ + private static final Logger LOG = LoggerFactory.getLogger(ApplicationWithQuerySupport.class); + + public static final String + SNAPSHOT_SCHEMA = "WordDataSchema.json", + APP_NAME = "TopNWordsWithQueries"; + + private final Locality locality = null; + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + // create operators + LineReader lineReader = dag.addOperator("lineReader", new LineReader()); + WordReader wordReader = dag.addOperator("wordReader", new WordReader()); + WindowWordCount windowWordCount = dag.addOperator("windowWordCount", new WindowWordCount()); + FileWordCount fileWordCount = dag.addOperator("fileWordCount", new FileWordCount()); + WordCountWriter wcWriter = dag.addOperator("wcWriter", new WordCountWriter()); + ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator()); + console.setStringFormat("wordCount: %s"); + + // create streams + + dag.addStream("lines", lineReader.output, wordReader.input); + dag.addStream("control", 
lineReader.control, fileWordCount.control); + dag.addStream("words", wordReader.output, windowWordCount.input); + dag.addStream("windowWordCounts", windowWordCount.output, fileWordCount.input); + dag.addStream("fileWordCounts", fileWordCount.fileOutput, wcWriter.input); + + String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS); + + if ( ! StringUtils.isEmpty(gatewayAddress)) { // add query support + URI uri = URI.create("ws://" + gatewayAddress + "/pubsub"); + + AppDataSnapshotServerMap snapshotServerFile + = dag.addOperator("snapshotServerFile", new AppDataSnapshotServerMap()); + AppDataSnapshotServerMap snapshotServerGlobal + = dag.addOperator("snapshotServerGlobal", new AppDataSnapshotServerMap()); + + String snapshotServerJSON = SchemaUtils.jarResourceFileToString(SNAPSHOT_SCHEMA); + snapshotServerFile.setSnapshotSchemaJSON(snapshotServerJSON); + snapshotServerGlobal.setSnapshotSchemaJSON(snapshotServerJSON); + + PubSubWebSocketAppDataQuery + wsQueryFile = new PubSubWebSocketAppDataQuery(), + wsQueryGlobal = new PubSubWebSocketAppDataQuery(); + wsQueryFile.setUri(uri); + wsQueryGlobal.setUri(uri); + + snapshotServerFile.setEmbeddableQueryInfoProvider(wsQueryFile); + snapshotServerGlobal.setEmbeddableQueryInfoProvider(wsQueryGlobal); + + PubSubWebSocketAppDataResult wsResultFile + = dag.addOperator("wsResultFile", new PubSubWebSocketAppDataResult()); + PubSubWebSocketAppDataResult wsResultGlobal + = dag.addOperator("wsResultGlobal", new PubSubWebSocketAppDataResult()); + wsResultFile.setUri(uri); + wsResultGlobal.setUri(uri); + + Operator.InputPort<String> queryResultFilePort = wsResultFile.input; + Operator.InputPort<String> queryResultGlobalPort = wsResultGlobal.input; + + dag.addStream("WordCountsFile", fileWordCount.outputPerFile, + snapshotServerFile.input, console.input); + dag.addStream("WordCountsGlobal", fileWordCount.outputGlobal, + snapshotServerGlobal.input); + + dag.addStream("ResultFile", snapshotServerFile.queryResult, 
queryResultFilePort); + dag.addStream("ResultGlobal", snapshotServerGlobal.queryResult, queryResultGlobalPort); + } else { + //throw new RuntimeException("Error: No GATEWAY_CONNECT_ADDRESS"); + dag.addStream("WordCounts", fileWordCount.outputPerFile, console.input); + } + + System.out.println("done with populateDAG, isDebugEnabled = " + LOG.isDebugEnabled()); + LOG.info("Returning from populateDAG"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java new file mode 100644 index 0000000..615fa5c --- /dev/null +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java @@ -0,0 +1,288 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datatorrent.demos.wordcount; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +//import org.apache.commons.lang.mutable.MutableInt; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.common.util.BaseOperator; + +/** + * Monitors an input directory for text files, computes word frequency counts per file and globally, + * and writes the top N pairs to an output file and to snapshot servers for visualization. + * Currently designed to work with only 1 file at a time; will be enhanced later to support + * multiple files dropped into the monitored directory at the same time. + * + * <p> + * Receives per-window list of pairs (word, frequency) on the input port. When the end of a file + * is reached, expects to get an EOF on the control port; at the next endWindow, the top N words + * and frequencies are computed and emitted to the output ports. + * <p> + * There are 3 output ports: (a) One for the per-file top N counts emitted when the file is fully + * read and is written to the output file. (b) One for the top N counts emitted per window for the + * current file to the snapshot server and (c) One for the global top N counts emitted per window + * to a different snapshot server. 
+ * + * Since the EOF is received by a single operator, this operator cannot be partitionable + */ +public class FileWordCount extends BaseOperator +{ + private static final Logger LOG = LoggerFactory.getLogger(FileWordCount.class); + private static final String GLOBAL = "global"; + + // If topN > 0, only data for the topN most frequent words is output; if topN == 0, the + // entire frequency map is output + // + protected int topN; + + // set to true when we get an EOF control tuple + protected boolean eof = false; + + // last component of path (i.e. only file name) + // incoming value from control tuple + protected String fileName; + + // wordMapFile : {word => frequency} map, current file, all words + // wordMapGlobal : {word => frequency} map, global, all words + // + protected Map<String, WCPair> wordMapFile = new HashMap<>(); + protected Map<String, WCPair> wordMapGlobal = new HashMap<>(); + + // resultPerFile : singleton list [TopNMap] with per file data; sent on outputPerFile + // resultGlobal : singleton list [wordFreqMap] with per file data; sent on outputGlobal + // + protected transient List<Map<String, Object>> resultPerFile, resultGlobal; + + // singleton map of fileName to sorted list of (word, frequency) pairs + protected transient Map<String, Object> resultFileFinal; + protected transient List<WCPair> fileFinalList; + + public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>() + { + @Override + public void process(List<WCPair> list) + { + // blend incoming list into wordMapFile and wordMapGlobal + for (WCPair pair : list) { + final String word = pair.word; + WCPair filePair = wordMapFile.get(word); + if (null != filePair) { // word seen previously in current file + WCPair globalPair = wordMapGlobal.get(word); // cannot be null + filePair.freq += pair.freq; + globalPair.freq += pair.freq; + continue; + } + + // new word in current file + filePair = new WCPair(word, pair.freq); + wordMapFile.put(word, 
filePair); + + // check global map + WCPair globalPair = wordMapGlobal.get(word); // may be null + if (null != globalPair) { // word seen previously + globalPair.freq += pair.freq; + continue; + } + + // word never seen before + globalPair = new WCPair(word, pair.freq); + wordMapGlobal.put(word, globalPair); + } + } + }; + + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<String> control = new DefaultInputPort<String>() + { + @Override + public void process(String msg) + { + if (msg.isEmpty()) { // sanity check + throw new RuntimeException("Empty file path"); + } + LOG.info("FileWordCount: EOF for {}, topN = {}", msg, topN); + fileName = msg; + eof = true; + // NOTE: current version only supports processing one file at a time. + } + }; + + // outputPerFile -- tuple is TopNMap for current file + // outputGlobal -- tuple is TopNMap globally + // + public final transient DefaultOutputPort<List<Map<String, Object>>> + outputPerFile = new DefaultOutputPort<>(); + + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<List<Map<String, Object>>> + outputGlobal = new DefaultOutputPort<>(); + + // fileOutput -- tuple is singleton map {<fileName> => TopNMap} where TopNMap is the final + // top N for current file; emitted on EOF + // + public final transient DefaultOutputPort<Map<String, Object>> + fileOutput = new DefaultOutputPort<>(); + + public int getTopN() { + return topN; + } + + public void setTopN(int n) { + topN = n; + } + + @Override + public void setup(OperatorContext context) + { + if (null == wordMapFile) { + wordMapFile = new HashMap<>(); + } + if (null == wordMapGlobal) { + wordMapGlobal = new HashMap<>(); + } + resultPerFile = new ArrayList(1); + resultGlobal = new ArrayList(1); + // singleton map {<fileName> => fileFinalList}; cannot populate it yet since we need fileName + resultFileFinal = new HashMap<>(1); + fileFinalList = new ArrayList<>(); + } + + @Override + public void endWindow() 
+ { + LOG.info("FileWordCount: endWindow for {}, topN = {}", fileName, topN); + + if (wordMapFile.isEmpty()) { // no words found + if (eof) { // write empty list to fileOutput port + // got EOF, so output empty list to output file + fileFinalList.clear(); + resultFileFinal.put(fileName, fileFinalList); + fileOutput.emit(resultFileFinal); + + // reset for next file + eof = false; + fileName = null; + resultFileFinal.clear(); + } + LOG.info("FileWordCount: endWindow for {}, no words, topN = {}", fileName, topN); + return; + } + + LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, topN = {}", + fileName, wordMapFile.size(), topN); + + // have some words; need the file name to emit topN + if (null == fileName) { // should never happen + throw new RuntimeException("No fileName at endWindow"); + } + + // get topN list for this file and, if we have EOF, emit to fileOutput port + + // get topN global list and emit to global output port + getTopNMap(wordMapGlobal, resultGlobal); + LOG.info("FileWordCount: resultGlobal.size = {}", resultGlobal.size()); + outputGlobal.emit(resultGlobal); + + // get topN list for this file and emit to file output port + getTopNMap(wordMapFile, resultPerFile); + LOG.info("FileWordCount: resultPerFile.size = {}", resultPerFile.size()); + outputPerFile.emit(resultPerFile); + + if (eof) { + // got EOF, so compute final topN list from wordMapFile into fileFinalList and emit it + getTopNList(wordMapFile); + resultFileFinal.put(fileName, fileFinalList); + fileOutput.emit(resultFileFinal); + + // reset for next file + eof = false; + fileName = null; + wordMapFile.clear(); + resultFileFinal.clear(); + } + } + + // get topN frequencies from map, convert each pair to a singleton map and append to result + // This map is suitable input to AppDataSnapshotServer + // MUST have map.size() > 0 here + // + private void getTopNMap(final Map<String, WCPair> map, List<Map<String, Object>> result) + { + final ArrayList<WCPair> list = new 
ArrayList<>(map.values()); + + // sort entries in descending order of frequency + Collections.sort(list, new Comparator<WCPair>() { + @Override + public int compare(WCPair o1, WCPair o2) { + return (int)(o2.freq - o1.freq); + } + }); + + if (topN > 0) { + list.subList(topN, map.size()).clear(); // retain only the first topN entries + } + + // convert each pair (word, freq) of list to a map with 2 elements + // {("word": <word>, "count": freq)} and append to list + // + result.clear(); + for (WCPair pair : list) { + Map<String, Object> wmap = new HashMap<>(2); + wmap.put("word", pair.word); + wmap.put("count", pair.freq); + result.add(wmap); + } + LOG.info("FileWordCount:getTopNMap: result.size = {}", result.size()); + list.clear(); + } + + // populate fileFinalList with topN frequencies from argument + // This list is suitable input to WordCountWriter which writes it to a file + // MUST have map.size() > 0 here + // + private void getTopNList(final Map<String, WCPair> map) + { + fileFinalList.clear(); + fileFinalList.addAll(map.values()); + + // sort entries in descending order of frequency + Collections.sort(fileFinalList, new Comparator<WCPair>() { + @Override + public int compare(WCPair o1, WCPair o2) { + return (int)(o2.freq - o1.freq); + } + }); + + if (topN > 0) { + fileFinalList.subList(topN, map.size()).clear(); // retain only the first topN entries + } + LOG.info("FileWordCount:getTopNList: fileFinalList.size = {}", fileFinalList.size()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java new file mode 100644 index 0000000..b030356 --- /dev/null +++ 
b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java @@ -0,0 +1,95 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datatorrent.demos.wordcount; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; + +import org.apache.hadoop.fs.Path; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.lib.io.fs.AbstractFileInputOperator; + +// reads lines from input file and returns them; if end-of-file is reached, a control tuple +// is emitted on the control port +// +public class LineReader extends AbstractFileInputOperator<String> +{ + private static final Logger LOG = LoggerFactory.getLogger(LineReader.class); + + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); + + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<String> control = new DefaultOutputPort<>(); + + private transient BufferedReader br = null; + + private Path path; + + @Override + protected InputStream openFile(Path curPath) throws IOException + { + LOG.info("openFile: curPath = {}", curPath); + path = curPath; + InputStream is = super.openFile(path); + br = new BufferedReader(new InputStreamReader(is)); + return is; + } + 
+ @Override + protected void closeFile(InputStream is) throws IOException + { + super.closeFile(is); + br.close(); + br = null; + path = null; + } + + // return empty string + @Override + protected String readEntity() throws IOException + { + // try to read a line + final String line = br.readLine(); + if (null != line) { // common case + LOG.debug("readEntity: line = {}", line); + return line; + } + + // end-of-file; send control tuple, containing only the last component of the path + // (only file name) on control port + // + if (control.isConnected()) { + LOG.info("readEntity: EOF for {}", path); + final String name = path.getName(); // final component of path + control.emit(name); + } + + return null; + } + + @Override + protected void emit(String tuple) + { + output.emit(tuple); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java new file mode 100644 index 0000000..0eb1e72 --- /dev/null +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java @@ -0,0 +1,34 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * A single (word, frequency) pair. Both fields are public and mutable so that counters can be
 * updated in place.
 */
public class WCPair
{
  public String word;
  public int freq;

  /** Creates a pair with a null word and a frequency of 0. */
  public WCPair()
  {
    this(null, 0);
  }

  /**
   * Creates a pair.
   *
   * @param w the word
   * @param f its frequency
   */
  public WCPair(String w, int f)
  {
    this.word = w;
    this.freq = f;
  }

  /** Returns the pair formatted as {@code (word, freq)}. */
  @Override
  public String toString()
  {
    final String text = String.format("(%s, %d)", word, freq);
    return text;
  }
}
+ */ +package com.datatorrent.demos.wordcount; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.common.util.BaseOperator; + +// Computes word frequency counts per window and emits them at each endWindow. The output is a +// list of pairs (word, frequency). +// +public class WindowWordCount extends BaseOperator +{ + private static final Logger LOG = LoggerFactory.getLogger(WindowWordCount.class); + + // wordMap : word => frequency + protected Map<String, WCPair> wordMap = new HashMap<>(); + + public final transient DefaultInputPort<String> input = new DefaultInputPort<String>() + { + @Override + public void process(String word) + { + WCPair pair = wordMap.get(word); + if (null != pair) { // word seen previously + pair.freq += 1; + return; + } + + // new word + pair = new WCPair(); + pair.word = word; + pair.freq = 1; + wordMap.put(word, pair); + } + }; + + // output port which emits the list of word frequencies for current window + // fileName => list of (word, freq) pairs + // + public final transient DefaultOutputPort<List<WCPair>> output = new DefaultOutputPort<>(); + + @Override + public void endWindow() + { + LOG.info("WindowWordCount: endWindow"); + + // got EOF; if no words found, do nothing + if (wordMap.isEmpty()) return; + + // have some words; emit single map and reset for next file + final ArrayList<WCPair> list = new ArrayList<>(wordMap.values()); + output.emit(list); + list.clear(); + wordMap.clear(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java 
---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java new file mode 100644 index 0000000..c1c58ef --- /dev/null +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java @@ -0,0 +1,90 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datatorrent.demos.wordcount; + +import java.io.UnsupportedEncodingException; + +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; + +// write top N words and their frequencies to a file +// +public class WordCountWriter extends AbstractFileOutputOperator<Map<String, Object>> +{ + private static final Logger LOG = LoggerFactory.getLogger(WordCountWriter.class); + private static final String charsetName = "UTF-8"; + private static final String nl = System.lineSeparator(); + + private String fileName; // current file name + private transient final StringBuilder sb = new StringBuilder(); + + @Override + public void endWindow() + { + if (null != fileName) { + requestFinalize(fileName); + } + super.endWindow(); + } + + // input is a singleton list [M] where M is a singleton map {fileName => L} where L is a + // list of pairs: (word, frequency) + // + @Override + protected String getFileName(Map<String, Object> tuple) + { + LOG.info("getFileName: tuple.size = {}", tuple.size()); + + final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next(); + fileName = entry.getKey(); + LOG.info("getFileName: fileName = {}", fileName); + return fileName; + } + + @Override + protected byte[] getBytesForTuple(Map<String, Object> tuple) + { + LOG.info("getBytesForTuple: tuple.size = {}", tuple.size()); + + // get first and only pair; key is the fileName and is ignored here + final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next(); + final List<WCPair> list = (List<WCPair>) entry.getValue(); + + if (sb.length() > 0) { // clear buffer + sb.delete(0, sb.length()); + } + + for ( WCPair pair : list ) { + sb.append(pair.word); sb.append(" : "); + sb.append(pair.freq); sb.append(nl); + } + + final String data = sb.toString(); + LOG.info("getBytesForTuple: data = {}", data); + try { + final byte[] result = 
data.getBytes(charsetName); + return result; + } catch (UnsupportedEncodingException ex) { + throw new RuntimeException("Should never get here", ex); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java new file mode 100644 index 0000000..b509540 --- /dev/null +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java @@ -0,0 +1,69 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datatorrent.demos.wordcount; + +import java.util.regex.Pattern; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + +// extracts words from input line +public class WordReader extends BaseOperator +{ + // default pattern for word-separators + private static final Pattern nonWordDefault = Pattern.compile("[\\p{Punct}\\s]+"); + + private String nonWordStr; // configurable regex + private transient Pattern nonWord; // compiled regex + + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); + + public final transient DefaultInputPort<String> + input = new DefaultInputPort<String>() { + + @Override + public void process(String line) + { + // line; split it into words and emit them + final String[] words = nonWord.split(line); + for (String word : words) { + if (word.isEmpty()) continue; + output.emit(word); + } + } + }; + + public String getNonWordStr() { + return nonWordStr; + } + + public void setNonWordStr(String regex) { + nonWordStr = regex; + } + + @Override + public void setup(OperatorContext context) + { + if (null == nonWordStr) { + nonWord = nonWordDefault; + } else { + nonWord = Pattern.compile(nonWordStr); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/resources/META-INF/properties.xml b/demos/wordcount/src/main/resources/META-INF/properties.xml index aa8d0bf..aaee620 100644 --- a/demos/wordcount/src/main/resources/META-INF/properties.xml +++ b/demos/wordcount/src/main/resources/META-INF/properties.xml @@ -1,12 +1,78 @@ <configuration> - <property> - <name>dt.application.WordCountDemo.operator.wordinput.fileName</name> - <value>samplefile.txt</value> - 
</property> - <property> - <name>dt.application.WordCountDemo.stream.wordinput.count.locality</name> - <value>CONTAINER_LOCAL</value> - <description>Specify container locality for the viewtuplecount stream - </description> - </property> -</configuration> \ No newline at end of file + <!-- TopNWordsWithQueries --> + + <!-- for debugging --> + <!-- + <property> + <name>dt.attr.CONTAINER_JVM_OPTIONS</name> + <value>-Dlog4j.configuration=my_log4j.properties</value> + </property> + --> + + <!-- monitored input directory --> + <property> + <name>dt.application.TopNWordsWithQueries.operator.lineReader.directory</name> + <value>/tmp/test/input-dir</value> + </property> + + <!-- regular expression for word separator --> + <property> + <name>dt.application.TopNWordsWithQueries.operator.wordReader.nonWordStr</name> + <value>[\p{Punct}\s]+</value> + </property> + + <!-- output directory for word counts --> + <property> + <name>dt.application.TopNWordsWithQueries.operator.wcWriter.filePath</name> + <value>/tmp/test/output-dir</value> + </property> + + <!-- Top N value --> + <property> + <name>dt.application.TopNWordsWithQueries.operator.fileWordCount.topN</name> + <value>10</value> + </property> + + <!-- topic for queries (current file) --> + <property> + <name>dt.application.TopNWordsWithQueries.operator.snapshotServerFile.embeddableQueryInfoProvider.topic</name> + <value>TopNWordsQueryFile</value> + </property> + + <!-- topic for query results (current file) --> + <property> + <name>dt.application.TopNWordsWithQueries.operator.wsResultFile.topic</name> + <value>TopNWordsQueryFileResult</value> + </property> + + <!-- topic for queries (global) --> + <property> + <name>dt.application.TopNWordsWithQueries.operator.snapshotServerGlobal.embeddableQueryInfoProvider.topic</name> + <value>TopNWordsQueryGlobal</value> + </property> + + <!-- topic for query results (global) --> + <property> + <name>dt.application.TopNWordsWithQueries.operator.wsResultGlobal.topic</name> + 
<value>TopNWordsQueryGlobalResult</value> + </property> + + <!-- retry count --> + <property> + <name>dt.application.TwitterDemo.operator.wsResult.numRetries</name> + <value>2147483647</value> + </property> + + + <!-- WordCountDemo --> + <property> + <name>dt.application.WordCountDemo.operator.wordinput.fileName</name> + <value>samplefile.txt</value> + </property> + <property> + <name>dt.application.WordCountDemo.stream.wordinput.count.locality</name> + <value>CONTAINER_LOCAL</value> + <description>Specify container locality for the viewtuplecount stream + </description> + </property> +</configuration> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/resources/WordDataSchema.json ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/resources/WordDataSchema.json b/demos/wordcount/src/main/resources/WordDataSchema.json new file mode 100644 index 0000000..5e8e7c0 --- /dev/null +++ b/demos/wordcount/src/main/resources/WordDataSchema.json @@ -0,0 +1,4 @@ +{ + "values": [{"name": "word", "type": "string"}, + {"name": "count", "type": "integer"}] +}
