Repository: samza
Updated Branches:
  refs/heads/master 29ecae891 -> ef1c9625c


SAMZA-1539: KafkaProducer potential hang on close() when task.drop.producer.errors==true

Author: Jacob Maes <[email protected]>

Reviewers: Boris Shkolnik <[email protected]>

Closes #390 from jmakes/samza-1539


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ef1c9625
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ef1c9625
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ef1c9625

Branch: refs/heads/master
Commit: ef1c9625c63b6c169585c5d1928298f36f3037fb
Parents: 29ecae8
Author: Jacob Maes <[email protected]>
Authored: Fri Dec 22 10:36:37 2017 -0800
Committer: Jacob Maes <[email protected]>
Committed: Fri Dec 22 10:36:37 2017 -0800

----------------------------------------------------------------------
 .../system/kafka/KafkaSystemProducer.scala      | 135 ++++++++++---------
 .../system/kafka/TestKafkaSystemProducer.scala  |  39 ++++--
 2 files changed, 103 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ef1c9625/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index 5e83666..9eaf895 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -28,7 +28,6 @@ import org.apache.kafka.clients.producer.Producer
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.kafka.common.PartitionInfo
-import org.apache.kafka.common.errors.SerializationException
 import org.apache.samza.system.OutgoingMessageEnvelope
 import org.apache.samza.system.SystemProducer
 import org.apache.samza.system.SystemProducerException
@@ -46,32 +45,30 @@ class KafkaSystemProducer(systemName: String,
 
   // Represents a fatal error that caused the producer to close.
   val fatalException: AtomicReference[SystemProducerException] = new 
AtomicReference[SystemProducerException]()
-  @volatile var producer: Producer[Array[Byte], Array[Byte]] = null
-  val producerLock: Object = new Object
+  val producerRef: AtomicReference[Producer[Array[Byte], Array[Byte]]] = new 
AtomicReference[Producer[Array[Byte], Array[Byte]]]()
+  val producerCreationLock: Object = new Object
+  @volatile var stopped = false
 
   def start(): Unit = {
-    producer = getProducer()
+    producerRef.set(getProducer())
   }
 
   def stop() {
     info("Stopping producer for system: " + this.systemName)
 
-    // stop() should not happen often so no need to optimize locking
-    producerLock.synchronized {
-      try {
-        if (producer != null) {
-          producer.close  // Also performs the equivalent of a flush()
-        }
+    stopped = true
+    val currentProducer = producerRef.getAndSet(null)
+    try {
+      if (currentProducer != null) {
+        currentProducer.close // Also performs the equivalent of a flush()
+      }
 
-        val exception = fatalException.get()
-        if (exception != null) {
-          error("Observed an earlier send() error while closing producer", 
exception)
-        }
-      } catch {
-        case e: Exception => error("Error while closing producer for system: " 
+ systemName, e)
-      } finally {
-        producer = null
+      val exception = fatalException.get()
+      if (exception != null) {
+        error("Observed an earlier send() error while closing producer", 
exception)
       }
+    } catch {
+      case e: Exception => error("Error while closing producer for system: " + 
systemName, e)
     }
   }
 
@@ -82,7 +79,7 @@ class KafkaSystemProducer(systemName: String,
     trace("Enqueuing message: %s, %s." format (source, envelope))
 
     val topicName = envelope.getSystemStream.getStream
-    if (topicName == null || topicName == "") {
+    if (topicName == null || topicName.isEmpty) {
       throw new IllegalArgumentException("Invalid system stream: " + 
envelope.getSystemStream)
     }
 
@@ -92,10 +89,7 @@ class KafkaSystemProducer(systemName: String,
       throw new SystemProducerException("Producer was unable to recover from 
previous exception.", globalProducerException)
     }
 
-    val currentProducer = producer
-    if (currentProducer == null) {
-      throw new SystemProducerException("Kafka producer is null.")
-    }
+    val currentProducer = getOrCreateCurrentProducer
 
     // Java-based Kafka producer API requires an "Integer" type partitionKey 
and does not allow custom overriding of Partitioners
     // Any kind of custom partitioning has to be done on the client-side
@@ -115,7 +109,7 @@ class KafkaSystemProducer(systemName: String,
             val producerException = new SystemProducerException("Failed to 
send message for Source: %s on System:%s Topic:%s Partition:%s"
               .format(source, systemName, topicName, partitionKey), exception)
 
-            handleSendException(currentProducer, producerException, true)
+            handleFatalSendException(currentProducer, producerException)
           }
         }
       })
@@ -125,18 +119,25 @@ class KafkaSystemProducer(systemName: String,
         val producerException = new SystemProducerException("Failed to send 
message for Source: %s on System:%s Topic:%s Partition:%s"
           .format(source, systemName, topicName, partitionKey), 
originalException)
 
-        handleSendException(currentProducer, producerException, 
isFatalException(originalException))
+        metrics.sendFailed.inc
+        error("Got a synchronous error from Kafka producer.", 
producerException)
+        // Synchronous exceptions are always recoverable so propagate it up 
and let the user decide
         throw producerException
     }
   }
 
-
   def flush(source: String) {
     updateTimer(metrics.flushNs) {
       metrics.flushes.inc
 
-      val currentProducer = producer
+      val currentProducer = producerRef.get()
       if (currentProducer == null) {
+        if (dropProducerExceptions) {
+          // No producer to flush, but we're ignoring exceptions so just 
return.
+          warn("Skipping flush because the Kafka producer is null.")
+          metrics.flushFailed.inc
+          return
+        }
         throw new SystemProducerException("Kafka producer is null.")
       }
 
@@ -162,7 +163,14 @@ class KafkaSystemProducer(systemName: String,
   }
 
 
-  private def handleSendException(currentProducer: Producer[Array[Byte], 
Array[Byte]], producerException: SystemProducerException, isFatalException: 
Boolean) = {
+  /**
+    * Handles a fatal exception by closing the producer and either recreating 
it or storing the exception
+    * to rethrow later, depending on the value of dropProducerExceptions.
+    *
+    * @param currentProducer   the current producer for which the exception 
occurred. Must not be null.
+    * @param producerException the exception to handle.
+    */
+  private def handleFatalSendException(currentProducer: Producer[Array[Byte], 
Array[Byte]], producerException: SystemProducerException): Unit = {
     metrics.sendFailed.inc
     error(producerException)
     // The SystemProducer API is synchronous, so there's no way for us to 
guarantee that an exception will
@@ -172,49 +180,56 @@ class KafkaSystemProducer(systemName: String,
     if (dropProducerExceptions) {
       warn("Ignoring producer exception. All messages in the failed producer 
request will be dropped!")
 
-      if (isFatalException) {
-        producerLock.synchronized {
-          // Prevent each callback from recreating producer for the same 
failure.
-          if (currentProducer == producer) {
-            info("Creating a new producer for system %s." format systemName)
-            try {
-              currentProducer.close(0, TimeUnit.MILLISECONDS)
-            } catch {
-              case exception: Exception => error("Exception while closing 
producer.", exception)
-            }
-            producer = getProducer()
-          }
-        }
-      }
-    } else {
-      // If there is an exception in the callback, it means that the Kafka 
producer has exhausted the max-retries
-      // Close producer to ensure messages queued in-flight are not sent and 
hence, avoid re-ordering
-      // This works because there is only 1 callback thread and no sends can 
complete until the callback returns.
-      if (isFatalException) {
-        fatalException.compareAndSet(null, producerException)
+      // Prevent each callback from closing and nulling producer for the same 
failure.
+      if (currentProducer == producerRef.get()) {
+        info("Closing producer for system %s." format systemName)
         try {
+          // send()s can get ProducerClosedException if the producer is 
stopped after they get the currentProducer
+          // reference but before producer.send() returns. That's ONLY ok when 
dropProducerExceptions is true.
+          // Also, when producer.close(0) is invoked on the Kafka IO thread, 
when it returns there will be no more
+          // messages sent over the wire. This is key to ensuring no 
out-of-order messages as a result of recreating
+          // the producer.
           currentProducer.close(0, TimeUnit.MILLISECONDS)
         } catch {
           case exception: Exception => error("Exception while closing 
producer.", exception)
         }
+        producerRef.compareAndSet(currentProducer, null)
+      }
+    } else {
+      // If there is an exception in the callback, it means that the Kafka 
producer has exhausted the max-retries
+      // Close producer to ensure messages queued in-flight are not sent and 
hence, avoid re-ordering
+      // This works because there is only 1 IO thread and no IO can be done 
until the callback returns.
+      // Do not create a new producer here! It cannot be done without data 
loss for all concurrency modes.
+      fatalException.compareAndSet(null, producerException)
+      try {
+        currentProducer.close(0, TimeUnit.MILLISECONDS)
+      } catch {
+        case exception: Exception => error("Exception while closing 
producer.", exception)
       }
     }
   }
 
   /**
-    * A fatal exception is one that corrupts the producer or otherwise makes 
it unusable.
-    * We want to handle non-fatal exceptions differently because they can 
often be handled by the user
-    * and that's preferable because it gives users that drop exceptions a way 
to do that with less
-    * data loss (no collateral damage from batches of messages getting dropped)
-    *
-    * @param exception  the exception to check
-    * @return           true if the exception is unrecoverable.
+    * @return the current producer. Never returns null.
     */
-  private def isFatalException(exception: Exception): Boolean = {
-    exception match {
-      case _: SerializationException => false
-      case _: ClassCastException => false
-      case _ => true
+  private def getOrCreateCurrentProducer = {
+    var currentProducer = producerRef.get
+
+    if (currentProducer == null) {
+      if (dropProducerExceptions && !stopped) {
+        // Note: While this lock prevents others from creating a new producer, 
they could still set it to null.
+        producerCreationLock.synchronized {
+          currentProducer = producerRef.get
+          if (currentProducer == null) {
+            currentProducer = getProducer()
+            producerRef.set(currentProducer)
+          }
+        }
+        // Invariant: currentProducer must not be null at this point.
+      } else {
+        throw new SystemProducerException("Kafka producer is null.")
+      }
     }
+    currentProducer
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/ef1c9625/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
index 9117be5..d874a07 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
@@ -41,7 +41,7 @@ class TestKafkaSystemProducer {
     systemProducer.register("test")
     systemProducer.start
     systemProducer.send("test", someMessage)
-    assertEquals(1, 
systemProducer.producer.asInstanceOf[MockProducer[Array[Byte], 
Array[Byte]]].history().size())
+    assertEquals(1, 
systemProducer.producerRef.get().asInstanceOf[MockProducer[Array[Byte], 
Array[Byte]]].history().size())
     systemProducer.stop
   }
 
@@ -207,7 +207,7 @@ class TestKafkaSystemProducer {
     val mockProducer = new MockKafkaProducer(1, "test", 1)
     val producer = new KafkaSystemProducer(systemName =  "test",
                                            getProducer = () => {
-                                             mockProducer.open() // A new 
producer is never closed
+                                             mockProducer.open() // A new 
producer would not already be closed, so reset it.
                                              mockProducer
                                            },
                                            metrics = producerMetrics)
@@ -219,6 +219,7 @@ class TestKafkaSystemProducer {
     mockProducer.setErrorNext(true, true, new RecordTooLargeException())
     producer.send("test", msg3) // Callback exception
     assertTrue(mockProducer.isClosed)
+    assertNotNull(producer.producerRef.get())
     assertEquals("Should NOT have created a new producer", 1, 
mockProducer.getOpenCount)
 
     val senderException = intercept[SystemProducerException] {
@@ -269,7 +270,7 @@ class TestKafkaSystemProducer {
     val mockProducer = new MockKafkaProducer(1, "test", 1)
     val producer = new KafkaSystemProducer(systemName =  "test",
       getProducer = () => {
-        mockProducer.open() // A new producer is never closed
+        mockProducer.open() // A new producer would not already be closed, so 
reset it.
         mockProducer
       },
       metrics = producerMetrics)
@@ -286,6 +287,7 @@ class TestKafkaSystemProducer {
     mockProducer.setErrorNext(true, true, new RecordTooLargeException())
     producer.send("test1", msg3) // Callback exception
     assertTrue(mockProducer.isClosed)
+    assertNotNull(producer.producerRef.get())
     assertEquals("Should NOT have created a new producer", 1, 
mockProducer.getOpenCount)
 
     // Subsequent sends
@@ -343,7 +345,7 @@ class TestKafkaSystemProducer {
     val mockProducer = new MockKafkaProducer(1, "test", 1)
     val producer = new KafkaSystemProducer(systemName = "test",
       getProducer = () => {
-        mockProducer.open() // A new producer is never closed
+        mockProducer.open() // A new producer would not already be closed, so 
reset it.
         mockProducer
       },
       metrics = producerMetrics)
@@ -359,6 +361,7 @@ class TestKafkaSystemProducer {
     }
     assertTrue(sendException.getCause.isInstanceOf[SerializationException])
     assertFalse(mockProducer.isClosed)
+    assertNotNull(producer.producerRef.get())
     assertEquals("Should NOT have created a new producer", 1, 
mockProducer.getOpenCount)
 
     producer.send("test1", msg3) // Should be able to resend msg3
@@ -406,7 +409,7 @@ class TestKafkaSystemProducer {
     val mockProducer = new MockKafkaProducer(1, "test", 1)
     val producer = new KafkaSystemProducer(systemName =  "test",
       getProducer = () => {
-        mockProducer.open() // A new producer is never closed
+        mockProducer.open() // A new producer would not already be closed, so 
reset it.
         mockProducer
       },
       metrics = producerMetrics,
@@ -418,13 +421,18 @@ class TestKafkaSystemProducer {
     producer.send("test", msg2)
     mockProducer.setErrorNext(true, true, new RecordTooLargeException())
     producer.send("test", msg3) // Callback exception
-    assertFalse(mockProducer.isClosed)
-    assertEquals("Should have created a new producer", 2, 
mockProducer.getOpenCount)
+    assertTrue(mockProducer.isClosed)
+    assertNull(producer.producerRef.get())
+    assertEquals("Should not have created a new producer", 1, 
mockProducer.getOpenCount)
 
     producer.send("test", msg4) // Should succeed because the producer 
recovered.
+    assertFalse(mockProducer.isClosed)
+    assertNotNull(producer.producerRef.get())
+    assertEquals("Should have created a new producer", 2, 
mockProducer.getOpenCount)
     producer.flush("test") // Should not throw
 
     producer.send("test", msg5) // Should be able to send again after flush
+    assertEquals("Should not have created a new producer", 2, 
mockProducer.getOpenCount)
     producer.flush("test")
 
     assertEquals(4, mockProducer.getMsgsSent) // every message except the one 
with the error should get sent
@@ -456,7 +464,7 @@ class TestKafkaSystemProducer {
     val mockProducer = new MockKafkaProducer(1, "test", 1)
     val producer = new KafkaSystemProducer(systemName =  "test",
       getProducer = () => {
-        mockProducer.open() // A new producer is never closed
+        mockProducer.open() // A new producer would not already be closed, so 
reset it.
         mockProducer
       },
       metrics = producerMetrics,
@@ -473,12 +481,17 @@ class TestKafkaSystemProducer {
     // Inject error for next send
     mockProducer.setErrorNext(true, true, new RecordTooLargeException())
     producer.send("test1", msg3) // Callback exception
-    assertFalse(mockProducer.isClosed)
-    assertEquals("Should have created a new producer", 2, 
mockProducer.getOpenCount)
+    assertTrue(mockProducer.isClosed)
+    assertNull(producer.producerRef.get())
+    assertEquals("Should not have created a new producer", 1, 
mockProducer.getOpenCount)
 
     // Subsequent sends
     producer.send("test1", msg4) // Should succeed because the producer 
recovered.
+    assertFalse(mockProducer.isClosed)
+    assertEquals("Should have created a new producer", 2, 
mockProducer.getOpenCount)
+    assertNotNull(producer.producerRef.get())
     producer.send("test2", msg5) // Second source should also not have any 
error.
+    assertEquals("Should not have created a new producer", 2, 
mockProducer.getOpenCount)
 
     // Flushes
     producer.flush("test2") // Should not throw for test2
@@ -503,7 +516,7 @@ class TestKafkaSystemProducer {
     val mockProducer = new MockKafkaProducer(1, "test", 1)
     val producer = new KafkaSystemProducer(systemName = "test",
       getProducer = () => {
-        mockProducer.open() // A new producer is never closed
+        mockProducer.open() // A new producer would not already be closed, so 
reset it.
         mockProducer
       },
       metrics = producerMetrics,
@@ -520,9 +533,13 @@ class TestKafkaSystemProducer {
     }
     assertTrue(sendException.getCause.isInstanceOf[SerializationException])
     assertFalse(mockProducer.isClosed)
+    assertNotNull(producer.producerRef.get()) // Synchronous error; producer 
should not be recreated
     assertEquals("Should NOT have created a new producer", 1, 
mockProducer.getOpenCount)
 
     producer.send("test1", msg3) // Should be able to resend msg3
+    assertFalse(mockProducer.isClosed)
+    assertEquals("Should NOT have created a new producer", 1, 
mockProducer.getOpenCount)
+    assertNotNull(producer.producerRef.get())
     producer.send("test2", msg4) // Second source should not be affected
 
     producer.flush("test1") // Flush should be unaffected

Reply via email to