Repository: samza Updated Branches: refs/heads/master 924a78dab -> 30c6a89b3
SAMZA-1608 : Add hidden config to enable explicit stream creation in StreamAppender due to bug. Due to a intermittent bug that causes the explicit stream creation in `StreamAppender` to hang, a hidden config is added to enable/disable explicit stream creation. By default this is disabled, which reverts to the previous behavior. When the intermittent hang bug is fixed, the config will either be removed or made public. Author: Daniel Nishimura <[email protected]> Reviewers: Prateek Maheshwari <[email protected]> Closes #442 from dnishimura/samza-1608-disable-streamappender-create-stream Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/30c6a89b Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/30c6a89b Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/30c6a89b Branch: refs/heads/master Commit: 30c6a89b3605ce97c9226656953b62176393d919 Parents: 924a78d Author: Daniel Nishimura <[email protected]> Authored: Fri Mar 16 13:10:56 2018 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Fri Mar 16 13:10:56 2018 -0700 ---------------------------------------------------------------------- .../documentation/versioned/jobs/logging.md | 4 +-- .../samza/logging/log4j/StreamAppender.java | 21 +++++++++------ .../samza/logging/log4j/TestStreamAppender.java | 28 +++++++++++++++++--- 3 files changed, 39 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/30c6a89b/docs/learn/documentation/versioned/jobs/logging.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/logging.md b/docs/learn/documentation/versioned/jobs/logging.md index ffb66dd..8cae27b 100644 --- a/docs/learn/documentation/versioned/jobs/logging.md +++ b/docs/learn/documentation/versioned/jobs/logging.md @@ -116,14 +116,12 @@ And then updating your log4j.xml to include the appender: #### Stream Log4j Appender -Samza provides a StreamAppender to publish the logs into a specific system. You can specify the system name using "task.log4j.system" and change name of log stream with param 'StreamName'. You can also specify the number of partitions for the log stream with param 'PartitionCount'; otherwise, the number of partitions will equal the number of containers configured for the job. The partition count is set upon the creation of the logging stream and changing the partition count requires manual intervention with the system stream. The [MDC](http://logback.qos.ch/manual/mdc.html) contains the keys "containerName", "jobName" and "jobId", which help identify the source of the log. In order to use this appender, add: +Samza provides a StreamAppender to publish the logs into a specific system. You can specify the system name using "task.log4j.system" and change name of log stream with param 'StreamName'. The [MDC](http://logback.qos.ch/manual/mdc.html) contains the keys "containerName", "jobName" and "jobId", which help identify the source of the log. In order to use this appender, add: {% highlight xml %} <appender name="StreamAppender" class="org.apache.samza.logging.log4j.StreamAppender"> <!-- optional --> <param name="StreamName" value="EpicStreamName"/> - <!-- optional --> - <param name="PartitionCount" value="8"/> <layout class="org.apache.log4j.PatternLayout"> <param name="ConversionPattern" value="%X{containerName} %X{jobName} %X{jobId} %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" /> </layout> http://git-wip-us.apache.org/repos/asf/samza/blob/30c6a89b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java index 9ea169d..ec63358 100644 --- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java +++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java @@ -63,6 +63,9 @@ public class StreamAppender extends AppenderSkeleton { private static final String JOB_COORDINATOR_TAG = "samza-job-coordinator"; private static final String SOURCE = "log4j-log"; + // Hidden config for now. Will move to appropriate Config class when ready to. + private static final String CREATE_STREAM_ENABLED = "task.log4j.create.stream.enabled"; + protected static final int DEFAULT_QUEUE_SIZE = 100; private static final long DEFAULT_QUEUE_TIMEOUT_S = 2; // Abitrary choice @@ -298,15 +301,17 @@ public class StreamAppender extends AppenderSkeleton { setSerde(log4jSystemConfig, systemName, streamName); - // Explicitly create stream appender stream with the partition count the same as the number of containers. - System.out.println("[StreamAppender] creating stream " + streamName + " with partition count " + getPartitionCount()); - StreamSpec streamSpec = StreamSpec.createStreamAppenderStreamSpec(streamName, systemName, getPartitionCount()); + if (config.getBoolean(CREATE_STREAM_ENABLED, false)) { + // Explicitly create stream appender stream with the partition count the same as the number of containers. + System.out.println("[StreamAppender] creating stream " + streamName + " with partition count " + getPartitionCount()); + StreamSpec streamSpec = StreamSpec.createStreamAppenderStreamSpec(streamName, systemName, getPartitionCount()); - // SystemAdmin only needed for stream creation here. - SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config); - systemAdmin.start(); - systemAdmin.createStream(streamSpec); - systemAdmin.stop(); + // SystemAdmin only needed for stream creation here. + SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config); + systemAdmin.start(); + systemAdmin.createStream(streamSpec); + systemAdmin.stop(); + } systemProducer = systemFactory.getProducer(systemName, config, metricsRegistry); systemStream = new SystemStream(systemName, streamName); http://git-wip-us.apache.org/repos/asf/samza/blob/30c6a89b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java index 1257835..3d3c39b 100644 --- a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java +++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java @@ -21,6 +21,7 @@ package org.apache.samza.logging.log4j; import static org.junit.Assert.*; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.HashMap; @@ -49,6 +50,7 @@ public class TestStreamAppender { log.removeAllAppenders(); MockSystemProducer.listeners.clear(); MockSystemProducer.messagesReceived.clear(); + MockSystemAdmin.createdStreamName = ""; } @Test @@ -119,17 +121,37 @@ public class TestStreamAppender { } @Test - public void testStreamCreationUponSetup() { + public void testNoStreamCreationUponSetupByDefault() { System.setProperty("samza.container.name", "samza-container-1"); MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender(); PatternLayout layout = new PatternLayout(); layout.setConversionPattern("%m"); systemProducerAppender.setLayout(layout); - systemProducerAppender.activateOptions(); + systemProducerAppender.activateOptions(); // setupSystem() called inside here. + log.addAppender(systemProducerAppender); + + Assert.assertEquals("", MockSystemAdmin.createdStreamName); + } + + @Test + public void testStreamCreationUpSetupWhenEnabled() { + System.setProperty("samza.container.name", "samza-container-1"); + + MapConfig mapConfig = new MapConfig(ImmutableMap.of( + "task.log4j.create.stream.enabled", "true", // Enable explicit stream creation + "job.name", "log4jTest", + "job.id", "1", + "systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName(), + "task.log4j.system", "mock")); + + MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender(mapConfig); + PatternLayout layout = new PatternLayout(); + layout.setConversionPattern("%m"); + systemProducerAppender.setLayout(layout); + systemProducerAppender.activateOptions(); // setupSystem() called inside here. log.addAppender(systemProducerAppender); - systemProducerAppender.setupSystem(); Assert.assertEquals("__samza_log4jTest_1_logs", MockSystemAdmin.createdStreamName); }
