Repository: samza Updated Branches: refs/heads/master 29ecae891 -> ef1c9625c
SAMZA-1539: KafkaProducer potential hang on close() when task.drop.producer.errors==true Author: Jacob Maes <[email protected]> Reviewers: Boris Shkolnik <[email protected]> Closes #390 from jmakes/samza-1539 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ef1c9625 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ef1c9625 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ef1c9625 Branch: refs/heads/master Commit: ef1c9625c63b6c169585c5d1928298f36f3037fb Parents: 29ecae8 Author: Jacob Maes <[email protected]> Authored: Fri Dec 22 10:36:37 2017 -0800 Committer: Jacob Maes <--global> Committed: Fri Dec 22 10:36:37 2017 -0800 ---------------------------------------------------------------------- .../system/kafka/KafkaSystemProducer.scala | 135 ++++++++++--------- .../system/kafka/TestKafkaSystemProducer.scala | 39 ++++-- 2 files changed, 103 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/ef1c9625/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala index 5e83666..9eaf895 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala @@ -28,7 +28,6 @@ import org.apache.kafka.clients.producer.Producer import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.clients.producer.RecordMetadata import org.apache.kafka.common.PartitionInfo -import org.apache.kafka.common.errors.SerializationException import org.apache.samza.system.OutgoingMessageEnvelope import 
org.apache.samza.system.SystemProducer import org.apache.samza.system.SystemProducerException @@ -46,32 +45,30 @@ class KafkaSystemProducer(systemName: String, // Represents a fatal error that caused the producer to close. val fatalException: AtomicReference[SystemProducerException] = new AtomicReference[SystemProducerException]() - @volatile var producer: Producer[Array[Byte], Array[Byte]] = null - val producerLock: Object = new Object + val producerRef: AtomicReference[Producer[Array[Byte], Array[Byte]]] = new AtomicReference[Producer[Array[Byte], Array[Byte]]]() + val producerCreationLock: Object = new Object + @volatile var stopped = false def start(): Unit = { - producer = getProducer() + producerRef.set(getProducer()) } def stop() { info("Stopping producer for system: " + this.systemName) - // stop() should not happen often so no need to optimize locking - producerLock.synchronized { - try { - if (producer != null) { - producer.close // Also performs the equivalent of a flush() - } + stopped = true + val currentProducer = producerRef.getAndSet(null) + try { + if (currentProducer != null) { + currentProducer.close // Also performs the equivalent of a flush() + } - val exception = fatalException.get() - if (exception != null) { - error("Observed an earlier send() error while closing producer", exception) - } - } catch { - case e: Exception => error("Error while closing producer for system: " + systemName, e) - } finally { - producer = null + val exception = fatalException.get() + if (exception != null) { + error("Observed an earlier send() error while closing producer", exception) } + } catch { + case e: Exception => error("Error while closing producer for system: " + systemName, e) } } @@ -82,7 +79,7 @@ class KafkaSystemProducer(systemName: String, trace("Enqueuing message: %s, %s." 
format (source, envelope)) val topicName = envelope.getSystemStream.getStream - if (topicName == null || topicName == "") { + if (topicName == null || topicName.isEmpty) { throw new IllegalArgumentException("Invalid system stream: " + envelope.getSystemStream) } @@ -92,10 +89,7 @@ class KafkaSystemProducer(systemName: String, throw new SystemProducerException("Producer was unable to recover from previous exception.", globalProducerException) } - val currentProducer = producer - if (currentProducer == null) { - throw new SystemProducerException("Kafka producer is null.") - } + val currentProducer = getOrCreateCurrentProducer // Java-based Kafka producer API requires an "Integer" type partitionKey and does not allow custom overriding of Partitioners // Any kind of custom partitioning has to be done on the client-side @@ -115,7 +109,7 @@ class KafkaSystemProducer(systemName: String, val producerException = new SystemProducerException("Failed to send message for Source: %s on System:%s Topic:%s Partition:%s" .format(source, systemName, topicName, partitionKey), exception) - handleSendException(currentProducer, producerException, true) + handleFatalSendException(currentProducer, producerException) } } }) @@ -125,18 +119,25 @@ class KafkaSystemProducer(systemName: String, val producerException = new SystemProducerException("Failed to send message for Source: %s on System:%s Topic:%s Partition:%s" .format(source, systemName, topicName, partitionKey), originalException) - handleSendException(currentProducer, producerException, isFatalException(originalException)) + metrics.sendFailed.inc + error("Got a synchronous error from Kafka producer.", producerException) + // Synchronous exceptions are always recoverable so propagate it up and let the user decide throw producerException } } - def flush(source: String) { updateTimer(metrics.flushNs) { metrics.flushes.inc - val currentProducer = producer + val currentProducer = producerRef.get() if (currentProducer == null) { + if 
(dropProducerExceptions) { + // No producer to flush, but we're ignoring exceptions so just return. + warn("Skipping flush because the Kafka producer is null.") + metrics.flushFailed.inc + return + } throw new SystemProducerException("Kafka producer is null.") } @@ -162,7 +163,14 @@ class KafkaSystemProducer(systemName: String, } - private def handleSendException(currentProducer: Producer[Array[Byte], Array[Byte]], producerException: SystemProducerException, isFatalException: Boolean) = { + /** + * Handles a fatal exception by closing the producer and either recreating it or storing the exception + * to rethrow later, depending on the value of dropProducerExceptions. + * + * @param currentProducer the current producer for which the exception occurred. Must not be null. + * @param producerException the exception to handle. + */ + private def handleFatalSendException(currentProducer: Producer[Array[Byte], Array[Byte]], producerException: SystemProducerException): Unit = { metrics.sendFailed.inc error(producerException) // The SystemProducer API is synchronous, so there's no way for us to guarantee that an exception will @@ -172,49 +180,56 @@ class KafkaSystemProducer(systemName: String, if (dropProducerExceptions) { warn("Ignoring producer exception. All messages in the failed producer request will be dropped!") - if (isFatalException) { - producerLock.synchronized { - // Prevent each callback from recreating producer for the same failure. - if (currentProducer == producer) { - info("Creating a new producer for system %s." 
format systemName) - try { - currentProducer.close(0, TimeUnit.MILLISECONDS) - } catch { - case exception: Exception => error("Exception while closing producer.", exception) - } - producer = getProducer() - } - } - } - } else { - // If there is an exception in the callback, it means that the Kafka producer has exhausted the max-retries - // Close producer to ensure messages queued in-flight are not sent and hence, avoid re-ordering - // This works because there is only 1 callback thread and no sends can complete until the callback returns. - if (isFatalException) { - fatalException.compareAndSet(null, producerException) + // Prevent each callback from closing and nulling producer for the same failure. + if (currentProducer == producerRef.get()) { + info("Closing producer for system %s." format systemName) try { + // send()s can get ProducerClosedException if the producer is stopped after they get the currentProducer + // reference but before producer.send() returns. That's ONLY ok when dropProducerExceptions is true. + // Also, when producer.close(0) is invoked on the Kafka IO thread, when it returns there will be no more + // messages sent over the wire. This is key to ensuring no out-of-order messages as a result of recreating + // the producer. currentProducer.close(0, TimeUnit.MILLISECONDS) } catch { case exception: Exception => error("Exception while closing producer.", exception) } + producerRef.compareAndSet(currentProducer, null) + } + } else { + // If there is an exception in the callback, it means that the Kafka producer has exhausted the max-retries + // Close producer to ensure messages queued in-flight are not sent and hence, avoid re-ordering + // This works because there is only 1 IO thread and no IO can be done until the callback returns. + // Do not create a new producer here! It cannot be done without data loss for all concurrency modes. 
+ fatalException.compareAndSet(null, producerException) + try { + currentProducer.close(0, TimeUnit.MILLISECONDS) + } catch { + case exception: Exception => error("Exception while closing producer.", exception) } } } /** - * A fatal exception is one that corrupts the producer or otherwise makes it unusable. - * We want to handle non-fatal exceptions differently because they can often be handled by the user - * and that's preferable because it gives users that drop exceptions a way to do that with less - * data loss (no collateral damage from batches of messages getting dropped) - * - * @param exception the exception to check - * @return true if the exception is unrecoverable. + * @return the current producer. Never returns null. */ - private def isFatalException(exception: Exception): Boolean = { - exception match { - case _: SerializationException => false - case _: ClassCastException => false - case _ => true + private def getOrCreateCurrentProducer = { + var currentProducer = producerRef.get + + if (currentProducer == null) { + if (dropProducerExceptions && !stopped) { + // Note: While this lock prevents others from creating a new producer, they could still set it to null. + producerCreationLock.synchronized { + currentProducer = producerRef.get + if (currentProducer == null) { + currentProducer = getProducer() + producerRef.set(currentProducer) + } + } + // Invariant: currentProducer must not be null at this point. 
+ } else { + throw new SystemProducerException("Kafka producer is null.") + } } + currentProducer } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/ef1c9625/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala index 9117be5..d874a07 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala @@ -41,7 +41,7 @@ class TestKafkaSystemProducer { systemProducer.register("test") systemProducer.start systemProducer.send("test", someMessage) - assertEquals(1, systemProducer.producer.asInstanceOf[MockProducer[Array[Byte], Array[Byte]]].history().size()) + assertEquals(1, systemProducer.producerRef.get().asInstanceOf[MockProducer[Array[Byte], Array[Byte]]].history().size()) systemProducer.stop } @@ -207,7 +207,7 @@ class TestKafkaSystemProducer { val mockProducer = new MockKafkaProducer(1, "test", 1) val producer = new KafkaSystemProducer(systemName = "test", getProducer = () => { - mockProducer.open() // A new producer is never closed + mockProducer.open() // A new producer would not already be closed, so reset it. 
mockProducer }, metrics = producerMetrics) @@ -219,6 +219,7 @@ class TestKafkaSystemProducer { mockProducer.setErrorNext(true, true, new RecordTooLargeException()) producer.send("test", msg3) // Callback exception assertTrue(mockProducer.isClosed) + assertNotNull(producer.producerRef.get()) assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount) val senderException = intercept[SystemProducerException] { @@ -269,7 +270,7 @@ class TestKafkaSystemProducer { val mockProducer = new MockKafkaProducer(1, "test", 1) val producer = new KafkaSystemProducer(systemName = "test", getProducer = () => { - mockProducer.open() // A new producer is never closed + mockProducer.open() // A new producer would not already be closed, so reset it. mockProducer }, metrics = producerMetrics) @@ -286,6 +287,7 @@ class TestKafkaSystemProducer { mockProducer.setErrorNext(true, true, new RecordTooLargeException()) producer.send("test1", msg3) // Callback exception assertTrue(mockProducer.isClosed) + assertNotNull(producer.producerRef.get()) assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount) // Subsequent sends @@ -343,7 +345,7 @@ class TestKafkaSystemProducer { val mockProducer = new MockKafkaProducer(1, "test", 1) val producer = new KafkaSystemProducer(systemName = "test", getProducer = () => { - mockProducer.open() // A new producer is never closed + mockProducer.open() // A new producer would not already be closed, so reset it. 
mockProducer }, metrics = producerMetrics) @@ -359,6 +361,7 @@ class TestKafkaSystemProducer { } assertTrue(sendException.getCause.isInstanceOf[SerializationException]) assertFalse(mockProducer.isClosed) + assertNotNull(producer.producerRef.get()) assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount) producer.send("test1", msg3) // Should be able to resend msg3 @@ -406,7 +409,7 @@ class TestKafkaSystemProducer { val mockProducer = new MockKafkaProducer(1, "test", 1) val producer = new KafkaSystemProducer(systemName = "test", getProducer = () => { - mockProducer.open() // A new producer is never closed + mockProducer.open() // A new producer would not already be closed, so reset it. mockProducer }, metrics = producerMetrics, @@ -418,13 +421,18 @@ class TestKafkaSystemProducer { producer.send("test", msg2) mockProducer.setErrorNext(true, true, new RecordTooLargeException()) producer.send("test", msg3) // Callback exception - assertFalse(mockProducer.isClosed) - assertEquals("Should have created a new producer", 2, mockProducer.getOpenCount) + assertTrue(mockProducer.isClosed) + assertNull(producer.producerRef.get()) + assertEquals("Should not have created a new producer", 1, mockProducer.getOpenCount) producer.send("test", msg4) // Should succeed because the producer recovered. 
+ assertFalse(mockProducer.isClosed) + assertNotNull(producer.producerRef.get()) + assertEquals("Should have created a new producer", 2, mockProducer.getOpenCount) producer.flush("test") // Should not throw producer.send("test", msg5) // Should be able to send again after flush + assertEquals("Should not have created a new producer", 2, mockProducer.getOpenCount) producer.flush("test") assertEquals(4, mockProducer.getMsgsSent) // every message except the one with the error should get sent @@ -456,7 +464,7 @@ class TestKafkaSystemProducer { val mockProducer = new MockKafkaProducer(1, "test", 1) val producer = new KafkaSystemProducer(systemName = "test", getProducer = () => { - mockProducer.open() // A new producer is never closed + mockProducer.open() // A new producer would not already be closed, so reset it. mockProducer }, metrics = producerMetrics, @@ -473,12 +481,17 @@ class TestKafkaSystemProducer { // Inject error for next send mockProducer.setErrorNext(true, true, new RecordTooLargeException()) producer.send("test1", msg3) // Callback exception - assertFalse(mockProducer.isClosed) - assertEquals("Should have created a new producer", 2, mockProducer.getOpenCount) + assertTrue(mockProducer.isClosed) + assertNull(producer.producerRef.get()) + assertEquals("Should not have created a new producer", 1, mockProducer.getOpenCount) // Subsequent sends producer.send("test1", msg4) // Should succeed because the producer recovered. + assertFalse(mockProducer.isClosed) + assertEquals("Should have created a new producer", 2, mockProducer.getOpenCount) + assertNotNull(producer.producerRef.get()) producer.send("test2", msg5) // Second source should also not have any error. 
+ assertEquals("Should not have created a new producer", 2, mockProducer.getOpenCount) // Flushes producer.flush("test2") // Should not throw for test2 @@ -503,7 +516,7 @@ class TestKafkaSystemProducer { val mockProducer = new MockKafkaProducer(1, "test", 1) val producer = new KafkaSystemProducer(systemName = "test", getProducer = () => { - mockProducer.open() // A new producer is never closed + mockProducer.open() // A new producer would not already be closed, so reset it. mockProducer }, metrics = producerMetrics, @@ -520,9 +533,13 @@ class TestKafkaSystemProducer { } assertTrue(sendException.getCause.isInstanceOf[SerializationException]) assertFalse(mockProducer.isClosed) + assertNotNull(producer.producerRef.get()) // Synchronous error; producer should not be recreated assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount) producer.send("test1", msg3) // Should be able to resend msg3 + assertFalse(mockProducer.isClosed) + assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount) + assertNotNull(producer.producerRef.get()) producer.send("test2", msg4) // Second source should not be affected producer.flush("test1") // Flush should be unaffected
