Repository: samza Updated Branches: refs/heads/master e87165d40 -> 29ecae891
SAMZA-1537: StreamAppender can deadlock due to locks held by Kafka and Log4j Author: Jacob Maes <[email protected]> Reviewers: Jagadish <[email protected]>,Yi Pan (Data Infrastructure) <[email protected]> Closes #388 from jmakes/async-stream-appender Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/29ecae89 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/29ecae89 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/29ecae89 Branch: refs/heads/master Commit: 29ecae891a2c9158b59e81d060234504d6844f70 Parents: e87165d Author: Jacob Maes <[email protected]> Authored: Thu Dec 21 11:43:09 2017 -0800 Committer: Jacob Maes <--global> Committed: Thu Dec 21 11:43:09 2017 -0800 ---------------------------------------------------------------------- .../samza/logging/log4j/StreamAppender.java | 109 ++++++++++++-- .../logging/log4j/StreamAppenderMetrics.java | 43 ++++++ .../samza/logging/log4j/MockSystemProducer.java | 12 +- .../samza/logging/log4j/TestStreamAppender.java | 144 +++++++++++++++---- 4 files changed, 270 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/29ecae89/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java index 8436835..0ea8b68 100644 --- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java +++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java @@ -20,9 +20,13 @@ package org.apache.samza.logging.log4j; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.net.URL; +import java.util.ArrayList; +import 
java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.Logger; import org.apache.log4j.spi.LoggingEvent; @@ -34,6 +38,7 @@ import org.apache.samza.config.ShellCommandConfig; import org.apache.samza.coordinator.JobModelManager; import org.apache.samza.job.model.JobModel; import org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerdeFactory; +import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.SerdeFactory; @@ -54,6 +59,12 @@ public class StreamAppender extends AppenderSkeleton { private static final String JAVA_OPTS_CONTAINER_NAME = "samza.container.name"; private static final String JOB_COORDINATOR_TAG = "samza-job-coordinator"; private static final String SOURCE = "log4j-log"; + + protected static final int DEFAULT_QUEUE_SIZE = 100; + private static final long DEFAULT_QUEUE_TIMEOUT_S = 2; // Abitrary choice + + protected static volatile boolean systemInitialized = false; + private Config config = null; private SystemStream systemStream = null; private SystemProducer systemProducer = null; @@ -62,7 +73,12 @@ public class StreamAppender extends AppenderSkeleton { private boolean isApplicationMaster = false; private Serde<LoggingEvent> serde = null; private Logger log = Logger.getLogger(StreamAppender.class); - protected static volatile boolean systemInitialized = false; + protected StreamAppenderMetrics metrics; + + private final BlockingQueue<byte[]> logQueue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE); + protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S; + + private Thread transferThread; /** * used to detect if this thread is called recursively @@ -87,6 +103,7 @@ public class StreamAppender extends AppenderSkeleton { ". 
This is used as the key for the log appender, so can't proceed."); } key = containerName; // use the container name as the key for the logs + // StreamAppender has to wait until the JobCoordinator is up when the log is in the AM if (isApplicationMaster) { systemInitialized = false; @@ -110,9 +127,32 @@ public class StreamAppender extends AppenderSkeleton { log.trace("Waiting for the JobCoordinator to be instantiated..."); } } else { - OutgoingMessageEnvelope outgoingMessageEnvelope = - new OutgoingMessageEnvelope(systemStream, key.getBytes("UTF-8"), serde.toBytes(subLog(event))); - systemProducer.send(SOURCE, outgoingMessageEnvelope); + // Serialize the event before adding to the queue to leverage the caller thread + // and ensure that the transferThread can keep up. + if (!logQueue.offer(serde.toBytes(subLog(event)), queueTimeoutS, TimeUnit.SECONDS)) { + // Do NOT retry adding to the queue. Dropping the event allows us to alleviate the unlikely + // possibility of a deadlock, which can arise due to a circular dependency between the SystemProducer + // which is used for StreamAppender and the log, which uses StreamAppender. Any locks held in the callstack + // of those two code paths can cause a deadlock. Dropping the event allows us to proceed. + + // Scenario: + // T1: holds L1 and is waiting for L2 + // T2: holds L2 and is waiting to produce to BQ1 which is drained by T3 (SystemProducer) which is waiting for L1 + + // This has happened due to locks in Kafka and log4j (see SAMZA-1537), which are both out of our control, + // so dropping events in the StreamAppender is our best recourse. + + // Drain the queue instead of dropping one message just to reduce the frequency of warn logs above. + int messagesDropped = logQueue.drainTo(new ArrayList<>()) + 1; // +1 because of the current log event + log.warn(String.format("Exceeded timeout %ss while trying to log to %s. 
Dropping %d log messages.", + queueTimeoutS, + systemStream.toString(), + messagesDropped)); + + // Emit a metric which can be monitored to ensure it doesn't happen often. + metrics.logMessagesDropped.inc(messagesDropped); + } + metrics.bufferFillPct.set(Math.round(100 * logQueue.size() / DEFAULT_QUEUE_SIZE)); } } catch (Exception e) { System.err.println("[StreamAppender] Error sending log message:"); @@ -120,6 +160,8 @@ public class StreamAppender extends AppenderSkeleton { } finally { recursiveCall.set(false); } + } else if (metrics != null) { // setupSystem() may not have been invoked yet so metrics can be null here. + metrics.recursiveCalls.inc(); } } @@ -142,6 +184,13 @@ public class StreamAppender extends AppenderSkeleton { log.info("Shutting down the StreamAppender..."); if (!this.closed) { this.closed = true; + transferThread.interrupt(); + try { + transferThread.join(); + } catch (InterruptedException e) { + log.error("Interrupted while waiting for sink thread to finish.", e); + } + flushSystemProducer(); if (systemProducer != null) { systemProducer.stop(); @@ -169,7 +218,7 @@ public class StreamAppender extends AppenderSkeleton { * @return Config the config of this container */ protected Config getConfig() { - Config config = null; + Config config; try { if (isApplicationMaster) { @@ -196,6 +245,10 @@ public class StreamAppender extends AppenderSkeleton { streamName = getStreamName(log4jSystemConfig.getJobName(), log4jSystemConfig.getJobId()); } + // TODO we need the ACTUAL metrics registry, or the metrics won't get reported by the metric reporters! 
+ MetricsRegistry metricsRegistry = new MetricsRegistryMap(); + metrics = new StreamAppenderMetrics("stream-appender", metricsRegistry); + String systemName = log4jSystemConfig.getSystemName(); String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName); if (systemFactoryName != null) { @@ -206,13 +259,51 @@ public class StreamAppender extends AppenderSkeleton { setSerde(log4jSystemConfig, systemName, streamName); - systemProducer = systemFactory.getProducer(systemName, config, new MetricsRegistryMap()); + systemProducer = systemFactory.getProducer(systemName, config, metricsRegistry); systemStream = new SystemStream(systemName, streamName); systemProducer.register(SOURCE); systemProducer.start(); log.info(SOURCE + " has been registered in " + systemName + ". So all the logs will be sent to " + streamName + " in " + systemName + ". Logs are partitioned by " + key); + + startTransferThread(); + } + + private void startTransferThread() { + + try { + // Serialize the key once, since we will use it for every event. + final byte[] keyBytes = key.getBytes("UTF-8"); + + Runnable transferFromQueueToSystem = () -> { + while (!Thread.currentThread().isInterrupted()) { + try { + byte[] serializedLogEvent = logQueue.take(); + + OutgoingMessageEnvelope outgoingMessageEnvelope = + new OutgoingMessageEnvelope(systemStream, keyBytes, serializedLogEvent); + systemProducer.send(SOURCE, outgoingMessageEnvelope); + + } catch (InterruptedException e) { + // Preserve the interrupted status for the loop condition. 
+ Thread.currentThread().interrupt(); + } catch (Throwable t) { + log.error("Error sending StreamAppender event to SystemProducer", t); + } + } + }; + + transferThread = new Thread(transferFromQueueToSystem); + transferThread.setDaemon(true); + transferThread.setName("Samza StreamAppender Producer " + transferThread.getName()); + transferThread.start(); + + } catch (UnsupportedEncodingException e) { + throw new SamzaException(String.format( + "Container name: %s could not be encoded to bytes. StreamAppender cannot proceed.", key), + e); + } } protected static String getStreamName(String jobName, String jobId) { @@ -228,7 +319,7 @@ public class StreamAppender extends AppenderSkeleton { /** * set the serde for this appender. It looks for the stream serde first, then system serde. - * If still can not get the serde, throws exceptions. + * If still can not get the serde, throws exceptions. * * @param log4jSystemConfig log4jSystemConfig for this appender * @param systemName name of the system @@ -254,7 +345,7 @@ public class StreamAppender extends AppenderSkeleton { /** * Returns the serde that is being used for the stream appender. - * + * * @return The Serde<LoggingEvent> that the appender is using. */ public Serde<LoggingEvent> getSerde() { http://git-wip-us.apache.org/repos/asf/samza/blob/29ecae89/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppenderMetrics.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppenderMetrics.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppenderMetrics.java new file mode 100644 index 0000000..0273cb5 --- /dev/null +++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppenderMetrics.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.logging.log4j; + +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.Gauge; +import org.apache.samza.metrics.MetricsBase; +import org.apache.samza.metrics.MetricsRegistry; + + +public class StreamAppenderMetrics extends MetricsBase { + /** The percentage of the log queue capacity that is currently filled with messages from 0 to 100. */ + public final Gauge<Integer> bufferFillPct; + + /** The number of recursive calls to the StreamAppender. These events will not be logged. */ + public final Counter recursiveCalls; + + /** The number of log messages dropped e.g. because of buffer overflow. Does not include recursive calls. 
*/ + public final Counter logMessagesDropped; + + public StreamAppenderMetrics(String prefix, MetricsRegistry registry) { + super(prefix, registry); + bufferFillPct = newGauge("buffer-fill-percent", 0); + recursiveCalls = newCounter("recursive-calls"); + logMessagesDropped = newCounter("log-messages-dropped"); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/29ecae89/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemProducer.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemProducer.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemProducer.java index 8d99094..fc8de0b 100644 --- a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemProducer.java +++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemProducer.java @@ -21,13 +21,15 @@ package org.apache.samza.logging.log4j; import java.util.ArrayList; +import java.util.List; import org.apache.log4j.Logger; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemProducer; public class MockSystemProducer implements SystemProducer { - static public ArrayList<Object> messagesReceived = new ArrayList<Object>(); - static private Logger log = Logger.getLogger(MockSystemProducer.class); + public static ArrayList<Object> messagesReceived = new ArrayList<>(); + private static Logger log = Logger.getLogger(MockSystemProducer.class); + public static List<MockSystemProducerListener> listeners = new ArrayList<>(); @Override public void start() { @@ -45,9 +47,15 @@ public class MockSystemProducer implements SystemProducer { @Override public void send(String source, OutgoingMessageEnvelope envelope) { messagesReceived.add(envelope.getMessage()); + + listeners.forEach((listener) -> listener.onSend(source, envelope)); } @Override public void flush(String source) { } + + public interface MockSystemProducerListener 
{ + void onSend(String source, OutgoingMessageEnvelope envelope); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/29ecae89/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java index 59669b7..d93c5d1 100644 --- a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java +++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java @@ -21,9 +21,14 @@ package org.apache.samza.logging.log4j; import static org.junit.Assert.*; +import com.google.common.collect.Lists; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import org.apache.log4j.PatternLayout; import org.apache.samza.config.Config; @@ -31,12 +36,20 @@ import org.apache.samza.config.MapConfig; import org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerde; import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerde; import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory; +import org.junit.After; import org.junit.Test; public class TestStreamAppender { static Logger log = Logger.getLogger(TestStreamAppender.class); + @After + public void tearDown() { + log.removeAllAppenders(); + MockSystemProducer.listeners.clear(); + MockSystemProducer.messagesReceived.clear(); + } + @Test public void testDefaultSerde() { System.setProperty("samza.container.name", "samza-container-1"); @@ -70,7 +83,7 @@ public class TestStreamAppender { } @Test - public void testSystemProducerAppenderInContainer() { + public void testSystemProducerAppenderInContainer() throws InterruptedException { 
System.setProperty("samza.container.name", "samza-container-1"); MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender(); @@ -79,22 +92,13 @@ public class TestStreamAppender { systemProducerAppender.setLayout(layout); systemProducerAppender.activateOptions(); log.addAppender(systemProducerAppender); - log.info("testing"); - log.info("testing2"); - - systemProducerAppender.flushSystemProducer(); - - assertEquals(2, MockSystemProducer.messagesReceived.size()); - assertTrue(new String((byte[]) MockSystemProducer.messagesReceived.get(0)).contains("\"message\":\"testing\"")); - assertTrue(new String((byte[]) MockSystemProducer.messagesReceived.get(1)).contains("\"message\":\"testing2\"")); - // reset - log.removeAllAppenders(); - MockSystemProducer.messagesReceived.clear(); + List<String> messages = Lists.newArrayList("testing1", "testing2"); + logAndVerifyMessages(messages); } @Test - public void testSystemProducerAppenderInAM() { + public void testSystemProducerAppenderInAM() throws InterruptedException { System.setProperty("samza.container.name", "samza-job-coordinator"); MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender(); @@ -104,26 +108,112 @@ public class TestStreamAppender { systemProducerAppender.activateOptions(); log.addAppender(systemProducerAppender); - log.info("no-received"); - systemProducerAppender.flushSystemProducer(); - // it should not receive anything because the system is not setup - assertEquals(0, MockSystemProducer.messagesReceived.size()); + log.info("no-received"); // System isn't initialized yet, so this message should be dropped systemProducerAppender.setupSystem(); MockSystemProducerAppender.systemInitialized = true; - log.info("testing3"); - log.info("testing4"); - systemProducerAppender.flushSystemProducer(); + List<String> messages = Lists.newArrayList("testing3", "testing4"); + logAndVerifyMessages(messages); + } - // be able to received msgs now - assertEquals(2, 
MockSystemProducer.messagesReceived.size()); - assertTrue(new String((byte[]) MockSystemProducer.messagesReceived.get(0)).contains("\"message\":\"testing3\"")); - assertTrue(new String((byte[]) MockSystemProducer.messagesReceived.get(1)).contains("\"message\":\"testing4\"")); + @Test + public void testExceptionsDoNotKillTransferThread() throws InterruptedException { + System.setProperty("samza.container.name", "samza-container-1"); - // reset - log.removeAllAppenders(); - MockSystemProducer.messagesReceived.clear(); + MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender(); + PatternLayout layout = new PatternLayout(); + layout.setConversionPattern("%m"); + systemProducerAppender.setLayout(layout); + systemProducerAppender.activateOptions(); + log.addAppender(systemProducerAppender); + + List<String> messages = Lists.newArrayList("testing5", "testing6", "testing7"); + + // Set up latch + final CountDownLatch allMessagesSent = new CountDownLatch(messages.size()); + MockSystemProducer.listeners.add((source, envelope) -> { + allMessagesSent.countDown(); + if (allMessagesSent.getCount() == messages.size() - 1) { + throw new RuntimeException(); // Throw on the first message + } + }); + + // Log the messages + messages.forEach((message) -> log.info(message)); + + // Wait for messages + assertTrue("Thread did not send all messages. 
Count: " + allMessagesSent.getCount(), + allMessagesSent.await(60, TimeUnit.SECONDS)); + } + + @Test + public void testQueueTimeout() throws InterruptedException { + System.setProperty("samza.container.name", "samza-container-1"); + + MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender(); + systemProducerAppender.queueTimeoutS = 1; + PatternLayout layout = new PatternLayout(); + layout.setConversionPattern("%m"); + systemProducerAppender.setLayout(layout); + systemProducerAppender.activateOptions(); + log.addAppender(systemProducerAppender); + + int extraMessageCount = 5; + int expectedMessagesSent = extraMessageCount - 1; // -1 because when the queue is drained there is one additional message that couldn't be added + List<String> messages = new ArrayList<>(StreamAppender.DEFAULT_QUEUE_SIZE + extraMessageCount); + for (int i = 0; i < StreamAppender.DEFAULT_QUEUE_SIZE + extraMessageCount; i++) { + messages.add(String.valueOf(i)); + } + + // Set up latch + final CountDownLatch allMessagesSent = new CountDownLatch(expectedMessagesSent); // We expect to drop all but the extra messages + final CountDownLatch waitForTimeout = new CountDownLatch(1); + MockSystemProducer.listeners.add((source, envelope) -> { + allMessagesSent.countDown(); + try { + waitForTimeout.await(); + } catch (InterruptedException e) { + fail("Test could not run properly because of a thread interrupt."); + } + }); + + // Log the messages. This is where the timeout will happen! + messages.forEach((message) -> log.info(message)); + + assertEquals(messages.size() - expectedMessagesSent, systemProducerAppender.metrics.logMessagesDropped.getCount()); + + // Allow all the rest of the messages to send. + waitForTimeout.countDown(); + + // Wait for messages + assertTrue("Thread did not send all messages. 
Count: " + allMessagesSent.getCount(), + allMessagesSent.await(60, TimeUnit.SECONDS)); + assertEquals(expectedMessagesSent, MockSystemProducer.messagesReceived.size()); + } + + private void logAndVerifyMessages(List<String> messages) throws InterruptedException { + // Set up latch + final CountDownLatch allMessagesSent = new CountDownLatch(messages.size()); + MockSystemProducer.listeners.add((source, envelope) -> allMessagesSent.countDown()); + + // Log the messages + messages.forEach((message) -> log.info(message)); + + // Wait for messages + assertTrue("Timeout while waiting for StreamAppender to send all messages. Count: " + allMessagesSent.getCount(), + allMessagesSent.await(60, TimeUnit.SECONDS)); + + // Verify + assertEquals(messages.size(), MockSystemProducer.messagesReceived.size()); + for (int i = 0; i < messages.size(); i++) { + assertTrue("Message mismatch at index " + i, + new String((byte[]) MockSystemProducer.messagesReceived.get(i)).contains(asJsonMessageSegment(messages.get(i)))); + } + } + + private String asJsonMessageSegment(String message) { + return String.format("\"message\":\"%s\"", message); } /**
