Repository: samza
Updated Branches:
  refs/heads/master 440a25c97 -> e0ff4c523


SAMZA-1733: Populating ListGauge metric using DiagnosticsAppender for exceptions

This PR shows how the ListGauge can be used to emit exceptions using a 
DiagnosticsAppender.
1. The DiagnosticsAppender is enabled using a config (job.diagnostics.enabled).
2. The DiagnosticsAppender adds exception events to a ListGauge, which is a 
Samza container metric.
3. This ListGauge uses a time-and-count based eviction policy, so that 
exception events are not emitted to Kafka (SnapshotReporter) forever.

Author: Ray Matharu <[email protected]>

Reviewers: Yi Pan <[email protected]>, Jagadish Venkatraman 
<[email protected]>, Shanthoosh Venkatraman <[email protected]>

Closes #543 from rayman7718/diagnosticsappender


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e0ff4c52
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e0ff4c52
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e0ff4c52

Branch: refs/heads/master
Commit: e0ff4c523e683ff70f9a899725e865062383a8ca
Parents: 440a25c
Author: Ray Matharu <[email protected]>
Authored: Fri Jul 27 11:53:41 2018 -0700
Committer: Prateek Maheshwari <[email protected]>
Committed: Fri Jul 27 11:53:41 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/config/JobConfig.scala     |  13 +++
 .../apache/samza/container/SamzaContainer.scala |  23 ++++-
 .../samza/container/SamzaContainerMetrics.scala |   3 +-
 .../diagnostics/DiagnosticsExceptionEvent.java  |  86 ++++++++++++++++
 .../apache/samza/metrics/reporter/Metrics.scala |   9 +-
 .../serializers/MetricsSnapshotSerdeV2.java     |  75 ++++++++++++++
 .../MetricsSnapshotSerdeV2Factory.java          |  31 ++++++
 .../serializers/TestMetricsSnapshotSerdeV2.java |  68 +++++++++++++
 .../log4j/SimpleDiagnosticsAppender.java        | 101 +++++++++++++++++++
 9 files changed, 403 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e0ff4c52/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 75e8005..7cebcc6 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -88,6 +88,13 @@ object JobConfig {
   // across application restarts
   val JOB_LOGGED_STORE_BASE_DIR = "job.logged.store.base.dir"
 
+  // Enables diagnostic appender for logging exception events
+  val JOB_DIAGNOSTICS_ENABLED = "job.diagnostics.enabled"
+
+  // Specify DiagnosticAppender class
+  val DIAGNOSTICS_APPENDER_CLASS = "job.diagnostics.appender.class"
+  val DEFAULT_DIAGNOSTICS_APPENDER_CLASS = 
"org.apache.samza.logging.log4j.SimpleDiagnosticsAppender"
+
   implicit def Config2Job(config: Config) = new JobConfig(config)
 
   /**
@@ -186,4 +193,10 @@ class JobConfig(config: Config) extends 
ScalaMapConfig(config) with Logging {
   def getNonLoggedStorePath = 
getOption(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR)
 
   def getLoggedStorePath = getOption(JobConfig.JOB_LOGGED_STORE_BASE_DIR)
+
+  def getDiagnosticsEnabled = { getBoolean(JobConfig.JOB_DIAGNOSTICS_ENABLED, 
false) }
+
+  def getDiagnosticsAppenderClass = {
+    getOrDefault(JobConfig.DIAGNOSTICS_APPENDER_CLASS, 
JobConfig.DEFAULT_DIAGNOSTICS_APPENDER_CLASS)
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e0ff4c52/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index bb1b1cf..47b73c1 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -21,6 +21,7 @@ package org.apache.samza.container
 
 import java.io.File
 import java.lang.management.ManagementFactory
+import java.lang.reflect.InvocationTargetException
 import java.net.{URL, UnknownHostException}
 import java.nio.file.Path
 import java.time.Duration
@@ -53,8 +54,7 @@ import org.apache.samza.system.chooser.{DefaultChooser, 
MessageChooserFactory, R
 import org.apache.samza.table.TableManager
 import org.apache.samza.table.utils.SerdeUtils
 import org.apache.samza.task._
-import org.apache.samza.util.Util
-import org.apache.samza.util._
+import org.apache.samza.util.{Util, _}
 import org.apache.samza.{SamzaContainerStatus, SamzaException}
 
 import scala.collection.JavaConverters._
@@ -798,6 +798,7 @@ class SamzaContainer(
       jmxServer = new JmxServer()
 
       startMetrics
+      startDiagnostics
       startAdmins
       startOffsetManager
       startLocalityManager
@@ -934,6 +935,24 @@ class SamzaContainer(
     })
   }
 
+  def startDiagnostics {
+    if (containerContext.config.getDiagnosticsEnabled) {
+      info("Starting diagnostics.")
+
+      try {
+        val diagnosticsAppender = 
Class.forName(containerContext.config.getDiagnosticsAppenderClass).
+          
getDeclaredConstructor(classOf[SamzaContainerMetrics]).newInstance(this.metrics);
+      }
+      catch {
+        case e@(_: ClassNotFoundException | _: InstantiationException | _: 
InvocationTargetException) => {
+          error("Failed to instantiate diagnostic appender", e)
+          throw new ConfigException("Failed to instantiate diagnostic appender 
class " +
+            containerContext.config.getDiagnosticsAppenderClass, e)
+        }
+      }
+    }
+  }
+
   def startOffsetManager {
     info("Registering task instances with offsets.")
 

http://git-wip-us.apache.org/repos/asf/samza/blob/e0ff4c52/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index a26e666..d5cf6c6 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -21,6 +21,7 @@ package org.apache.samza.container
 
 import java.util
 
+import org.apache.samza.diagnostics.DiagnosticsExceptionEvent
 import org.apache.samza.metrics.{Gauge, ReadableMetricsRegistry, 
MetricsRegistryMap, MetricsHelper}
 
 class SamzaContainerMetrics(
@@ -48,7 +49,7 @@ class SamzaContainerMetrics(
 
   val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new 
util.HashMap[TaskName, Gauge[Long]]()
 
-  val exceptions = newListGauge[String]("exceptions")
+  val exceptions = newListGauge[DiagnosticsExceptionEvent]("exceptions")
 
   def addStoreRestorationGauge(taskName: TaskName, storeName: String) {
     taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time" 
format(taskName.toString, storeName), -1L))

http://git-wip-us.apache.org/repos/asf/samza/blob/e0ff4c52/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsExceptionEvent.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsExceptionEvent.java
 
b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsExceptionEvent.java
new file mode 100644
index 0000000..d87249e
--- /dev/null
+++ 
b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsExceptionEvent.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.diagnostics;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+
+/**
+ * This class encapsulates information related to an exception event that is 
useful for diagnostics.
+ * It is used to define container, task, and other metrics as
+ * {@link org.apache.samza.metrics.ListGauge} of type {@link 
DiagnosticsExceptionEvent}.
+ */
+public class DiagnosticsExceptionEvent {
+
+  private long timestamp; // the timestamp associated with this exception
+  private Class exceptionType; // store the exception type separately
+  private Throwable throwable;
+  private Map mdcMap;
+  // the MDC map associated with this exception, used to store/obtain any 
context associated with the throwable
+
+  public DiagnosticsExceptionEvent() {
+  }
+
+  public DiagnosticsExceptionEvent(long timestampMillis, Throwable throwable, 
Map mdcMap) {
+    this.throwable = throwable;
+    this.exceptionType = throwable.getClass();
+    this.timestamp = timestampMillis;
+    this.mdcMap = new HashMap(mdcMap);
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public Throwable getThrowable() {
+    return this.throwable;
+  }
+
+  public Class getExceptionType() {
+    return this.exceptionType;
+  }
+
+  public Map getMdcMap() {
+    return mdcMap;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DiagnosticsExceptionEvent that = (DiagnosticsExceptionEvent) o;
+
+    // Throwable provides no equals impl, so we assume message & stacktrace 
equality suffices
+    return timestamp == that.timestamp && 
this.exceptionType.equals(that.exceptionType) && mdcMap.equals(that.mdcMap)
+        && this.throwable.getMessage().equals(that.throwable.getMessage()) && 
Arrays.equals(
+        this.throwable.getStackTrace(), that.throwable.getStackTrace());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(timestamp, exceptionType, throwable, mdcMap);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/e0ff4c52/samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala 
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala
index 8a58cd2..218157e 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala
@@ -19,9 +19,7 @@
 
 package org.apache.samza.metrics.reporter
 
-import java.util.Collections
-import java.util.HashMap
-import java.util.Map
+import java.util.{Collections, HashMap, Map}
 import scala.collection.JavaConverters._
 
 object Metrics {
@@ -52,4 +50,9 @@ class Metrics(metrics: Map[String, Map[String, Object]]) {
   def get(group: String) = immutableMetrics.get(group)
 
   def getAsMap(): Map[String, Map[String, Object]] = 
Collections.unmodifiableMap(immutableMetrics)
+
+  // default constructor to enable deserialization by MetricsSnapshotSerdeV2
+  def this() {
+    this(new HashMap[String, Map[String, Object]]())
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e0ff4c52/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerdeV2.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerdeV2.java
 
b/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerdeV2.java
new file mode 100644
index 0000000..6ab7ce8
--- /dev/null
+++ 
b/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerdeV2.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class MetricsSnapshotSerdeV2 implements Serde<MetricsSnapshot> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MetricsSnapshotSerdeV2.class);
+  private final ObjectMapper objectMapper;
+
+  public MetricsSnapshotSerdeV2() {
+    objectMapper = new ObjectMapper();
+    
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.OBJECT_AND_NON_CONCRETE);
+  }
+
+  @Override
+  public MetricsSnapshot fromBytes(byte[] bytes) {
+    try {
+      return MetricsSnapshot.fromMap(
+          objectMapper.readValue(bytes, new HashMap<String, Map<String, 
Object>>().getClass()));
+    } catch (IOException e) {
+      LOG.info("Exception while deserializing", e);
+    }
+    return null;
+  }
+
+  @Override
+  public byte[] toBytes(MetricsSnapshot metricsSnapshot) {
+    try {
+      return 
objectMapper.writeValueAsString(convertMap(metricsSnapshot.getAsMap())).getBytes("UTF-8");
+    } catch (IOException e) {
+      LOG.info("Exception while serializing", e);
+    }
+    return null;
+  }
+
+  /** Metrics returns an UnmodifiableMap.
+   * Unmodifiable maps should not be serialized with type, because 
UnmodifiableMap cannot be deserialized.
+   * So we convert to HashMap.
+   */
+  private HashMap convertMap(Map<String, Object> map) {
+    HashMap retVal = new HashMap(map);
+    for (Map.Entry<String, Object> entry : map.entrySet()) {
+      if (entry.getValue() instanceof Map) {
+        retVal.put(entry.getKey(), convertMap((Map<String, Object>) 
entry.getValue()));
+      }
+    }
+    return retVal;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/e0ff4c52/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerdeV2Factory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerdeV2Factory.java
 
b/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerdeV2Factory.java
new file mode 100644
index 0000000..49e0770
--- /dev/null
+++ 
b/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerdeV2Factory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
+
+
+public class MetricsSnapshotSerdeV2Factory implements 
SerdeFactory<MetricsSnapshot> {
+  @Override
+  public Serde<MetricsSnapshot> getSerde(String name, Config config) {
+    return new MetricsSnapshotSerdeV2();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e0ff4c52/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java
 
b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java
new file mode 100644
index 0000000..e4255a7
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers.model.serializers;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.SamzaException;
+import org.apache.samza.diagnostics.DiagnosticsExceptionEvent;
+import org.apache.samza.metrics.ListGauge;
+import org.apache.samza.metrics.reporter.Metrics;
+import org.apache.samza.metrics.reporter.MetricsHeader;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
+import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestMetricsSnapshotSerdeV2 {
+
+  @Test
+  public void testSerde() {
+    MetricsHeader metricsHeader =
+        new MetricsHeader("jobName", "i001", "container 0", "source", 
"300.14.25.1", "1", "1", 1, 1);
+
+    ListGauge listGauge = new 
ListGauge<DiagnosticsExceptionEvent>("exceptions");
+    DiagnosticsExceptionEvent diagnosticsExceptionEvent1 =
+        new DiagnosticsExceptionEvent(1, new SamzaException("this is a samza 
exception", new RuntimeException("cause")),
+            new HashMap());
+
+    listGauge.add(diagnosticsExceptionEvent1);
+
+    String samzaContainerMetricsGroupName = 
"org.apache.samza.container.SamzaContainerMetrics";
+    Map<String, Map<String, Object>> metricMessage = new HashMap<>();
+    metricMessage.put(samzaContainerMetricsGroupName, new HashMap<>());
+    metricMessage.get(samzaContainerMetricsGroupName).put("exceptions", 
listGauge.getValues());
+    metricMessage.get(samzaContainerMetricsGroupName).put("commit-calls", 0);
+
+    MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new 
Metrics(metricMessage));
+
+    MetricsSnapshotSerdeV2 metricsSnapshotSerde = new MetricsSnapshotSerdeV2();
+    byte[] serializedBytes = metricsSnapshotSerde.toBytes(metricsSnapshot);
+
+    MetricsSnapshot deserializedMetricsSnapshot = 
metricsSnapshotSerde.fromBytes(serializedBytes);
+
+    Assert.assertTrue("Headers map should be equal",
+        
metricsSnapshot.getHeader().getAsMap().equals(deserializedMetricsSnapshot.getHeader().getAsMap()));
+
+    Assert.assertTrue("Metrics map should be equal",
+        
metricsSnapshot.getMetrics().getAsMap().equals(deserializedMetricsSnapshot.getMetrics().getAsMap()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e0ff4c52/samza-log4j/src/main/java/org/apache/samza/logging/log4j/SimpleDiagnosticsAppender.java
----------------------------------------------------------------------
diff --git 
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/SimpleDiagnosticsAppender.java
 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/SimpleDiagnosticsAppender.java
new file mode 100644
index 0000000..31f0d47
--- /dev/null
+++ 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/SimpleDiagnosticsAppender.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j;
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.apache.samza.diagnostics.DiagnosticsExceptionEvent;
+import org.apache.samza.metrics.ListGauge;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Provides an in-memory appender that parses LoggingEvents to filter events 
relevant to diagnostics.
+ * Currently, it filters exception-related events and updates an exception metric 
({@link ListGauge}) in
+ * {@link SamzaContainerMetrics}.
+ *
+ * When used in conjunction with {@link 
org.apache.samza.metrics.reporter.MetricsSnapshotReporter}, it provides a
+ * stream of diagnostics-related events.
+ */
+public class SimpleDiagnosticsAppender extends AppenderSkeleton {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SimpleDiagnosticsAppender.class);
+
+  // simple object to synchronize root logger attachment
+  private static final Object SYNCHRONIZATION_OBJECT = new Object();
+  protected final ListGauge<DiagnosticsExceptionEvent> 
samzaContainerExceptionMetric;
+
+  /**
+   * A simple log4j1.2.* appender, which attaches itself to the root logger.
+   * Attachment to the root logger is thread safe.
+   */
+  public SimpleDiagnosticsAppender(SamzaContainerMetrics 
samzaContainerMetrics) {
+    this.samzaContainerExceptionMetric = samzaContainerMetrics.exceptions();
+    this.setName(SimpleDiagnosticsAppender.class.getName());
+
+    synchronized (SYNCHRONIZATION_OBJECT) {
+      this.attachAppenderToRootLogger();
+    }
+  }
+
+  private void attachAppenderToRootLogger() {
+    // ensure appender is attached only once per JVM (regardless of 
#containers)
+    if 
(org.apache.log4j.Logger.getRootLogger().getAppender(SimpleDiagnosticsAppender.class.getName())
 == null) {
+      LOG.info("Attaching diagnostics appender to root logger");
+      org.apache.log4j.Logger.getRootLogger().addAppender(this);
+    }
+  }
+
+  @Override
+  protected void append(LoggingEvent loggingEvent) {
+
+    try {
+      // if an event with a non-null throwable is received => exception event
+      if (loggingEvent.getThrowableInformation() != null) {
+        DiagnosticsExceptionEvent diagnosticsExceptionEvent =
+            new DiagnosticsExceptionEvent(loggingEvent.timeStamp, 
loggingEvent.getThrowableInformation().getThrowable(),
+                loggingEvent.getProperties());
+
+        samzaContainerExceptionMetric.add(diagnosticsExceptionEvent);
+        LOG.debug("Received DiagnosticsExceptionEvent " + 
diagnosticsExceptionEvent);
+      } else {
+        LOG.debug("Received non-exception event with message " + 
loggingEvent.getMessage());
+      }
+    } catch (Exception e) {
+      // blanket catch of all exceptions so as to not impact any job
+      LOG.error("Exception in logging event parsing", e);
+    }
+  }
+
+  @Override
+  public void close() {
+    // Do nothing.
+  }
+
+  /**
+   * Returns false since this appender requires no layout.
+   * @return false
+   */
+  @Override
+  public boolean requiresLayout() {
+    return false;
+  }
+}

Reply via email to