Repository: samza-hello-samza Updated Branches: refs/heads/latest c87ed565f -> 3d0e919e6
SAMZA-1236: Initial draft of the fluent API example for tutorials Author: Jacob Maes <[email protected]> Reviewers: Prateek Maheshwari <[email protected]> Closes #11 from jmakes/samza-1236 Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/3d0e919e Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/3d0e919e Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/3d0e919e Branch: refs/heads/latest Commit: 3d0e919e6cf96ffab7c2028e5ebef2bd99624346 Parents: c87ed56 Author: Jacob Maes <[email protected]> Authored: Fri May 5 16:48:21 2017 -0700 Committer: Jacob Maes <[email protected]> Committed: Fri May 5 16:48:21 2017 -0700 ---------------------------------------------------------------------- build.gradle | 1 + src/main/assembly/src.xml | 5 + .../config/wikipedia-application.properties | 71 +++++++ src/main/config/wikipedia-parser.properties | 6 - src/main/config/wikipedia-stats.properties | 6 + .../application/WikipediaApplication.java | 188 +++++++++++++++++++ .../wikipedia/model/WikipediaParser.java | 80 ++++++++ .../task/WikipediaParserStreamTask.java | 57 +----- .../task/WikipediaStatsStreamTask.java | 24 ++- src/main/resources/log4j.xml | 9 +- 10 files changed, 376 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 40505ce..ec451d5 100644 --- a/build.gradle +++ b/build.gradle @@ -71,6 +71,7 @@ task distTar(dependsOn: build, type: Tar) { include "wikipedia-feed.properties" include "wikipedia-parser.properties" include "wikipedia-stats.properties" + include "wikipedia-application.properties" // expand the Maven tokens with Gradle equivalents. 
Also change 'target' (Maven) to 'build/distributions' (Gradle) filter { String line -> http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/assembly/src.xml ---------------------------------------------------------------------- diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml index e280a9a..ca90ebf 100644 --- a/src/main/assembly/src.xml +++ b/src/main/assembly/src.xml @@ -52,6 +52,11 @@ <filtered>true</filtered> </file> <file> + <source>${basedir}/src/main/config/wikipedia-application.properties</source> + <outputDirectory>config</outputDirectory> + <filtered>true</filtered> + </file> + <file> <source>${basedir}/src/main/config/tumbling-pageview-counter.properties</source> <outputDirectory>config</outputDirectory> <filtered>true</filtered> http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/config/wikipedia-application.properties ---------------------------------------------------------------------- diff --git a/src/main/config/wikipedia-application.properties b/src/main/config/wikipedia-application.properties new file mode 100644 index 0000000..59a124f --- /dev/null +++ b/src/main/config/wikipedia-application.properties @@ -0,0 +1,71 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +# Job +job.factory.class=org.apache.samza.job.yarn.YarnJobFactory +job.name=wikipedia-application + +# YARN +yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz + +# Task/Application +app.runner.class=org.apache.samza.runtime.RemoteApplicationRunner +app.class=samza.examples.wikipedia.application.WikipediaApplication +task.window.ms=10000 + +# Serializers +serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory +serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory +serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory + +# Wikipedia System +systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory +systems.wikipedia.host=irc.wikimedia.org +systems.wikipedia.port=6667 + +# Kafka System +systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory +systems.kafka.consumer.zookeeper.connect=localhost:2181/ +systems.kafka.producer.bootstrap.servers=localhost:9092 +systems.kafka.default.stream.replication.factor=1 +systems.kafka.default.stream.samza.msg.serde=json + +# Streams which are not on default system or have special characters in the physical name. 
+streams.en-wikipedia.samza.system=wikipedia +streams.en-wikipedia.samza.physical.name=#en.wikipedia + +streams.en-wiktionary.samza.system=wikipedia +streams.en-wiktionary.samza.physical.name=#en.wiktionary + +streams.en-wikinews.samza.system=wikipedia +streams.en-wikinews.samza.physical.name=#en.wikinews + +# Key-value storage +stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory +stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog +stores.wikipedia-stats.key.serde=string +stores.wikipedia-stats.msg.serde=integer + +# Defaults +job.default.system=kafka + +# Metrics +metrics.reporters=snapshot,jmx +metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory +metrics.reporter.snapshot.stream=kafka.metrics +metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory + http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/config/wikipedia-parser.properties ---------------------------------------------------------------------- diff --git a/src/main/config/wikipedia-parser.properties b/src/main/config/wikipedia-parser.properties index 6d1e3df..e8f3fa0 100644 --- a/src/main/config/wikipedia-parser.properties +++ b/src/main/config/wikipedia-parser.properties @@ -26,12 +26,6 @@ yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}- task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask task.inputs=kafka.wikipedia-raw -# Metrics -metrics.reporters=snapshot,jmx -metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory -metrics.reporter.snapshot.stream=kafka.metrics -metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory - # Serializers serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory 
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/config/wikipedia-stats.properties ---------------------------------------------------------------------- diff --git a/src/main/config/wikipedia-stats.properties b/src/main/config/wikipedia-stats.properties index f6b85bf..0a1cf31 100644 --- a/src/main/config/wikipedia-stats.properties +++ b/src/main/config/wikipedia-stats.properties @@ -27,6 +27,12 @@ task.class=samza.examples.wikipedia.task.WikipediaStatsStreamTask task.inputs=kafka.wikipedia-edits task.window.ms=10000 +# Metrics +metrics.reporters=snapshot,jmx +metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory +metrics.reporter.snapshot.stream=kafka.metrics +metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory + # Serializers serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java new file mode 100644 index 0000000..b0779db --- /dev/null +++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package samza.examples.wikipedia.application; + +import com.google.common.collect.ImmutableList; +import java.time.Duration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.metrics.Counter; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.functions.FoldLeftFunction; +import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.Windows; +import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.task.TaskContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import samza.examples.wikipedia.model.WikipediaParser; +import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent; + + +/** + * This {@link StreamApplication} demonstrates the Samza fluent API by performing the same operations as + * {@link samza.examples.wikipedia.task.WikipediaFeedStreamTask}, + * {@link samza.examples.wikipedia.task.WikipediaParserStreamTask}, and + * {@link samza.examples.wikipedia.task.WikipediaStatsStreamTask} in one expression. + * + * The only functional difference is the lack of "wikipedia-raw" and "wikipedia-edits" + * streams to connect the operators, as they are not needed with the fluent API. 
+ * + * The application processes Wikipedia events in the following steps: + * <ul> + * <li>Merge wikipedia, wiktionary, and wikinews events into one stream</li> + * <li>Parse each event to a more structured format</li> + * <li>Aggregate some stats over a 10s window</li> + * <li>Format each window output for public consumption</li> + * <li>Send the window output to Kafka</li> + * </ul> + * + * All of this application logic is defined in the {@link #init(StreamGraph, Config)} method, which + * is invoked by the framework to load the application. + */ +public class WikipediaApplication implements StreamApplication { + private static final Logger log = LoggerFactory.getLogger(WikipediaApplication.class); + + private static final String STATS_STORE_NAME = "wikipedia-stats"; + private static final String EDIT_COUNT_KEY = "count-edits-all-time"; + + private static final String WIKIPEDIA_STREAM_ID = "en-wikipedia"; + private static final String WIKTIONARY_STREAM_ID = "en-wiktionary"; + private static final String WIKINEWS_STREAM_ID = "en-wikinews"; + + private static final String STATS_STREAM_ID = "wikipedia-stats"; + + @Override + public void init(StreamGraph graph, Config config) { + // Inputs + // Messages come from WikipediaConsumer so we know the type is WikipediaFeedEvent + // They are un-keyed, so the 'k' parameter to the msgBuilder is not used + MessageStream<WikipediaFeedEvent> wikipediaEvents = graph.getInputStream(WIKIPEDIA_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v); + MessageStream<WikipediaFeedEvent> wiktionaryEvents = graph.getInputStream(WIKTIONARY_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v); + MessageStream<WikipediaFeedEvent> wikiNewsEvents = graph.getInputStream(WIKINEWS_STREAM_ID, (k, v) -> (WikipediaFeedEvent) v); + + // Output (also un-keyed, so no keyExtractor) + OutputStream<Void, Map<String, Integer>, Map<String, Integer>> wikipediaStats = graph.getOutputStream(STATS_STREAM_ID, m -> null, m -> m); + + // Merge inputs + 
MessageStream<WikipediaFeedEvent> allWikipediaEvents = wikipediaEvents.merge(ImmutableList.of(wiktionaryEvents, wikiNewsEvents)); + + // Parse, update stats, prepare output, and send + allWikipediaEvents.map(WikipediaParser::parseEvent) + .window(Windows.tumblingWindow(Duration.ofSeconds(10), WikipediaStats::new, new WikipediaStatsAggregator())) + .map(this::formatOutput) + .sendTo(wikipediaStats); + } + + /** + * A few statistics about the incoming messages. + */ + private static class WikipediaStats { + // Windowed stats + int edits = 0; + int byteDiff = 0; + Set<String> titles = new HashSet<String>(); + Map<String, Integer> counts = new HashMap<String, Integer>(); + + // Total stats + int totalEdits = 0; + + @Override + public String toString() { + return String.format("Stats {edits:%d, byteDiff:%d, titles:%s, counts:%s}", edits, byteDiff, titles, counts); + } + } + + /** + * Updates the windowed and total stats based on each "edit" event. + * + * Uses a KeyValueStore to persist a total edit count across restarts. + */ + private class WikipediaStatsAggregator implements FoldLeftFunction<Map<String, Object>, WikipediaStats> { + + private KeyValueStore<String, Integer> store; + + // Example metric. Running counter of the number of repeat edits of the same title within a single window. + private Counter repeatEdits; + + /** + * {@inheritDoc} + * Override {@link org.apache.samza.operators.functions.InitableFunction#init(Config, TaskContext)} to + * get a KeyValueStore for persistence and the MetricsRegistry for metrics. 
+ */ + @Override + public void init(Config config, TaskContext context) { + store = (KeyValueStore<String, Integer>) context.getStore(STATS_STORE_NAME); + repeatEdits = context.getMetricsRegistry().newCounter("edit-counters", "repeat-edits"); + } + + @Override + public WikipediaStats apply(Map<String, Object> edit, WikipediaStats stats) { + + // Update persisted total + Integer editsAllTime = store.get(EDIT_COUNT_KEY); + if (editsAllTime == null) editsAllTime = 0; + editsAllTime++; + store.put(EDIT_COUNT_KEY, editsAllTime); + + // Update window stats + stats.edits++; + stats.totalEdits = editsAllTime; + stats.byteDiff += (Integer) edit.get("diff-bytes"); + boolean newTitle = stats.titles.add((String) edit.get("title")); + + Map<String, Boolean> flags = (Map<String, Boolean>) edit.get("flags"); + for (Map.Entry<String, Boolean> flag : flags.entrySet()) { + if (Boolean.TRUE.equals(flag.getValue())) { + stats.counts.compute(flag.getKey(), (k, v) -> v == null ? 1 : v + 1); + } + } + + if (!newTitle) { + repeatEdits.inc(); + log.info("Frequent edits for title: {}", edit.get("title")); + } + return stats; + } + } + + /** + * Format the stats for output to Kafka. 
+ */ + private Map<String, Integer> formatOutput(WindowPane<Void, WikipediaStats> statsWindowPane) { + + WikipediaStats stats = statsWindowPane.getMessage(); + + Map<String, Integer> counts = new HashMap<String, Integer>(stats.counts); + counts.put("edits", stats.edits); + counts.put("edits-all-time", stats.totalEdits); + counts.put("bytes-added", stats.byteDiff); + counts.put("unique-titles", stats.titles.size()); + + return counts; + } +} + http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/java/samza/examples/wikipedia/model/WikipediaParser.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/model/WikipediaParser.java b/src/main/java/samza/examples/wikipedia/model/WikipediaParser.java new file mode 100644 index 0000000..9347962 --- /dev/null +++ b/src/main/java/samza/examples/wikipedia/model/WikipediaParser.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package samza.examples.wikipedia.model; + +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import samza.examples.wikipedia.system.WikipediaFeed; + + +public class WikipediaParser { + public static Map<String, Object> parseEvent(WikipediaFeed.WikipediaFeedEvent wikipediaFeedEvent) { + Map<String, Object> parsedJsonObject = null; + try { + parsedJsonObject = WikipediaParser.parseLine(wikipediaFeedEvent.getRawEvent()); + + parsedJsonObject.put("channel", wikipediaFeedEvent.getChannel()); + parsedJsonObject.put("source", wikipediaFeedEvent.getSource()); + parsedJsonObject.put("time", wikipediaFeedEvent.getTime()); + } catch (Exception e) { + System.err.println("Unable to parse line: " + wikipediaFeedEvent); + } + return parsedJsonObject; + } + + public static Map<String, Object> parseLine(String line) { + Pattern p = Pattern.compile("\\[\\[(.*)\\]\\]\\s(.*)\\s(.*)\\s\\*\\s(.*)\\s\\*\\s\\(\\+?(.\\d*)\\)\\s(.*)"); + Matcher m = p.matcher(line); + + if (m.find() && m.groupCount() == 6) { + String title = m.group(1); + String flags = m.group(2); + String diffUrl = m.group(3); + String user = m.group(4); + int byteDiff = Integer.parseInt(m.group(5)); + String summary = m.group(6); + + Map<String, Boolean> flagMap = new HashMap<String, Boolean>(); + + flagMap.put("is-minor", flags.contains("M")); + flagMap.put("is-new", flags.contains("N")); + flagMap.put("is-unpatrolled", flags.contains("!")); + flagMap.put("is-bot-edit", flags.contains("B")); + flagMap.put("is-special", title.startsWith("Special:")); + flagMap.put("is-talk", title.startsWith("Talk:")); + + Map<String, Object> root = new HashMap<String, Object>(); + + root.put("title", title); + root.put("user", user); + root.put("unparsed-flags", flags); + root.put("diff-bytes", byteDiff); + root.put("diff-url", diffUrl); + root.put("summary", summary); + root.put("flags", flagMap); + + return root; + } else { + throw new 
IllegalArgumentException(); + } + } +} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java b/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java index 0505f58..aee8939 100644 --- a/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java +++ b/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java @@ -19,72 +19,29 @@ package samza.examples.wikipedia.task; -import java.util.HashMap; import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemStream; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamTask; import org.apache.samza.task.TaskCoordinator; +import samza.examples.wikipedia.model.WikipediaParser; import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent; public class WikipediaParserStreamTask implements StreamTask { + private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "wikipedia-edits"); + @SuppressWarnings("unchecked") @Override public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage(); WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject); - try { - Map<String, Object> parsedJsonObject = parse(event.getRawEvent()); - - parsedJsonObject.put("channel", event.getChannel()); - parsedJsonObject.put("source", event.getSource()); - parsedJsonObject.put("time", event.getTime()); - - collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), 
parsedJsonObject)); - } catch (Exception e) { - System.err.println("Unable to parse line: " + event); - } - } - - public static Map<String, Object> parse(String line) { - Pattern p = Pattern.compile("\\[\\[(.*)\\]\\]\\s(.*)\\s(.*)\\s\\*\\s(.*)\\s\\*\\s\\(\\+?(.\\d*)\\)\\s(.*)"); - Matcher m = p.matcher(line); - - if (m.find() && m.groupCount() == 6) { - String title = m.group(1); - String flags = m.group(2); - String diffUrl = m.group(3); - String user = m.group(4); - int byteDiff = Integer.parseInt(m.group(5)); - String summary = m.group(6); - - Map<String, Boolean> flagMap = new HashMap<String, Boolean>(); - - flagMap.put("is-minor", flags.contains("M")); - flagMap.put("is-new", flags.contains("N")); - flagMap.put("is-unpatrolled", flags.contains("!")); - flagMap.put("is-bot-edit", flags.contains("B")); - flagMap.put("is-special", title.startsWith("Special:")); - flagMap.put("is-talk", title.startsWith("Talk:")); - - Map<String, Object> root = new HashMap<String, Object>(); - - root.put("title", title); - root.put("user", user); - root.put("unparsed-flags", flags); - root.put("diff-bytes", byteDiff); - root.put("diff-url", diffUrl); - root.put("summary", summary); - root.put("flags", flagMap); + Map<String, Object> parsedJsonObject = WikipediaParser.parseEvent(event); - return root; - } else { - throw new IllegalArgumentException(); + if (parsedJsonObject != null) { + collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, parsedJsonObject)); } } @@ -92,7 +49,7 @@ public class WikipediaParserStreamTask implements StreamTask { String[] lines = new String[] { "[[Wikipedia talk:Articles for creation/Lords of War]] http://en.wikipedia.org/w/index.php?diff=562991653&oldid=562991567 * BBGLordsofWar * (+95) /* Lords of War: Elves versus Lizardmen */]", "[[David Shepard (surgeon)]] M http://en.wikipedia.org/w/index.php?diff=562993463&oldid=562989820 * Jacobsievers * (+115) /* American Revolution (1775�1783) */ Added to note regarding David Shepard's brothers" }; for 
(String line : lines) { - System.out.println(parse(line)); + System.out.println(WikipediaParser.parseLine(line)); } } } http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java b/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java index 60fd93d..abe760a 100644 --- a/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java +++ b/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import org.apache.samza.config.Config; +import org.apache.samza.metrics.Counter; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.OutgoingMessageEnvelope; @@ -36,14 +37,20 @@ import org.apache.samza.task.TaskCoordinator; import org.apache.samza.task.WindowableTask; public class WikipediaStatsStreamTask implements StreamTask, InitableTask, WindowableTask { + private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "wikipedia-stats"); + private int edits = 0; private int byteDiff = 0; private Set<String> titles = new HashSet<String>(); private Map<String, Integer> counts = new HashMap<String, Integer>(); private KeyValueStore<String, Integer> store; + // Example metric. Running counter of the number of repeat edits of the same title within a single window. 
+ private Counter repeatEdits; + public void init(Config config, TaskContext context) { this.store = (KeyValueStore<String, Integer>) context.getStore("wikipedia-stats"); + this.repeatEdits = context.getMetricsRegistry().newCounter("edit-counters", "repeat-edits"); } @SuppressWarnings("unchecked") @@ -57,21 +64,18 @@ public class WikipediaStatsStreamTask implements StreamTask, InitableTask, Windo store.put("count-edits-all-time", editsAllTime + 1); edits += 1; - titles.add((String) edit.get("title")); byteDiff += (Integer) edit.get("diff-bytes"); + boolean newTitle = titles.add((String) edit.get("title")); for (Map.Entry<String, Boolean> flag : flags.entrySet()) { if (Boolean.TRUE.equals(flag.getValue())) { - Integer count = counts.get(flag.getKey()); - - if (count == null) { - count = 0; - } - - count += 1; - counts.put(flag.getKey(), count); + counts.compute(flag.getKey(), (k, v) -> v == null ? 1 : v + 1); } } + + if (!newTitle) { + repeatEdits.inc(); + } } @Override @@ -81,7 +85,7 @@ public class WikipediaStatsStreamTask implements StreamTask, InitableTask, Windo counts.put("unique-titles", titles.size()); counts.put("edits-all-time", store.get("count-edits-all-time")); - collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-stats"), counts)); + collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, counts)); // Reset counts after windowing. 
edits = 0; http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/3d0e919e/src/main/resources/log4j.xml ---------------------------------------------------------------------- diff --git a/src/main/resources/log4j.xml b/src/main/resources/log4j.xml index 086d6b8..805d5ca 100644 --- a/src/main/resources/log4j.xml +++ b/src/main/resources/log4j.xml @@ -40,14 +40,13 @@ <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" /> </layout> </appender> + <logger name="STARTUP_LOGGER" additivity="false"> + <level value="info" /> + <appender-ref ref="StartupAppender"/> + </logger> <root> <priority value="info" /> <appender-ref ref="RollingAppender"/> <appender-ref ref="jmx" /> </root> - <logger name="STARTUP_LOGGER" additivity="false"> - <level value="info" /> - <appender-ref ref="StartupAppender"/> - </logger> - </log4j:configuration>
