Repository: samza
Updated Branches:
  refs/heads/master 924a78dab -> 30c6a89b3


SAMZA-1608 : Add hidden config to enable explicit stream creation in 
StreamAppender due to bug.

Due to an intermittent bug that causes the explicit stream creation in 
`StreamAppender` to hang, a hidden config is added to enable/disable explicit 
stream creation. By default this is disabled, which reverts to the previous 
behavior.

When the intermittent hang bug is fixed, the config will either be removed or 
made public.

Author: Daniel Nishimura <[email protected]>

Reviewers: Prateek Maheshwari <[email protected]>

Closes #442 from dnishimura/samza-1608-disable-streamappender-create-stream


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/30c6a89b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/30c6a89b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/30c6a89b

Branch: refs/heads/master
Commit: 30c6a89b3605ce97c9226656953b62176393d919
Parents: 924a78d
Author: Daniel Nishimura <[email protected]>
Authored: Fri Mar 16 13:10:56 2018 -0700
Committer: Prateek Maheshwari <[email protected]>
Committed: Fri Mar 16 13:10:56 2018 -0700

----------------------------------------------------------------------
 .../documentation/versioned/jobs/logging.md     |  4 +--
 .../samza/logging/log4j/StreamAppender.java     | 21 +++++++++------
 .../samza/logging/log4j/TestStreamAppender.java | 28 +++++++++++++++++---
 3 files changed, 39 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/30c6a89b/docs/learn/documentation/versioned/jobs/logging.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/logging.md 
b/docs/learn/documentation/versioned/jobs/logging.md
index ffb66dd..8cae27b 100644
--- a/docs/learn/documentation/versioned/jobs/logging.md
+++ b/docs/learn/documentation/versioned/jobs/logging.md
@@ -116,14 +116,12 @@ And then updating your log4j.xml to include the appender:
 
 #### Stream Log4j Appender
 
-Samza provides a StreamAppender to publish the logs into a specific system. 
You can specify the system name using "task.log4j.system" and change name of 
log stream with param 'StreamName'. You can also specify the number of 
partitions for the log stream with param 'PartitionCount'; otherwise, the 
number of partitions will equal the number of containers configured for the 
job. The partition count is set upon the creation of the logging stream and 
changing the partition count requires manual intervention with the system 
stream. The [MDC](http://logback.qos.ch/manual/mdc.html) contains the keys 
"containerName", "jobName" and "jobId", which help identify the source of the 
log. In order to use this appender, add:
+Samza provides a StreamAppender to publish the logs into a specific system. 
You can specify the system name using "task.log4j.system" and change name of 
log stream with param 'StreamName'. The 
[MDC](http://logback.qos.ch/manual/mdc.html) contains the keys "containerName", 
"jobName" and "jobId", which help identify the source of the log. In order to 
use this appender, add:
 
 {% highlight xml %}
 <appender name="StreamAppender" 
class="org.apache.samza.logging.log4j.StreamAppender">
    <!-- optional -->
    <param name="StreamName" value="EpicStreamName"/>
-   <!-- optional -->
-   <param name="PartitionCount" value="8"/>
    <layout class="org.apache.log4j.PatternLayout">
      <param name="ConversionPattern" value="%X{containerName} %X{jobName} 
%X{jobId} %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
    </layout>

http://git-wip-us.apache.org/repos/asf/samza/blob/30c6a89b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
----------------------------------------------------------------------
diff --git 
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index 9ea169d..ec63358 100644
--- 
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -63,6 +63,9 @@ public class StreamAppender extends AppenderSkeleton {
   private static final String JOB_COORDINATOR_TAG = "samza-job-coordinator";
   private static final String SOURCE = "log4j-log";
 
+  // Hidden config for now. Will move to appropriate Config class when ready 
to.
+  private static final String CREATE_STREAM_ENABLED = 
"task.log4j.create.stream.enabled";
+
   protected static final int DEFAULT_QUEUE_SIZE = 100;
   private static final long DEFAULT_QUEUE_TIMEOUT_S = 2; // Abitrary choice
 
@@ -298,15 +301,17 @@ public class StreamAppender extends AppenderSkeleton {
 
     setSerde(log4jSystemConfig, systemName, streamName);
 
-    // Explicitly create stream appender stream with the partition count the 
same as the number of containers.
-    System.out.println("[StreamAppender] creating stream " + streamName + " 
with partition count " + getPartitionCount());
-    StreamSpec streamSpec = 
StreamSpec.createStreamAppenderStreamSpec(streamName, systemName, 
getPartitionCount());
+    if (config.getBoolean(CREATE_STREAM_ENABLED, false)) {
+      // Explicitly create stream appender stream with the partition count the 
same as the number of containers.
+      System.out.println("[StreamAppender] creating stream " + streamName + " 
with partition count " + getPartitionCount());
+      StreamSpec streamSpec = 
StreamSpec.createStreamAppenderStreamSpec(streamName, systemName, 
getPartitionCount());
 
-    // SystemAdmin only needed for stream creation here.
-    SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config);
-    systemAdmin.start();
-    systemAdmin.createStream(streamSpec);
-    systemAdmin.stop();
+      // SystemAdmin only needed for stream creation here.
+      SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config);
+      systemAdmin.start();
+      systemAdmin.createStream(streamSpec);
+      systemAdmin.stop();
+    }
 
     systemProducer = systemFactory.getProducer(systemName, config, 
metricsRegistry);
     systemStream = new SystemStream(systemName, streamName);

http://git-wip-us.apache.org/repos/asf/samza/blob/30c6a89b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
----------------------------------------------------------------------
diff --git 
a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
 
b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
index 1257835..3d3c39b 100644
--- 
a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
+++ 
b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
@@ -21,6 +21,7 @@ package org.apache.samza.logging.log4j;
 
 import static org.junit.Assert.*;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -49,6 +50,7 @@ public class TestStreamAppender {
     log.removeAllAppenders();
     MockSystemProducer.listeners.clear();
     MockSystemProducer.messagesReceived.clear();
+    MockSystemAdmin.createdStreamName = "";
   }
 
   @Test
@@ -119,17 +121,37 @@ public class TestStreamAppender {
   }
 
   @Test
-  public void testStreamCreationUponSetup() {
+  public void testNoStreamCreationUponSetupByDefault() {
     System.setProperty("samza.container.name", "samza-container-1");
 
     MockSystemProducerAppender systemProducerAppender = new 
MockSystemProducerAppender();
     PatternLayout layout = new PatternLayout();
     layout.setConversionPattern("%m");
     systemProducerAppender.setLayout(layout);
-    systemProducerAppender.activateOptions();
+    systemProducerAppender.activateOptions(); // setupSystem() called inside 
here.
+    log.addAppender(systemProducerAppender);
+
+    Assert.assertEquals("", MockSystemAdmin.createdStreamName);
+  }
+
+  @Test
+  public void testStreamCreationUpSetupWhenEnabled() {
+    System.setProperty("samza.container.name", "samza-container-1");
+
+    MapConfig mapConfig = new MapConfig(ImmutableMap.of(
+        "task.log4j.create.stream.enabled", "true", // Enable explicit stream 
creation
+        "job.name", "log4jTest",
+        "job.id", "1",
+        "systems.mock.samza.factory", 
MockSystemFactory.class.getCanonicalName(),
+        "task.log4j.system", "mock"));
+
+    MockSystemProducerAppender systemProducerAppender = new 
MockSystemProducerAppender(mapConfig);
+    PatternLayout layout = new PatternLayout();
+    layout.setConversionPattern("%m");
+    systemProducerAppender.setLayout(layout);
+    systemProducerAppender.activateOptions(); // setupSystem() called inside 
here.
     log.addAppender(systemProducerAppender);
 
-    systemProducerAppender.setupSystem();
     Assert.assertEquals("__samza_log4jTest_1_logs", 
MockSystemAdmin.createdStreamName);
   }
 

Reply via email to