Repository: kafka
Updated Branches:
  refs/heads/trunk 2e3722a23 -> 6fb33afff


KAFKA-3875; Transient test failure: kafka.api.SslProducerSendTest.testSendNonCompressedMessageWithCreateTime

1. The IllegalStateException is actually thrown from
testCloseWithZeroTimeoutFromSenderThread() because of a bug in the test. We
call producer.close() in the callback, so once the first callback has run, any
later callback that tries to produce records hits the IllegalStateException.
This only pollutes the output and doesn't fail the test. I fixed this by only
calling producer.send() in the first callback.
2. It's not clear which test throws the TimeoutException, and it's not
reproducible locally. One problem is that the error message in
TimeoutException is misleading, since the timeout is not necessarily caused by
metadata. I improved this by making the TimeoutException message state which
expiry condition was hit and how much time had passed.
3. It's not clear what actually caused
testSendNonCompressedMessageWithCreateTime() to fail. One thing I found is
that, since we set the linger time to MAX_LONG and are sending small messages,
the produced messages won't be drained until we call producer.close(10000L,
TimeUnit.MILLISECONDS). Normally, 10 secs should be enough for the records to
be sent. My only hypothesis is that, since SSL is more expensive, 10 secs is
occasionally still not enough. So, I bumped up the timeout from 10 secs to 20
secs.
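
A rough way to see why the close timeout matters in point 3: the sketch below
is a hypothetical Java model, not Kafka's producer. It assumes records are only
drained once close() is called (as with an effectively infinite linger time)
and that each send costs a fixed, simulated amount of time. A close with a
bounded wait then acknowledges only the records that finish before the
deadline, which is the shortfall the assertion on callback.offset would report.
The class name and all numbers are made up for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model, not Kafka's producer: with an effectively infinite linger
// time, buffered records are only drained once close(timeout, unit) is called,
// and close gives up on whatever has not been acknowledged by the deadline.
class LingerCloseSketch {
    private final int bufferedRecords;
    private final long perRecordSendMs; // assumed fixed cost of sending one record

    LingerCloseSketch(int bufferedRecords, long perRecordSendMs) {
        this.bufferedRecords = bufferedRecords;
        this.perRecordSendMs = perRecordSendMs;
    }

    /** Returns how many records were acknowledged before the close timeout. */
    int close(long timeout, TimeUnit unit) {
        AtomicInteger acked = new AtomicInteger();
        Thread sender = new Thread(() -> {
            try {
                for (int i = 0; i < bufferedRecords; i++) {
                    Thread.sleep(perRecordSendMs); // simulated send cost (e.g. TLS overhead)
                    acked.incrementAndGet();
                }
            } catch (InterruptedException e) {
                // Forced shutdown: the remaining records are abandoned.
            }
        });
        sender.start();
        try {
            // Thread.join(0) waits forever, so always wait at least 1 ms.
            sender.join(Math.max(1L, unit.toMillis(timeout)));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int ackedBeforeDeadline = acked.get();
        sender.interrupt(); // stop the simulated sender once the deadline has passed
        try {
            sender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ackedBeforeDeadline;
    }

    public static void main(String[] args) {
        // Illustrative numbers only: 10 records at 1 ms each easily fit in 20 s,
        // while 10 records at 50 ms each cannot all finish within 100 ms.
        int fast = new LingerCloseSketch(10, 1).close(20000L, TimeUnit.MILLISECONDS);
        int slow = new LingerCloseSketch(10, 50).close(100L, TimeUnit.MILLISECONDS);
        System.out.println("fast: " + fast + "/10 acknowledged, slow: " + slow + "/10 acknowledged");
    }
}
```

The model only shows the shape of the failure; it does not confirm that SSL
overhead was the actual cause, which the commit message itself leaves as a
hypothesis.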

Author: Jun Rao

Reviewers: Guozhang Wang, Ismael Juma

Closes #1703 from junrao/kafka-3875


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6fb33aff
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6fb33aff
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6fb33aff

Branch: refs/heads/trunk
Commit: 6fb33afff976e467bfa8e0b29eb82770a2a3aaec
Parents: 2e3722a
Author: Jun Rao
Authored: Thu Aug 4 12:30:24 2016 +0100
Committer: Ismael Juma
Committed: Thu Aug 4 12:30:24 2016 +0100

----------------------------------------------------------------------
 .../kafka/clients/producer/internals/RecordBatch.java | 14 ++++++++++----
 .../integration/kafka/api/BaseProducerSendTest.scala  | 11 +++++++----
 2 files changed, 17 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6fb33aff/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index e6cd68f..6706bfd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -142,17 +142,23 @@ public final class RecordBatch {
      */
     public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
         boolean expire = false;
+        String errorMessage = null;
 
-        if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
+        if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime)) {
             expire = true;
-        else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs)))
+            errorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
+        } else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs))) {
             expire = true;
-        else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs)))
+            errorMessage = (now - (this.createdMs + lingerMs)) + " ms has passed since batch creation plus linger time";
+        } else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs))) {
             expire = true;
+            errorMessage = (now - (this.lastAttemptMs + retryBackoffMs)) + " ms has passed since last attempt plus backoff time";
+        }
 
         if (expire) {
             this.records.close();
-            this.done(-1L, Record.NO_TIMESTAMP, new TimeoutException("Batch containing " + recordCount + " record(s) expired due to timeout while requesting metadata from brokers for " + topicPartition));
+            this.done(-1L, Record.NO_TIMESTAMP,
+                      new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + " due to " + errorMessage));
         }
 
         return expire;
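
The three expiry checks in maybeExpire() above can be restated as a standalone
function to see which message each one now produces. This is a hypothetical
Java sketch, not Kafka's API: the batch state (inRetry, createdMs,
lastAppendTime, lastAttemptMs) is passed in as plain parameters instead of
RecordBatch fields, and the values in main(), including the partition name
"my-topic-0", are made up.

```java
// Hypothetical, self-contained restatement of the three expiry checks in
// RecordBatch.maybeExpire() after this patch; batch state is passed in as
// plain parameters, so this is a sketch rather than Kafka's actual code path.
class ExpiryReasonSketch {

    /** Returns why the batch expires, or null if none of the conditions holds. */
    static String expiryReason(boolean inRetry, boolean isFull, int requestTimeoutMs,
                               long retryBackoffMs, long now, long lingerMs,
                               long createdMs, long lastAppendTime, long lastAttemptMs) {
        if (!inRetry && isFull && requestTimeoutMs < (now - lastAppendTime)) {
            return (now - lastAppendTime) + " ms has passed since last append";
        } else if (!inRetry && requestTimeoutMs < (now - (createdMs + lingerMs))) {
            return (now - (createdMs + lingerMs)) + " ms has passed since batch creation plus linger time";
        } else if (inRetry && requestTimeoutMs < (now - (lastAttemptMs + retryBackoffMs))) {
            return (now - (lastAttemptMs + retryBackoffMs)) + " ms has passed since last attempt plus backoff time";
        }
        return null;
    }

    public static void main(String[] args) {
        // Illustrative values: requestTimeoutMs = 30000, retryBackoffMs = 100, lingerMs = 5,
        // a full batch whose last append was at t = 10000 and now = 41000.
        String reason = expiryReason(false, true, 30000, 100L, 41000L, 5L, 0L, 10000L, 0L);
        // Same message shape as the new TimeoutException; "my-topic-0" is a made-up partition.
        System.out.println("Expiring 3 record(s) for my-topic-0 due to " + reason);
        // prints: Expiring 3 record(s) for my-topic-0 due to 31000 ms has passed since last append
    }
}
```

Compared with the old message, the reason now says whether the clock was
measured from the last append, from batch creation plus linger, or from the
last attempt plus backoff, instead of always blaming metadata.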

http://git-wip-us.apache.org/repos/asf/kafka/blob/6fb33aff/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 8eaf827..b5a1284 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -224,7 +224,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
         val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, baseTimestamp + i, "key".getBytes, "value".getBytes)
         producer.send(record, callback)
       }
-      producer.close(10000L, TimeUnit.MILLISECONDS)
+      producer.close(20000L, TimeUnit.MILLISECONDS)
       assertEquals(s"Should have offset $numRecords but only successfully sent ${callback.offset}", numRecords, callback.offset)
     } finally {
       producer.close()
@@ -408,11 +408,12 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
 
     // Test closing from sender thread.
-    class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Callback {
+    class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]], sendRecords: Boolean) extends Callback {
       override def onCompletion(metadata: RecordMetadata, exception: Exception) {
         // Trigger another batch in accumulator before close the producer. These messages should
         // not be sent.
-        (0 until numRecords) map (i => producer.send(record))
+        if (sendRecords)
+          (0 until numRecords) foreach (i => producer.send(record))
         // The close call will be called by all the message callbacks. This tests idempotence of the close call.
         producer.close(0, TimeUnit.MILLISECONDS)
         // Test close with non zero timeout. Should not block at all.
@@ -423,7 +424,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
       try {
         // send message to partition 0
-        val responses = (0 until numRecords) map (i => producer.send(record, new CloseCallback(producer)))
+        // Only send the records in the first callback since we close the producer in the callback and no records
+        // can be sent afterwards.
+        val responses = (0 until numRecords) map (i => producer.send(record, new CloseCallback(producer, i == 0)))
         assertTrue("No request is complete.", responses.forall(!_.isDone()))
         // flush the messages.
         producer.flush()
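
The effect of the CloseCallback change can be modeled without a broker. In this
hypothetical Java sketch, ToyProducer stands in for KafkaProducer and is
assumed to throw IllegalStateException from send() once close() has been
called; the caller is likewise assumed to catch and count (in Kafka, log) an
exception escaping a callback. Each simulated callback optionally sends more
records and then closes the producer, as the test's callbacks do. The class
names and NUM_RECORDS value are made up for illustration.

```java
// Hypothetical model of testCloseWithZeroTimeoutFromSenderThread(); not Kafka code.
class CloseFromCallbackSketch {
    static final int NUM_RECORDS = 5; // illustrative value

    // Assumed behavior: send() is rejected once close() has been called.
    static final class ToyProducer {
        private boolean closed = false;

        void send(int record) {
            if (closed) {
                throw new IllegalStateException("Cannot send after the producer is closed.");
            }
            // An open producer simply accepts the record; nothing else is modeled.
        }

        // Idempotent, so every callback may call it.
        void close() {
            closed = true;
        }
    }

    // Mirrors CloseCallback.onCompletion: optionally send more records, then close.
    static void onCompletion(ToyProducer producer, boolean sendRecords) {
        if (sendRecords) {
            for (int j = 0; j < NUM_RECORDS; j++) {
                producer.send(j); // throws on the first send after close()
            }
        }
        producer.close();
    }

    /** Runs NUM_RECORDS callbacks; returns how many of them threw IllegalStateException. */
    static int runCallbacks(boolean onlyFirstCallbackSends) {
        ToyProducer producer = new ToyProducer();
        int failedCallbacks = 0;
        for (int i = 0; i < NUM_RECORDS; i++) {
            boolean sendRecords = !onlyFirstCallbackSends || i == 0; // the `i == 0` gate
            try {
                onCompletion(producer, sendRecords);
            } catch (IllegalStateException e) {
                failedCallbacks++; // assumed: the caller only logs this, so the test still passes
            }
        }
        return failedCallbacks;
    }

    public static void main(String[] args) {
        System.out.println("every callback sends: " + runCallbacks(false) + " failed callbacks"); // 4
        System.out.println("first callback only:  " + runCallbacks(true) + " failed callbacks");  // 0
    }
}
```

With every callback sending, the first callback succeeds and closes the
producer, so each of the remaining four fails on its first send; gating the
sends on the first callback removes that noise without changing what the test
checks.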
