Repository: incubator-samza-hello-samza Updated Branches: refs/heads/latest 3a402e640 -> 1029da912 (forced update)
http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/1029da91/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java b/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java new file mode 100644 index 0000000..07cd8ac --- /dev/null +++ b/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package samza.examples.wikipedia.task; + +import java.util.Map; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.SystemStream; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamTask; +import org.apache.samza.task.TaskCoordinator; +import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent; + +/** + * This task is very simple. All it does is take messages that it receives, and + * sends them to a Kafka topic called wikipedia-raw. 
+ */ +public class WikipediaFeedStreamTask implements StreamTask { + private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "wikipedia-raw"); + + @Override + public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { + Map<String, Object> outgoingMap = WikipediaFeedEvent.toMap((WikipediaFeedEvent) envelope.getMessage()); + collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/1029da91/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java b/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java new file mode 100644 index 0000000..0505f58 --- /dev/null +++ b/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package samza.examples.wikipedia.task; + +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.SystemStream; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamTask; +import org.apache.samza.task.TaskCoordinator; +import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent; + +public class WikipediaParserStreamTask implements StreamTask { + @SuppressWarnings("unchecked") + @Override + public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { + Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage(); + WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject); + + try { + Map<String, Object> parsedJsonObject = parse(event.getRawEvent()); + + parsedJsonObject.put("channel", event.getChannel()); + parsedJsonObject.put("source", event.getSource()); + parsedJsonObject.put("time", event.getTime()); + + collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject)); + } catch (Exception e) { + System.err.println("Unable to parse line: " + event); + } + } + + public static Map<String, Object> parse(String line) { + Pattern p = Pattern.compile("\\[\\[(.*)\\]\\]\\s(.*)\\s(.*)\\s\\*\\s(.*)\\s\\*\\s\\(\\+?(.\\d*)\\)\\s(.*)"); + Matcher m = p.matcher(line); + + if (m.find() && m.groupCount() == 6) { + String title = m.group(1); + String flags = m.group(2); + String diffUrl = m.group(3); + String user = m.group(4); + int byteDiff = Integer.parseInt(m.group(5)); + String summary = m.group(6); + + Map<String, Boolean> flagMap = new HashMap<String, Boolean>(); + + flagMap.put("is-minor", flags.contains("M")); + flagMap.put("is-new", flags.contains("N")); + flagMap.put("is-unpatrolled", 
flags.contains("!")); + flagMap.put("is-bot-edit", flags.contains("B")); + flagMap.put("is-special", title.startsWith("Special:")); + flagMap.put("is-talk", title.startsWith("Talk:")); + + Map<String, Object> root = new HashMap<String, Object>(); + + root.put("title", title); + root.put("user", user); + root.put("unparsed-flags", flags); + root.put("diff-bytes", byteDiff); + root.put("diff-url", diffUrl); + root.put("summary", summary); + root.put("flags", flagMap); + + return root; + } else { + throw new IllegalArgumentException(); + } + } + + public static void main(String[] args) { + String[] lines = new String[] { "[[Wikipedia talk:Articles for creation/Lords of War]] http://en.wikipedia.org/w/index.php?diff=562991653&oldid=562991567 * BBGLordsofWar * (+95) /* Lords of War: Elves versus Lizardmen */]", "[[David Shepard (surgeon)]] M http://en.wikipedia.org/w/index.php?diff=562993463&oldid=562989820 * Jacobsievers * (+115) /* American Revolution (1775�1783) */ Added to note regarding David Shepard's brothers" }; + + for (String line : lines) { + System.out.println(parse(line)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/1029da91/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java b/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java new file mode 100644 index 0000000..60fd93d --- /dev/null +++ b/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package samza.examples.wikipedia.task;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;

/**
 * Aggregates parsed Wikipedia edits into per-window statistics.
 *
 * <p>On every window it sends a map to the Kafka topic {@code wikipedia-stats}. The map holds:
 * <ul>
 *   <li>the number of edits seen in the window;</li>
 *   <li>the sum of their {@code diff-bytes} values ({@code bytes-added});</li>
 *   <li>the number of distinct titles;</li>
 *   <li>a count for each flag that was {@code true} on at least one edit;</li>
 *   <li>the all-time edit count persisted in the {@code wikipedia-stats} key-value store.</li>
 * </ul>
 * The in-window counters are then reset.
 */
public class WikipediaStatsStreamTask implements StreamTask, InitableTask, WindowableTask {
  /** Destination for window statistics: topic "wikipedia-stats" on the "kafka" system. */
  private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "wikipedia-stats");

  /** Store key under which the all-time edit count is persisted. */
  private static final String EDITS_ALL_TIME_KEY = "count-edits-all-time";

  private int edits = 0;
  private int byteDiff = 0;
  private Set<String> titles = new HashSet<String>();
  private Map<String, Integer> counts = new HashMap<String, Integer>();
  private KeyValueStore<String, Integer> store;

  @SuppressWarnings("unchecked")
  @Override
  public void init(Config config, TaskContext context) {
    // getStore is untyped; the store is assumed to be configured with String keys and Integer values.
    this.store = (KeyValueStore<String, Integer>) context.getStore("wikipedia-stats");
  }

  /**
   * Folds one parsed edit into the current window and bumps the persisted all-time count.
   *
   * @param envelope holds a map with at least {@code title} (String), {@code diff-bytes}
   *     (Integer) and {@code flags} ({@code Map<String, Boolean>}) entries
   */
  @SuppressWarnings("unchecked")
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    Map<String, Object> edit = (Map<String, Object>) envelope.getMessage();
    Map<String, Boolean> flags = (Map<String, Boolean>) edit.get("flags");

    store.put(EDITS_ALL_TIME_KEY, storedEditsAllTime() + 1);

    edits += 1;
    titles.add((String) edit.get("title"));
    byteDiff += (Integer) edit.get("diff-bytes");

    for (Map.Entry<String, Boolean> flag : flags.entrySet()) {
      if (Boolean.TRUE.equals(flag.getValue())) {
        increment(flag.getKey());
      }
    }
  }

  /** Emits the current window's statistics and resets the in-window counters. */
  @Override
  public void window(MessageCollector collector, TaskCoordinator coordinator) {
    counts.put("edits", edits);
    counts.put("bytes-added", byteDiff);
    counts.put("unique-titles", titles.size());
    // The store has no entry until the first edit is processed; report 0 rather than null.
    counts.put("edits-all-time", storedEditsAllTime());

    collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, counts));

    // Reset counts after windowing. Fresh collections are allocated so the map already handed
    // to the collector is not mutated afterwards.
    edits = 0;
    byteDiff = 0;
    titles = new HashSet<String>();
    counts = new HashMap<String, Integer>();
  }

  /** Returns the persisted all-time edit count, or 0 if none has been stored yet. */
  private int storedEditsAllTime() {
    Integer stored = store.get(EDITS_ALL_TIME_KEY);
    return stored == null ? 0 : stored;
  }

  /** Adds one to the in-window counter for {@code key}, starting from 0 if absent. */
  private void increment(String key) {
    Integer current = counts.get(key);
    counts.put(key, current == null ? 1 : current + 1);
  }
}

// http://git-wip-us.apache.org/repos/asf/incubator-samza-hello-samza/blob/1029da91/src/main/resources/log4j.xml
// ----------------------------------------------------------------------
// diff --git a/src/main/resources/log4j.xml b/src/main/resources/log4j.xml
// new file mode 100644
// index 0000000..f0de765
// --- /dev/null
// +++ b/src/main/resources/log4j.xml
// @@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements. See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership. The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> + +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + <appender name="jmx" class="org.apache.samza.logging.log4j.JmxAppender" /> + + <appender name="RollingAppender" class="org.apache.log4j.DailyRollingFileAppender"> + <param name="File" value="${samza.log.dir}/${samza.container.name}.log" /> + <param name="DatePattern" value="'.'yyyy-MM-dd" /> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n" /> + </layout> + </appender> + <root> + <priority value="info" /> + <appender-ref ref="RollingAppender"/> + <appender-ref ref="jmx" /> + </root> +</log4j:configuration>
