Repository: samza
Updated Branches:
  refs/heads/master 317b6ff1b -> 61d35f26c


SAMZA-1028: Fix KafkaSystemProducer logging and use an AtomicReference for tracking producer exceptions


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/61d35f26
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/61d35f26
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/61d35f26

Branch: refs/heads/master
Commit: 61d35f26c1f924f376b30f60d05c1957364c05e5
Parents: 317b6ff
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Thu Sep 29 14:51:23 2016 -0700
Committer: vjagadish1989 <jvenk...@linkedin.com>
Committed: Thu Sep 29 14:52:27 2016 -0700

----------------------------------------------------------------------
 .../system/kafka/KafkaSystemProducer.scala      | 69 +++++++++++++-------
 1 file changed, 47 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/61d35f26/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index 5ff6d3c..aac53fc 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -20,8 +20,8 @@
 package org.apache.samza.system.kafka
 
 
-import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.Future
+import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Future}
 
 import org.apache.kafka.clients.producer.Callback
 import org.apache.kafka.clients.producer.Producer
@@ -56,12 +56,13 @@ class KafkaSystemProducer(systemName: String,
     @volatile
     var latestFuture: Future[RecordMetadata] = null
     /**
-     * exceptionThrown: to store the exception in case of any "ultimate" send failure (ie. failure
+     * exceptionInCallback: to store the exception in case of any "ultimate" send failure (ie. failure
      * after exhausting max_retries in Kafka producer) in the I/O thread, we do not continue to queue up more send
      * requests from the samza thread. It helps the samza thread identify if the failure happened in I/O thread or not.
+     *
+     * In cases of multiple exceptions in the callbacks, we keep the first one before throwing.
      */
-    @volatile
-    var exceptionThrown: SamzaException = null
+    var exceptionInCallback: AtomicReference[SamzaException] = new AtomicReference[SamzaException]()
   }
 
   @volatile var producer: Producer[Array[Byte], Array[Byte]] = null
@@ -80,13 +81,13 @@ class KafkaSystemProducer(systemName: String,
           producer = null
 
           sources.foreach {p =>
-            if (p._2.exceptionThrown == null) {
+            if (p._2.exceptionInCallback.get() == null) {
               flush(p._1)
             }
           }
         }
       } catch {
-        case e: Exception => logger.error(e.getMessage, e)
+        case e: Exception => error(e.getMessage, e)
       }
     }
   }
@@ -97,6 +98,21 @@ class KafkaSystemProducer(systemName: String,
     }
   }
 
+  def closeAndNullifyCurrentProducer(currentProducer: Producer[Array[Byte], Array[Byte]]) {
+    try {
+      // TODO: we should use timeout close() to make sure we fail all waiting messages in kafka 0.9+
+      currentProducer.close()
+    } catch {
+      case e: Exception => error("producer close failed", e)
+    }
+    producerLock.synchronized {
+      if (currentProducer == producer) {
+        // only nullify the member producer if it is still the same object, no point nullifying new producer
+        producer = null
+      }
+    }
+  }
+
   def send(source: String, envelope: OutgoingMessageEnvelope) {
     trace("Enqueuing message: %s, %s." format (source, envelope))
 
@@ -110,10 +126,10 @@ class KafkaSystemProducer(systemName: String,
       throw new IllegalArgumentException("Source %s must be registered first before send." format source)
     }
 
-    val exception = sourceData.exceptionThrown
+    val exception = sourceData.exceptionInCallback.getAndSet(null)
     if (exception != null) {
       metrics.sendFailed.inc
-      throw exception
+      throw exception  // in case the caller catches all exceptions and will try again
     }
 
     // lazy initialization of the producer
@@ -134,8 +150,7 @@ class KafkaSystemProducer(systemName: String,
     // Java-based Kafka producer API requires an "Integer" type partitionKey and does not allow custom overriding of Partitioners
     // Any kind of custom partitioning has to be done on the client-side
     val partitions: java.util.List[PartitionInfo] = currentProducer.partitionsFor(topicName)
-    val partitionKey = if (envelope.getPartitionKey != null) KafkaUtil.getIntegerPartitionKey(envelope, partitions)
-    else null
+    val partitionKey = if (envelope.getPartitionKey != null) KafkaUtil.getIntegerPartitionKey(envelope, partitions) else null
     val record = new ProducerRecord(envelope.getSystemStream.getStream,
                                     partitionKey,
                                     envelope.getKey.asInstanceOf[Array[Byte]],
@@ -147,17 +162,23 @@ class KafkaSystemProducer(systemName: String,
           currentProducer.send(record, new Callback {
             def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
               if (exception == null) {
-                //send was successful. Don't retry
+                //send was successful.
                 metrics.sendSuccess.inc
               }
               else {
-                //If there is an exception in the callback, fail container!
-                //Close producer.
-                currentProducer.close
-                sourceData.exceptionThrown = new SamzaException("Unable to send message from %s to system %s." format(source, systemName),
-                                                     exception)
+                error("Closing the producer because of an exception in callback: ", exception)
+                //If there is an exception in the callback, close producer.
+                closeAndNullifyCurrentProducer(currentProducer)
+
+                // we keep the exception and will throw the exception in the next producer.send()
+                // so the user can handle the exception and decide to fail or ignore
+                sourceData.exceptionInCallback.compareAndSet(
+                  null,
+                  new SamzaException("Unable to send message from %s to system %s." format(source, systemName),
+                    exception))
+
                 metrics.sendFailed.inc
-                logger.error("Unable to send message on Topic:%s Partition:%s" format(topicName, partitionKey),
+                error("Unable to send message on Topic:%s Partition:%s" format(topicName, partitionKey),
                              exception)
               }
             }
@@ -167,7 +188,10 @@ class KafkaSystemProducer(systemName: String,
       metrics.sends.inc
     } catch {
       case e: Exception => {
-        currentProducer.close()
+        error("Closing the producer because of an exception in send: ", e)
+
+        closeAndNullifyCurrentProducer(currentProducer)
+
         metrics.sendFailed.inc
         throw new SamzaException(("Failed to send message on Topic:%s Partition:%s Exception:\n %s,")
           .format(topicName, partitionKey, e))
@@ -183,7 +207,7 @@ class KafkaSystemProducer(systemName: String,
       //if latestFuture is null, it probably means that there has been no calls to "send" messages
       //Hence, nothing to do in flush
       if(sourceData.latestFuture != null) {
-        while(!sourceData.latestFuture.isDone && sourceData.exceptionThrown == null) {
+        while(!sourceData.latestFuture.isDone && sourceData.exceptionInCallback.get() == null) {
           try {
             sourceData.latestFuture.get()
           } catch {
@@ -191,9 +215,10 @@ class KafkaSystemProducer(systemName: String,
           }
         }
 
-        if (sourceData.exceptionThrown != null) {
+        //if there is an exception thrown from the previous callbacks just before flush, we have to fail the container
+        if (sourceData.exceptionInCallback.get() != null) {
           metrics.flushFailed.inc
-          throw sourceData.exceptionThrown
+          throw sourceData.exceptionInCallback.get()
         } else {
           trace("Flushed %s." format (source))
         }

Reply via email to