Repository: samza-hello-samza
Updated Branches:
  refs/heads/master 4384cb034 -> 1a34a83d1


http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/java/samza/examples/wikipedia/model/WikipediaParser.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/model/WikipediaParser.java 
b/src/main/java/samza/examples/wikipedia/model/WikipediaParser.java
new file mode 100644
index 0000000..9347962
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/model/WikipediaParser.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.model;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import samza.examples.wikipedia.system.WikipediaFeed;
+
+
+/**
+ * Turns raw Wikipedia feed events into JSON-style maps describing a single edit.
+ */
+public class WikipediaParser {
+  /**
+   * Layout of one raw edit line:
+   * {@code [[title]] flags diffUrl * user * (+byteDiff) summary}.
+   * Compiled once because the expression never changes and {@link Pattern} instances are
+   * immutable and safe to share between threads.
+   */
+  private static final Pattern EDIT_LINE_PATTERN =
+      Pattern.compile("\\[\\[(.*)\\]\\]\\s(.*)\\s(.*)\\s\\*\\s(.*)\\s\\*\\s\\(\\+?(.\\d*)\\)\\s(.*)");
+
+  /**
+   * Parses the raw line carried by a feed event and adds the event's {@code channel},
+   * {@code source} and {@code time} to the result.
+   *
+   * @param wikipediaFeedEvent the event whose raw line should be parsed
+   * @return the parsed edit, or {@code null} if anything went wrong; the failure is reported
+   *         on {@code System.err} and no partially populated map is ever returned
+   */
+  public static Map<String, Object> parseEvent(WikipediaFeed.WikipediaFeedEvent wikipediaFeedEvent) {
+    try {
+      Map<String, Object> parsedJsonObject = parseLine(wikipediaFeedEvent.getRawEvent());
+
+      parsedJsonObject.put("channel", wikipediaFeedEvent.getChannel());
+      parsedJsonObject.put("source", wikipediaFeedEvent.getSource());
+      parsedJsonObject.put("time", wikipediaFeedEvent.getTime());
+
+      // Returned from inside the try so that a failure in any step above yields null
+      // instead of a half-built map that the caller would go on to publish.
+      return parsedJsonObject;
+    } catch (Exception e) {
+      System.err.println("Unable to parse line: " + wikipediaFeedEvent);
+      return null;
+    }
+  }
+
+  /**
+   * Parses one raw edit line into a map with the keys {@code title}, {@code user},
+   * {@code unparsed-flags}, {@code diff-bytes} (an {@code Integer}), {@code diff-url},
+   * {@code summary} and {@code flags}. The {@code flags} value is a
+   * {@code Map<String, Boolean>} with the keys {@code is-minor}, {@code is-new},
+   * {@code is-unpatrolled}, {@code is-bot-edit}, {@code is-special} and {@code is-talk}.
+   *
+   * @param line the raw edit line
+   * @return a new, mutable map describing the edit
+   * @throws IllegalArgumentException if the line does not match the expected layout, or if
+   *         the byte-diff field is not a valid integer (surfacing as
+   *         {@link NumberFormatException})
+   */
+  public static Map<String, Object> parseLine(String line) {
+    Matcher m = EDIT_LINE_PATTERN.matcher(line);
+
+    // The pattern always declares six groups, so a successful find() is the only check needed.
+    if (!m.find()) {
+      throw new IllegalArgumentException("Line does not match the Wikipedia edit format: " + line);
+    }
+
+    String title = m.group(1);
+    String flags = m.group(2);
+    String diffUrl = m.group(3);
+    String user = m.group(4);
+    int byteDiff = Integer.parseInt(m.group(5));
+    String summary = m.group(6);
+
+    Map<String, Boolean> flagMap = new HashMap<>();
+
+    // Single-character markers in the flags field, plus two title-prefix checks.
+    flagMap.put("is-minor", flags.contains("M"));
+    flagMap.put("is-new", flags.contains("N"));
+    flagMap.put("is-unpatrolled", flags.contains("!"));
+    flagMap.put("is-bot-edit", flags.contains("B"));
+    flagMap.put("is-special", title.startsWith("Special:"));
+    flagMap.put("is-talk", title.startsWith("Talk:"));
+
+    Map<String, Object> root = new HashMap<>();
+
+    root.put("title", title);
+    root.put("user", user);
+    root.put("unparsed-flags", flags);
+    root.put("diff-bytes", byteDiff);
+    root.put("diff-url", diffUrl);
+    root.put("summary", summary);
+    root.put("flags", flagMap);
+
+    return root;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java 
b/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
index 0505f58..aee8939 100644
--- a/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
+++ b/src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
@@ -19,72 +19,29 @@
 
 package samza.examples.wikipedia.task;
 
-import java.util.HashMap;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;
 import org.apache.samza.task.TaskCoordinator;
+import samza.examples.wikipedia.model.WikipediaParser;
 import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
 
 public class WikipediaParserStreamTask implements StreamTask {
+  private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", 
"wikipedia-edits");
+
   @SuppressWarnings("unchecked")
   @Override
   public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator) {
     Map<String, Object> jsonObject = (Map<String, Object>) 
envelope.getMessage();
     WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
 
-    try {
-      Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
-
-      parsedJsonObject.put("channel", event.getChannel());
-      parsedJsonObject.put("source", event.getSource());
-      parsedJsonObject.put("time", event.getTime());
-
-      collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", 
"wikipedia-edits"), parsedJsonObject));
-    } catch (Exception e) {
-      System.err.println("Unable to parse line: " + event);
-    }
-  }
-
-  public static Map<String, Object> parse(String line) {
-    Pattern p = 
Pattern.compile("\\[\\[(.*)\\]\\]\\s(.*)\\s(.*)\\s\\*\\s(.*)\\s\\*\\s\\(\\+?(.\\d*)\\)\\s(.*)");
-    Matcher m = p.matcher(line);
-
-    if (m.find() && m.groupCount() == 6) {
-      String title = m.group(1);
-      String flags = m.group(2);
-      String diffUrl = m.group(3);
-      String user = m.group(4);
-      int byteDiff = Integer.parseInt(m.group(5));
-      String summary = m.group(6);
-
-      Map<String, Boolean> flagMap = new HashMap<String, Boolean>();
-
-      flagMap.put("is-minor", flags.contains("M"));
-      flagMap.put("is-new", flags.contains("N"));
-      flagMap.put("is-unpatrolled", flags.contains("!"));
-      flagMap.put("is-bot-edit", flags.contains("B"));
-      flagMap.put("is-special", title.startsWith("Special:"));
-      flagMap.put("is-talk", title.startsWith("Talk:"));
-
-      Map<String, Object> root = new HashMap<String, Object>();
-
-      root.put("title", title);
-      root.put("user", user);
-      root.put("unparsed-flags", flags);
-      root.put("diff-bytes", byteDiff);
-      root.put("diff-url", diffUrl);
-      root.put("summary", summary);
-      root.put("flags", flagMap);
+    Map<String, Object> parsedJsonObject = WikipediaParser.parseEvent(event);
 
-      return root;
-    } else {
-      throw new IllegalArgumentException();
+    if (parsedJsonObject != null) {
+      collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, 
parsedJsonObject));
     }
   }
 
@@ -92,7 +49,7 @@ public class WikipediaParserStreamTask implements StreamTask {
     String[] lines = new String[] { "[[Wikipedia talk:Articles for 
creation/Lords of War]]  
http://en.wikipedia.org/w/index.php?diff=562991653&oldid=562991567 * 
BBGLordsofWar * (+95) /* Lords of War: Elves versus Lizardmen */]", "[[David 
Shepard (surgeon)]] M 
http://en.wikipedia.org/w/index.php?diff=562993463&oldid=562989820 * 
Jacobsievers * (+115) /* American Revolution (1775�1783) */  Added to note 
regarding David Shepard's brothers" };
 
     for (String line : lines) {
-      System.out.println(parse(line));
+      System.out.println(WikipediaParser.parseLine(line));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java 
b/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
index 60fd93d..abe760a 100644
--- a/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
+++ b/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.samza.config.Config;
+import org.apache.samza.metrics.Counter;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -36,14 +37,20 @@ import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.task.WindowableTask;
 
 public class WikipediaStatsStreamTask implements StreamTask, InitableTask, 
WindowableTask {
+  private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", 
"wikipedia-stats");
+
   private int edits = 0;
   private int byteDiff = 0;
   private Set<String> titles = new HashSet<String>();
   private Map<String, Integer> counts = new HashMap<String, Integer>();
   private KeyValueStore<String, Integer> store;
 
+  // Example metric. Running counter of the number of repeat edits of the same 
title within a single window.
+  private Counter repeatEdits;
+
   public void init(Config config, TaskContext context) {
     this.store = (KeyValueStore<String, Integer>) 
context.getStore("wikipedia-stats");
+    this.repeatEdits = 
context.getMetricsRegistry().newCounter("edit-counters", "repeat-edits");
   }
 
   @SuppressWarnings("unchecked")
@@ -57,21 +64,18 @@ public class WikipediaStatsStreamTask implements 
StreamTask, InitableTask, Windo
     store.put("count-edits-all-time", editsAllTime + 1);
 
     edits += 1;
-    titles.add((String) edit.get("title"));
     byteDiff += (Integer) edit.get("diff-bytes");
+    boolean newTitle = titles.add((String) edit.get("title"));
 
     for (Map.Entry<String, Boolean> flag : flags.entrySet()) {
       if (Boolean.TRUE.equals(flag.getValue())) {
-        Integer count = counts.get(flag.getKey());
-
-        if (count == null) {
-          count = 0;
-        }
-
-        count += 1;
-        counts.put(flag.getKey(), count);
+        counts.merge(flag.getKey(), 1, Integer::sum); // first sighting of a flag must count as 1, not 0
       }
     }
+
+    if (!newTitle) {
+      repeatEdits.inc();
+    }
   }
 
   @Override
@@ -81,7 +85,7 @@ public class WikipediaStatsStreamTask implements StreamTask, 
InitableTask, Windo
     counts.put("unique-titles", titles.size());
     counts.put("edits-all-time", store.get("count-edits-all-time"));
 
-    collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", 
"wikipedia-stats"), counts));
+    collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, counts));
 
     // Reset counts after windowing.
     edits = 0;

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/1a34a83d/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/src/main/resources/log4j.xml b/src/main/resources/log4j.xml
index f0de765..805d5ca 100644
--- a/src/main/resources/log4j.xml
+++ b/src/main/resources/log4j.xml
@@ -24,13 +24,26 @@
 <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
   <appender name="jmx" class="org.apache.samza.logging.log4j.JmxAppender" />
 
-  <appender name="RollingAppender" 
class="org.apache.log4j.DailyRollingFileAppender">
+  <appender name="RollingAppender" 
class="org.apache.log4j.RollingFileAppender">
      <param name="File" value="${samza.log.dir}/${samza.container.name}.log" />
-     <param name="DatePattern" value="'.'yyyy-MM-dd" />
+     <param name="MaxFileSize" value="256MB" />
+     <param name="MaxBackupIndex" value="20" />
      <layout class="org.apache.log4j.PatternLayout">
-      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %c{1} 
[%p] %m%n" />
+      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] 
%c{1} [%p] %m%n" />
      </layout>
   </appender>
+  <appender name="StartupAppender" 
class="org.apache.log4j.RollingFileAppender">
+     <param name="File" 
value="${samza.log.dir}/${samza.container.name}-startup.log" />
+     <param name="MaxFileSize" value="256MB" />
+     <param name="MaxBackupIndex" value="1" />
+     <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] 
%c{1} [%p] %m%n" />
+     </layout>
+  </appender>
+  <logger name="STARTUP_LOGGER" additivity="false">
+    <level value="info" />
+    <appender-ref ref="StartupAppender"/>
+  </logger>
   <root>
     <priority value="info" />
     <appender-ref ref="RollingAppender"/>

Reply via email to