mynameborat commented on a change in pull request #1411:
URL: https://github.com/apache/samza/pull/1411#discussion_r468862157
##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -397,15 +425,7 @@ private void startTransferThread() {
     Runnable transferFromQueueToSystem = () -> {
       while (!Thread.currentThread().isInterrupted()) {
         try {
-          byte[] serializedLogEvent = logQueue.take();
-
-          metrics.logMessagesBytesSent.inc(serializedLogEvent.length);
-          metrics.logMessagesCountSent.inc();
-
-          OutgoingMessageEnvelope outgoingMessageEnvelope =
-              new OutgoingMessageEnvelope(systemStream, keyBytes, serializedLogEvent);
-          systemProducer.send(SOURCE, outgoingMessageEnvelope);
-
+          sendEventToSystemProducer(logQueue.take());
Review comment:
Now that you have cached keyBytes at the instance level, can we get rid
of the local keyBytes byte array?
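A minimal sketch of what the reviewer is suggesting: compute the key bytes once at the instance level and reuse that field on every send, rather than keeping a per-call local `byte[]`. The class and method names below are hypothetical stand-ins, not the actual Samza StreamAppender internals.

```java
import java.nio.charset.StandardCharsets;

public class KeyCachingAppender {

  // Cached once at construction; every send reuses this array instead of
  // re-deriving a local keyBytes copy on each call.
  private final byte[] keyBytes;

  public KeyCachingAppender(String key) {
    this.keyBytes = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
  }

  // Hypothetical stand-in for sendEventToSystemProducer: describes the
  // envelope built from the cached key rather than a per-call local array.
  public String send(byte[] serializedLogEvent) {
    String keyText = (keyBytes == null) ? "null" : new String(keyBytes, StandardCharsets.UTF_8);
    return "key=" + keyText + ",payloadBytes=" + serializedLogEvent.length;
  }
}
```

Since the key never changes after the appender is set up, caching it also avoids one allocation per log event on the hot path.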
##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -239,6 +227,46 @@ public void append(LogEvent event) {
}
}
+  /**
+   * If async-Logger is enabled, the log-event is sent directly to the systemProducer. Else, the event is serialized
+   * and added to a bounded blocking queue, before returning to the "synchronous" caller.
+   * @param event the log event to append
+   * @throws InterruptedException
+   */
+  private void handleEvent(LogEvent event) throws InterruptedException {
+    if (usingAsyncLogger) {
+      sendEventToSystemProducer(encodeLogEventToBytes(event));
+      return;
+    }
+
+    // Serialize the event before adding to the queue to leverage the caller thread
+    // and ensure that the transferThread can keep up.
+    if (!logQueue.offer(encodeLogEventToBytes(event), queueTimeoutS, TimeUnit.SECONDS)) {
+      // Do NOT retry adding the event to the queue. Dropping the event allows us to alleviate the unlikely
+      // possibility of a deadlock, which can arise due to a circular dependency between the SystemProducer
+      // which is used for StreamAppender and the log, which uses StreamAppender. Any locks held in the callstack
+      // of those two code paths can cause a deadlock. Dropping the event allows us to proceed.
+
+      // Scenario:
+      // T1: holds L1 and is waiting for L2
+      // T2: holds L2 and is waiting to produce to BQ1, which is drained by T3 (SystemProducer), which is waiting for L1
+
+      // This has happened due to locks in Kafka and log4j (see SAMZA-1537), which are both out of our control,
+      // so dropping events in the StreamAppender is our best recourse.
+
+      // Drain the queue instead of dropping one message just to reduce the frequency of the warn logs above.
+      int messagesDropped = logQueue.drainTo(new ArrayList<>()) + 1; // +1 because of the current log event
+      log.warn(String.format("Exceeded timeout %ss while trying to log to %s. Dropping %d log messages.",
+          queueTimeoutS,
+          systemStream.toString(),
+          messagesDropped));
Review comment:
Capturing our offline conversation - "We will need this to be replaced with System.err, since the recursion will drop this message in the event that we are not able to acquire the lock within the timeout."
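A hedged sketch of the suggested change: report the dropped-message count via System.err instead of `log.warn`, because logging from inside the appender recurses back into the StreamAppender and would itself be dropped under the same queue timeout. `DropReporter` and `drainAndReport` are hypothetical names for illustration, not the actual patch.

```java
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;

public class DropReporter {

  public static int drainAndReport(BlockingQueue<byte[]> logQueue,
                                   long queueTimeoutS, String streamName) {
    // Drain everything still queued; +1 accounts for the current event
    // that failed to enqueue within the timeout.
    int messagesDropped = logQueue.drainTo(new ArrayList<>()) + 1;
    // System.err bypasses log4j entirely, so this message cannot recurse
    // into the StreamAppender and be dropped along with the others.
    System.err.println(String.format(
        "StreamAppender: exceeded timeout %ss while trying to log to %s. Dropping %d log messages.",
        queueTimeoutS, streamName, messagesDropped));
    return messagesDropped;
  }
}
```

The trade-off is that the drop notice no longer flows through the configured log destinations, but on this code path the logger is exactly the component that is failing, so a raw stderr write is the more reliable channel.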
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]