This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c4b31e  SAMZA-2569: Add features into StreamAppender (#1403)
7c4b31e is described below

commit 7c4b31eaf197eac6c98107710fbba311b9536fe2
Author: Binyao Jiang <[email protected]>
AuthorDate: Wed Aug 5 13:46:18 2020 -0500

    SAMZA-2569: Add features into StreamAppender (#1403)
    
    API Changes: three new metrics (log-messages-errors, log-messages-bytes-sent
    and log-messages-count-sent) are introduced in StreamAppender and are
    reported if a metrics reporter is configured. This applies to applications
    that use StreamAppender in log4j2.xml.
    
    Upgrade/usage instructions:
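    Users who write a new appender, or who extend the existing StreamAppender
    for their own use cases, can update their appender code to extend the new
    StreamAppender, which now exposes protected hooks (e.g. getMetrics,
    setupStream, encodeLogEventToBytes), to keep their code simpler.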
    To report the collected metrics through a metrics reporter, add the
    metrics-reporter configs (see MetricsConfig.java for details).
---
 .../samza/logging/log4j2/StreamAppender.java       | 132 ++++++++++++++-------
 .../logging/log4j2/StreamAppenderMetrics.java      |  14 ++-
 .../samza/logging/log4j2/TestStreamAppender.java   |   3 +-
 3 files changed, 101 insertions(+), 48 deletions(-)

diff --git 
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
 
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
index e033376..9d3e477 100644
--- 
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
+++ 
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
@@ -25,6 +25,7 @@ import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -48,14 +49,15 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.Log4jSystemConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.SerializerConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.job.model.JobModel;
 import 
org.apache.samza.logging.log4j2.serializers.LoggingEventJsonSerdeFactory;
-import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerdeFactory;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
@@ -67,6 +69,7 @@ import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.util.ExponentialSleepStrategy;
 import org.apache.samza.util.HttpUtil;
+import org.apache.samza.util.MetricsReporterLoader;
 import org.apache.samza.util.ReflectionUtil;
 
 @Plugin(name = "Stream", category = "Core", elementType = "appender", 
printObject = true)
@@ -79,26 +82,30 @@ public class StreamAppender extends AbstractAppender {
   // Hidden config for now. Will move to appropriate Config class when ready 
to.
   private static final String CREATE_STREAM_ENABLED = 
"task.log4j.create.stream.enabled";
 
-  protected static final int DEFAULT_QUEUE_SIZE = 100;
   private static final long DEFAULT_QUEUE_TIMEOUT_S = 2; // Abitrary choice
+  private final BlockingQueue<byte[]> logQueue = new 
LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
 
-  protected static volatile boolean systemInitialized = false;
-
-  private Config config = null;
   private SystemStream systemStream = null;
   private SystemProducer systemProducer = null;
   private String key = null;
-  private String streamName = null;
+  private String containerName = null;
   private int partitionCount = 0;
   private boolean isApplicationMaster;
   private Serde<LogEvent> serde = null;
   private Logger log = LogManager.getLogger(StreamAppender.class);
-  protected StreamAppenderMetrics metrics;
+  private Thread transferThread;
+  private Config config = null;
+  private String streamName = null;
 
-  private final BlockingQueue<byte[]> logQueue = new 
LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
-  protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S;
+  /**
+   * Used to detect whether this appender is being called recursively.
+   */
+  private final AtomicBoolean recursiveCall = new AtomicBoolean(false);
 
-  private Thread transferThread;
+  protected static final int DEFAULT_QUEUE_SIZE = 100;
+  protected static volatile boolean systemInitialized = false;
+  protected StreamAppenderMetrics metrics;
+  protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S;
 
   protected StreamAppender(String name, Filter filter, Layout<? extends 
Serializable> layout, boolean ignoreExceptions, String streamName) {
     super(name, filter, layout, ignoreExceptions);
@@ -108,7 +115,7 @@ public class StreamAppender extends AbstractAppender {
   @Override
   public void start() {
     super.start();
-    String containerName = System.getProperty(JAVA_OPTS_CONTAINER_NAME);
+    containerName = System.getProperty(JAVA_OPTS_CONTAINER_NAME);
     if (containerName != null) {
       isApplicationMaster = containerName.contains(JOB_COORDINATOR_TAG);
     } else {
@@ -127,11 +134,6 @@ public class StreamAppender extends AbstractAppender {
   }
 
   /**
-   * used to detect if this thread is called recursively
-   */
-  private final AtomicBoolean recursiveCall = new AtomicBoolean(false);
-
-  /**
    * Getter for the StreamName parameter. See also {@link 
#createAppender(String, Filter, Layout, boolean, String)} for when this is 
called.
    * Example: {@literal <param name="StreamName" value="ExampleStreamName"/>}
    * @return The configured stream name.
@@ -141,6 +143,16 @@ public class StreamAppender extends AbstractAppender {
   }
 
   /**
+   * Getter for the Config parameter.
+   */
+  protected Config getConfig() {
+    if (config == null) {
+      config = fetchConfig();
+    }
+    return this.config;
+  }
+
+  /**
    * Getter for the number of partitions to create on a new StreamAppender 
stream. See also {@link #createAppender(String, Filter, Layout, boolean, 
String)} for when this is called.
    * Example: {@literal <param name="PartitionCount" value="4"/>}
    * @return The configured partition count of the StreamAppender stream. If 
not set, returns {@link JobConfig#getContainerCount()}.
@@ -188,7 +200,7 @@ public class StreamAppender extends AbstractAppender {
         } else {
           // Serialize the event before adding to the queue to leverage the 
caller thread
           // and ensure that the transferThread can keep up.
-          if (!logQueue.offer(serde.toBytes(subLog(event)), queueTimeoutS, 
TimeUnit.SECONDS)) {
+          if (!logQueue.offer(encodeLogEventToBytes(event), queueTimeoutS, 
TimeUnit.SECONDS)) {
             // Do NOT retry adding system to the queue. Dropping the event 
allows us to alleviate the unlikely
             // possibility of a deadlock, which can arise due to a circular 
dependency between the SystemProducer
             // which is used for StreamAppender and the log, which uses 
StreamAppender. Any locks held in the callstack
@@ -214,7 +226,10 @@ public class StreamAppender extends AbstractAppender {
           metrics.bufferFillPct.set(Math.round(100f * logQueue.size() / 
DEFAULT_QUEUE_SIZE));
         }
       } catch (Exception e) {
-        System.err.println("[StreamAppender] Error sending log message:");
+        if (metrics != null) { // setupSystem() may not have been invoked yet 
so metrics can be null here.
+          metrics.logMessagesErrors.inc();
+        }
+        System.err.println(String.format("[%s] Error sending log message:", 
getName()));
         e.printStackTrace();
       } finally {
         recursiveCall.set(false);
@@ -224,6 +239,10 @@ public class StreamAppender extends AbstractAppender {
     }
   }
 
+  protected byte[] encodeLogEventToBytes(LogEvent event) {
+    return serde.toBytes(subLog(event));
+  }
+
   private Message subAppend(LogEvent event) {
     if (getLayout() == null) {
       return new SimpleMessage(event.getMessage().getFormattedMessage());
@@ -239,7 +258,7 @@ public class StreamAppender extends AbstractAppender {
     }
   }
 
-  private LogEvent subLog(LogEvent event) {
+  protected LogEvent subLog(LogEvent event) {
     return Log4jLogEvent.newBuilder()
         .setLevel(event.getLevel())
         .setLoggerName(event.getLoggerName())
@@ -256,7 +275,7 @@ public class StreamAppender extends AbstractAppender {
 
   @Override
   public void stop() {
-    log.info("Shutting down the StreamAppender...");
+    log.info(String.format("Shutting down the %s...", getName()));
     transferThread.interrupt();
     try {
       transferThread.join();
@@ -285,7 +304,7 @@ public class StreamAppender extends AbstractAppender {
    *
    * @return Config the config of this container
    */
-  protected Config getConfig() {
+  private Config fetchConfig() {
     Config config;
 
     try {
@@ -305,38 +324,58 @@ public class StreamAppender extends AbstractAppender {
     return config;
   }
 
+  protected Log4jSystemConfig getLog4jSystemConfig(Config config) {
+    return new Log4jSystemConfig(config);
+  }
+
+  protected StreamAppenderMetrics getMetrics(MetricsRegistryMap 
metricsRegistry) {
+    return new StreamAppenderMetrics(getName(), metricsRegistry);
+  }
+
+  protected void setupStream(SystemFactory systemFactory, String systemName) {
+    if (config.getBoolean(CREATE_STREAM_ENABLED, false)) {
+      // Explicitly create stream appender stream with the partition count the 
same as the number of containers.
+      System.out.println(String.format("[%s] creating stream ", getName()) + 
streamName + " with partition count " + getPartitionCount());
+      StreamSpec streamSpec =
+          StreamSpec.createStreamAppenderStreamSpec(streamName, systemName, 
getPartitionCount());
+
+      // SystemAdmin only needed for stream creation here.
+      SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config);
+      systemAdmin.start();
+      systemAdmin.createStream(streamSpec);
+      systemAdmin.stop();
+    }
+  }
+
   protected void setupSystem() {
     config = getConfig();
-    Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);
+    Log4jSystemConfig log4jSystemConfig = getLog4jSystemConfig(config);
 
     if (streamName == null) {
       streamName = getStreamName(log4jSystemConfig.getJobName(), 
log4jSystemConfig.getJobId());
     }
 
-    // TODO we need the ACTUAL metrics registry, or the metrics won't get 
reported by the metric reporters!
-    MetricsRegistry metricsRegistry = new MetricsRegistryMap();
-    metrics = new StreamAppenderMetrics("stream-appender", metricsRegistry);
+    // Instantiate metrics
+    MetricsRegistryMap metricsRegistry = new MetricsRegistryMap();
+    // getMetrics() is overridable; by default it uses the appender name (getName()) as the metrics prefix
+    metrics = getMetrics(metricsRegistry);
+    // Register the metrics registry with each configured metrics reporter and 
start it, so that the metrics are reported to other systems
+    Map<String, MetricsReporter>
+        metricsReporters = MetricsReporterLoader.getMetricsReporters(new 
MetricsConfig(config), containerName);
+    metricsReporters.values().forEach(reporter -> {
+      reporter.register(containerName, metricsRegistry);
+      reporter.start();
+    });
 
     String systemName = log4jSystemConfig.getSystemName();
     String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName)
         .orElseThrow(() -> new SamzaException(
-            "Could not figure out \"" + systemName + "\" system factory for 
log4j StreamAppender to use"));
+            "Could not figure out \"" + systemName + "\" system factory for 
log4j " + getName() + " to use"));
     SystemFactory systemFactory = ReflectionUtil.getObj(systemFactoryName, 
SystemFactory.class);
 
-    setSerde(log4jSystemConfig, systemName, streamName);
-
-    if (config.getBoolean(CREATE_STREAM_ENABLED, false)) {
-      // Explicitly create stream appender stream with the partition count the 
same as the number of containers.
-      System.out.println("[StreamAppender] creating stream " + streamName + " 
with partition count " + getPartitionCount());
-      StreamSpec streamSpec =
-          StreamSpec.createStreamAppenderStreamSpec(streamName, systemName, 
getPartitionCount());
+    setSerde(log4jSystemConfig, systemName);
 
-      // SystemAdmin only needed for stream creation here.
-      SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config);
-      systemAdmin.start();
-      systemAdmin.createStream(streamSpec);
-      systemAdmin.stop();
-    }
+    setupStream(systemFactory, systemName);
 
     systemProducer = systemFactory.getProducer(systemName, config, 
metricsRegistry);
     systemStream = new SystemStream(systemName, streamName);
@@ -360,6 +399,9 @@ public class StreamAppender extends AbstractAppender {
           try {
             byte[] serializedLogEvent = logQueue.take();
 
+            metrics.logMessagesBytesSent.inc(serializedLogEvent.length);
+            metrics.logMessagesCountSent.inc();
+
             OutgoingMessageEnvelope outgoingMessageEnvelope =
                 new OutgoingMessageEnvelope(systemStream, keyBytes, 
serializedLogEvent);
             systemProducer.send(SOURCE, outgoingMessageEnvelope);
@@ -368,24 +410,25 @@ public class StreamAppender extends AbstractAppender {
             // Preserve the interrupted status for the loop condition.
             Thread.currentThread().interrupt();
           } catch (Throwable t) {
-            log.error("Error sending StreamAppender event to SystemProducer", 
t);
+            metrics.logMessagesErrors.inc();
+            log.error("Error sending " + getName() + " event to 
SystemProducer", t);
           }
         }
       };
 
       transferThread = new Thread(transferFromQueueToSystem);
       transferThread.setDaemon(true);
-      transferThread.setName("Samza StreamAppender Producer " + 
transferThread.getName());
+      transferThread.setName("Samza " + getName() + " Producer " + 
transferThread.getName());
       transferThread.start();
 
     } catch (UnsupportedEncodingException e) {
       throw new SamzaException(String.format(
-          "Container name: %s could not be encoded to bytes. StreamAppender 
cannot proceed.", key),
+          "Container name: %s could not be encoded to bytes. %s cannot 
proceed.", key, getName()),
           e);
     }
   }
 
-  protected static String getStreamName(String jobName, String jobId) {
+  protected String getStreamName(String jobName, String jobId) {
     if (jobName == null) {
       throw new SamzaException("job name is null. Please specify job.name");
     }
@@ -402,9 +445,8 @@ public class StreamAppender extends AbstractAppender {
    *
    * @param log4jSystemConfig log4jSystemConfig for this appender
    * @param systemName name of the system
-   * @param streamName name of the stream
    */
-  private void setSerde(Log4jSystemConfig log4jSystemConfig, String 
systemName, String streamName) {
+  protected void setSerde(Log4jSystemConfig log4jSystemConfig, String 
systemName) {
     String serdeClass = LoggingEventJsonSerdeFactory.class.getCanonicalName();
     String serdeName = log4jSystemConfig.getStreamSerdeName(systemName, 
streamName);
 
diff --git 
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java
 
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java
index 38f613c..466a520 100644
--- 
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java
+++ 
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java
@@ -34,10 +34,22 @@ public class StreamAppenderMetrics extends MetricsBase {
   /** The number of log messages dropped e.g. because of buffer overflow. Does 
not include recursive calls. */
   public final Counter logMessagesDropped;
 
+  /** The number of log messages that could not be sent out due to errors, e.g. 
serialization errors or SystemProducer send errors. */
+  public final Counter logMessagesErrors;
+
+  /** The total size, in bytes, of serialized log messages sent out to SystemProducer. */
+  public final Counter logMessagesBytesSent;
+
+  /** The number of log messages sent out to SystemProducer. */
+  public final Counter logMessagesCountSent;
+
   public StreamAppenderMetrics(String prefix, MetricsRegistry registry) {
-    super(prefix, registry);
+    super(prefix + "-", registry);
     bufferFillPct = newGauge("buffer-fill-percent", 0);
     recursiveCalls = newCounter("recursive-calls");
     logMessagesDropped = newCounter("log-messages-dropped");
+    logMessagesErrors = newCounter("log-messages-errors");
+    logMessagesBytesSent = newCounter("log-messages-bytes-sent");
+    logMessagesCountSent = newCounter("log-messages-count-sent");
   }
 }
diff --git 
a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
 
b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
index 0248343..1680a84 100644
--- 
a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
+++ 
b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
@@ -68,13 +68,12 @@ public class TestStreamAppender {
   @Test
   public void testNonDefaultSerde() {
     System.setProperty("samza.container.name", "samza-container-1");
-    String streamName = StreamAppender.getStreamName("log4jTest", "1");
     Map<String, String> map = new HashMap<String, String>();
     map.put("job.name", "log4jTest");
     map.put("job.id", "1");
     map.put("serializers.registry.log4j-string.class", 
LoggingEventStringSerdeFactory.class.getCanonicalName());
     map.put("systems.mock.samza.factory", 
MockSystemFactory.class.getCanonicalName());
-    map.put("systems.mock.streams." + streamName + ".samza.msg.serde", 
"log4j-string");
+    map.put("systems.mock.streams.__samza_log4jTest_1_logs.samza.msg.serde", 
"log4j-string");
     map.put("task.log4j.system", "mock");
     PatternLayout layout = 
PatternLayout.newBuilder().withPattern("%m").build();
     MockSystemProducerAppender systemProducerAppender = 
MockSystemProducerAppender.createAppender("testName", null, layout, false, new 
MapConfig(map), null);

Reply via email to