SAMZA-723: hello-samza hangs when StreamAppender is used
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8677a27f Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8677a27f Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8677a27f Branch: refs/heads/samza-sql Commit: 8677a27fd81895324bd99113929255ec56722390 Parents: 07c6984 Author: Yan Fang <[email protected]> Authored: Tue Oct 27 15:21:41 2015 -0700 Committer: Navina <[email protected]> Committed: Tue Oct 27 15:21:41 2015 -0700 ---------------------------------------------------------------------- .../samza/coordinator/JobCoordinator.scala | 8 +- .../apache/samza/config/Log4jSystemConfig.java | 3 +- .../samza/logging/log4j/StreamAppender.java | 108 +++++++++++-------- .../samza/logging/log4j/TestStreamAppender.java | 39 ++++++- 4 files changed, 112 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/8677a27f/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala index a926ce6..03299cb 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala @@ -52,6 +52,11 @@ import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory object JobCoordinator extends Logging { /** + * a volatile value to store the current instantiated <code>JobCoordinator</code> + */ + @volatile var currentJobCoordinator: JobCoordinator = null + + /** * @param coordinatorSystemConfig A config object that contains job.name, * job.id, and all system.<job-coordinator-system-name>.* * configuration. 
The method will use this config to read all configuration @@ -105,7 +110,8 @@ object JobCoordinator extends Logging { val jobModelGenerator = initializeJobModel(config, checkpointManager, changelogManager, localityManager, streamMetadataCache) val server = new HttpServer server.addServlet("/*", new JobServlet(jobModelGenerator)) - new JobCoordinator(jobModelGenerator(), server, checkpointManager) + currentJobCoordinator = new JobCoordinator(jobModelGenerator(), server, checkpointManager) + currentJobCoordinator } /** http://git-wip-us.apache.org/repos/asf/samza/blob/8677a27f/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java index d98b8c6..59015a9 100644 --- a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java +++ b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java @@ -69,8 +69,7 @@ public class Log4jSystemConfig extends JavaSystemConfig { /** * Get the class name according to the serde name. * - * @param name - * serde name + * @param name serde name * @return serde factory name, or null if there is no factory defined for the * supplied serde name. 
*/ http://git-wip-us.apache.org/repos/asf/samza/blob/8677a27f/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java index 776a36b..0c6329e 100644 --- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java +++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java @@ -22,13 +22,13 @@ package org.apache.samza.logging.log4j; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URL; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.Logger; import org.apache.log4j.spi.LoggingEvent; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; -import org.apache.samza.config.MapConfig; import org.apache.samza.config.Log4jSystemConfig; import org.apache.samza.config.SerializerConfig; import org.apache.samza.config.ShellCommandConfig; @@ -63,16 +63,12 @@ public class StreamAppender extends AppenderSkeleton { private boolean isApplicationMaster = false; private Serde<LoggingEvent> serde = null; private Logger log = Logger.getLogger(StreamAppender.class); + protected static volatile boolean systemInitialized = false; /** * used to detect if this thread is called recursively */ - private final ThreadLocal<Boolean> recursiveCall = new ThreadLocal<Boolean>() { - @Override - protected Boolean initialValue() { - return false; - } - }; + private final AtomicBoolean recursiveCall = new AtomicBoolean(false); public String getStreamName() { return this.streamName; @@ -88,44 +84,37 @@ public class StreamAppender extends AppenderSkeleton { if (containerName != null) { isApplicationMaster = containerName.contains(APPLICATION_MASTER_TAG); } else { - throw new 
SamzaException("Got null container name from system property: " + JAVA_OPTS_CONTAINER_NAME + ". This is used as the key for the log appender, so can't proceed."); + throw new SamzaException("Got null container name from system property: " + JAVA_OPTS_CONTAINER_NAME + + ". This is used as the key for the log appender, so can't proceed."); } key = containerName; // use the container name as the key for the logs - config = getConfig(); - SystemFactory systemFactory = null; - Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config); - - if (streamName == null) { - streamName = getStreamName(log4jSystemConfig.getJobName(), log4jSystemConfig.getJobId()); - } - - String systemName = log4jSystemConfig.getSystemName(); - String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName); - if (systemFactoryName != null) { - systemFactory = Util.<SystemFactory>getObj(systemFactoryName); + // StreamAppender has to wait until the JobCoordinator is up when the log is in the AM + if (isApplicationMaster) { + systemInitialized = false; } else { - throw new SamzaException("Could not figure out the \"" + systemName + "\" system factory for log4j StreamAppender to use"); + setupSystem(); + systemInitialized = true; } - - setSerde(log4jSystemConfig, systemName, streamName); - - systemProducer = systemFactory.getProducer(systemName, config, new MetricsRegistryMap()); - systemStream = new SystemStream(systemName, streamName); - systemProducer.register(SOURCE); - systemProducer.start(); - - log.info(SOURCE + " has been registered in " + systemName + ". So all the logs will be sent to " + streamName - + " in " + systemName + ". 
Logs are partitioned by " + key); } @Override - protected void append(LoggingEvent event) { + public void append(LoggingEvent event) { if (!recursiveCall.get()) { try { recursiveCall.set(true); - OutgoingMessageEnvelope outgoingMessageEnvelope = - new OutgoingMessageEnvelope(systemStream, key.getBytes("UTF-8"), serde.toBytes(subLog(event))); - systemProducer.send(SOURCE, outgoingMessageEnvelope); + if (!systemInitialized) { + if (JobCoordinator.currentJobCoordinator() != null) { + // JobCoordinator has been instantiated + setupSystem(); + systemInitialized = true; + } else { + log.trace("Waiting for the JobCoordinator to be instantiated..."); + } + } else { + OutgoingMessageEnvelope outgoingMessageEnvelope = + new OutgoingMessageEnvelope(systemStream, key.getBytes("UTF-8"), serde.toBytes(subLog(event))); + systemProducer.send(SOURCE, outgoingMessageEnvelope); + } } catch (UnsupportedEncodingException e) { throw new SamzaException("can not send the log messages", e); } finally { @@ -150,10 +139,13 @@ public class StreamAppender extends AppenderSkeleton { @Override public void close() { + log.info("Shutting down the StreamAppender..."); if (!this.closed) { this.closed = true; flushSystemProducer(); - systemProducer.stop(); + if (systemProducer != null) { + systemProducer.stop(); + } } } @@ -166,7 +158,9 @@ public class StreamAppender extends AppenderSkeleton { * force the system producer to flush the messages */ public void flushSystemProducer() { - systemProducer.flush(SOURCE); + if (systemProducer != null) { + systemProducer.flush(SOURCE); + } } /** @@ -179,11 +173,12 @@ public class StreamAppender extends AppenderSkeleton { try { if (isApplicationMaster) { - Config coordinatorSystemConfig = new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG()), Config.class)); - config = JobCoordinator.apply(coordinatorSystemConfig).jobModel().getConfig(); + config = 
JobCoordinator.currentJobCoordinator().jobModel().getConfig(); } else { String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL()); - config = SamzaObjectMapper.getObjectMapper().readValue(Util.read(new URL(url), 30000), JobModel.class).getConfig(); + config = SamzaObjectMapper.getObjectMapper() + .readValue(Util.read(new URL(url), 30000), JobModel.class) + .getConfig(); } } catch (IOException e) { throw new SamzaException("can not read the config", e); @@ -192,7 +187,35 @@ public class StreamAppender extends AppenderSkeleton { return config; } - public static String getStreamName(String jobName, String jobId) { + protected void setupSystem() { + config = getConfig(); + SystemFactory systemFactory = null; + Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config); + + if (streamName == null) { + streamName = getStreamName(log4jSystemConfig.getJobName(), log4jSystemConfig.getJobId()); + } + + String systemName = log4jSystemConfig.getSystemName(); + String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName); + if (systemFactoryName != null) { + systemFactory = Util.<SystemFactory>getObj(systemFactoryName); + } else { + throw new SamzaException("Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use"); + } + + setSerde(log4jSystemConfig, systemName, streamName); + + systemProducer = systemFactory.getProducer(systemName, config, new MetricsRegistryMap()); + systemStream = new SystemStream(systemName, streamName); + systemProducer.register(SOURCE); + systemProducer.start(); + + log.info(SOURCE + " has been registered in " + systemName + ". So all the logs will be sent to " + streamName + + " in " + systemName + ". Logs are partitioned by " + key); + } + + protected static String getStreamName(String jobName, String jobId) { if (jobName == null) { throw new SamzaException("job name is null. 
Please specify job.name"); } @@ -224,7 +247,8 @@ public class StreamAppender extends AppenderSkeleton { serde = serdeFactory.getSerde(systemName, config); } else { String serdeKey = String.format(SerializerConfig.SERDE(), serdeName); - throw new SamzaException("Can not find serializers class for key '" + serdeName + "'. Please specify " + serdeKey + " property"); + throw new SamzaException("Can not find serializers class for key '" + serdeName + "'. Please specify " + + serdeKey + " property"); } } http://git-wip-us.apache.org/repos/asf/samza/blob/8677a27f/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java index 1c6f9a4..e2e17a0 100644 --- a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java +++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java @@ -70,7 +70,7 @@ public class TestStreamAppender { } @Test - public void testSystemProducerAppender() { + public void testSystemProducerAppenderInContainer() { System.setProperty("samza.container.name", "samza-container-1"); MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender(); @@ -87,6 +87,43 @@ public class TestStreamAppender { assertEquals(2, MockSystemProducer.messagesReceived.size()); assertTrue(new String((byte[]) MockSystemProducer.messagesReceived.get(0)).contains("\"message\":\"testing\"")); assertTrue(new String((byte[]) MockSystemProducer.messagesReceived.get(1)).contains("\"message\":\"testing2\"")); + + // reset + log.removeAllAppenders(); + MockSystemProducer.messagesReceived.clear(); + } + + @Test + public void testSystemProducerAppenderInAM() { + System.setProperty("samza.container.name", "samza-application-master"); + + 
MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender(); + PatternLayout layout = new PatternLayout(); + layout.setConversionPattern("%m"); + systemProducerAppender.setLayout(layout); + systemProducerAppender.activateOptions(); + log.addAppender(systemProducerAppender); + + log.info("no-received"); + systemProducerAppender.flushSystemProducer(); + // it should not receive anything because the system is not setup + assertEquals(0, MockSystemProducer.messagesReceived.size()); + + systemProducerAppender.setupSystem(); + MockSystemProducerAppender.systemInitialized = true; + + log.info("testing3"); + log.info("testing4"); + systemProducerAppender.flushSystemProducer(); + + // be able to received msgs now + assertEquals(2, MockSystemProducer.messagesReceived.size()); + assertTrue(new String((byte[]) MockSystemProducer.messagesReceived.get(0)).contains("\"message\":\"testing3\"")); + assertTrue(new String((byte[]) MockSystemProducer.messagesReceived.get(1)).contains("\"message\":\"testing4\"")); + + // reset + log.removeAllAppenders(); + MockSystemProducer.messagesReceived.clear(); } /**
