Repository: kafka
Updated Branches:
  refs/heads/0.10.2 8812d151a -> 29bc8905a


KAFKA-4699; Invoke producer callbacks before completing the future

This behaviour was changed in 8b3c6c0, but that change caused failures
in the interceptor tests (which rely on callbacks). Since we’re so close
to code freeze, it’s better to be conservative.

Author: Ismael Juma <ism...@juma.me.uk>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #2440 from ijuma/kafka-4699-callbacks-invoked-before-future-is-completed

(cherry picked from commit 254e3b77d656a610f19efd1124802e073dfda4b8)
Signed-off-by: Jason Gustafson <ja...@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/29bc8905
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/29bc8905
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/29bc8905

Branch: refs/heads/0.10.2
Commit: 29bc8905acea32c8a34b889edb17107132b80037
Parents: 8812d15
Author: Ismael Juma <ism...@juma.me.uk>
Authored: Thu Jan 26 09:50:25 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Jan 26 10:02:52 2017 -0800

----------------------------------------------------------------------
 .../kafka/clients/producer/MockProducer.java    |  3 ++-
 .../internals/ProduceRequestResult.java         | 14 +++++++++---
 .../clients/producer/internals/RecordBatch.java |  5 ++--
 .../kafka/clients/producer/RecordSendTest.java  |  6 +++--
 .../runtime/SourceTaskOffsetCommitterTest.java  |  2 +-
 .../kafka/api/PlaintextConsumerTest.scala       | 24 ++++++++++----------
 6 files changed, 33 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/29bc8905/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 008ca70..165437d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -245,13 +245,14 @@ public class MockProducer<K, V> implements Producer<K, V> 
{
         }
 
         public void complete(RuntimeException e) {
-            result.done(e == null ? offset : -1L, Record.NO_TIMESTAMP, e);
+            result.set(e == null ? offset : -1L, Record.NO_TIMESTAMP, e);
             if (callback != null) {
                 if (e == null)
                     callback.onCompletion(metadata, null);
                 else
                     callback.onCompletion(null, e);
             }
+            result.done();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/29bc8905/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
index 41c204d..a98b28e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
@@ -34,7 +34,7 @@ public final class ProduceRequestResult {
     private final CountDownLatch latch = new CountDownLatch(1);
     private final TopicPartition topicPartition;
 
-    private volatile long baseOffset = -1L;
+    private volatile Long baseOffset = null;
     private volatile long logAppendTime = Record.NO_TIMESTAMP;
     private volatile RuntimeException error;
 
@@ -48,16 +48,24 @@ public final class ProduceRequestResult {
     }
 
     /**
-     * Mark this request as complete and unblock any threads waiting on its 
completion.
+     * Set the result of the produce request.
      *
      * @param baseOffset The base offset assigned to the record
      * @param logAppendTime The log append time or -1 if CreateTime is being 
used
      * @param error The error that occurred if there was one, or null
      */
-    public void done(long baseOffset, long logAppendTime, RuntimeException 
error) {
+    public void set(long baseOffset, long logAppendTime, RuntimeException 
error) {
         this.baseOffset = baseOffset;
         this.logAppendTime = logAppendTime;
         this.error = error;
+    }
+
+    /**
+     * Mark this request as complete and unblock any threads waiting on its 
completion.
+     */
+    public void done() {
+        if (baseOffset == null)
+            throw new IllegalStateException("The method `set` must be invoked 
before this method.");
         this.latch.countDown();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/29bc8905/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index 1b751bf..c8eddd5 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -93,8 +93,8 @@ public final class RecordBatch {
         log.trace("Produced messages to topic-partition {} with base offset 
offset {} and error: {}.",
                   topicPartition, baseOffset, exception);
 
-        // Complete the future before invoking the callbacks as we rely on its 
state for the `onCompletion` call
-        produceFuture.done(baseOffset, logAppendTime, exception);
+        // Set the future before invoking the callbacks as we rely on its 
state for the `onCompletion` call
+        produceFuture.set(baseOffset, logAppendTime, exception);
 
         // execute callbacks
         for (Thunk thunk : thunks) {
@@ -110,6 +110,7 @@ public final class RecordBatch {
             }
         }
 
+        produceFuture.done();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/29bc8905/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java 
b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
index a420d61..bc8105b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
@@ -54,7 +54,8 @@ public class RecordSendTest {
         } catch (TimeoutException e) { /* this is good */
         }
 
-        request.done(baseOffset, Record.NO_TIMESTAMP, null);
+        request.set(baseOffset, Record.NO_TIMESTAMP, null);
+        request.done();
         assertTrue(future.isDone());
         assertEquals(baseOffset + relOffset, future.get().offset());
     }
@@ -86,7 +87,8 @@ public class RecordSendTest {
             public void run() {
                 try {
                     sleep(timeout);
-                    request.done(baseOffset, Record.NO_TIMESTAMP, error);
+                    request.set(baseOffset, Record.NO_TIMESTAMP, error);
+                    request.done();
                 } catch (InterruptedException e) { }
             }
         };

http://git-wip-us.apache.org/repos/asf/kafka/blob/29bc8905/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
index 45125cc..d0fa1c8 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
@@ -165,7 +165,7 @@ public class SourceTaskOffsetCommitterTest extends 
ThreadedTest {
         EasyMock.expect(task.cancel(eq(false))).andReturn(false);
         EasyMock.expect(task.isDone()).andReturn(false);
         EasyMock.expect(task.get()).andThrow(new CancellationException());
-        mockLog.trace(EasyMock.anyString(), EasyMock.anyObject());
+        mockLog.trace(EasyMock.anyString(), EasyMock.<Object>anyObject());
         PowerMock.expectLastCall();
         PowerMock.replayAll();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/29bc8905/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 4fa1462..50941ce 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -847,28 +847,28 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 
"org.apache.kafka.test.MockProducerInterceptor")
     producerProps.put("mock.interceptor.append", appendStr)
-    val testProducer = new KafkaProducer[String, String](producerProps, new 
StringSerializer, new StringSerializer)
+    val testProducer = new KafkaProducer(producerProps, new StringSerializer, 
new StringSerializer)
 
     // produce records
     val numRecords = 10
     (0 until numRecords).map { i =>
-      testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key 
$i", s"value $i"))
+      testProducer.send(new ProducerRecord(tp.topic, tp.partition, s"key $i", 
s"value $i"))
     }.foreach(_.get)
-    assertEquals(numRecords, MockProducerInterceptor.ONSEND_COUNT.intValue())
-    assertEquals(numRecords, 
MockProducerInterceptor.ON_SUCCESS_COUNT.intValue())
+    assertEquals(numRecords, MockProducerInterceptor.ONSEND_COUNT.intValue)
+    assertEquals(numRecords, MockProducerInterceptor.ON_SUCCESS_COUNT.intValue)
     // send invalid record
     try {
-      testProducer.send(null, null)
+      testProducer.send(null)
       fail("Should not allow sending a null record")
     } catch {
       case _: Throwable =>
-        assertEquals("Interceptor should be notified about exception", 1, 
MockProducerInterceptor.ON_ERROR_COUNT.intValue())
+        assertEquals("Interceptor should be notified about exception", 1, 
MockProducerInterceptor.ON_ERROR_COUNT.intValue)
         assertEquals("Interceptor should not receive metadata with an 
exception when record is null", 0, 
MockProducerInterceptor.ON_ERROR_WITH_METADATA_COUNT.intValue())
     }
 
     // create consumer with interceptor
     this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, 
"org.apache.kafka.test.MockConsumerInterceptor")
-    val testConsumer = new KafkaConsumer[String, String](this.consumerConfig, 
new StringDeserializer(), new StringDeserializer())
+    val testConsumer = new KafkaConsumer(this.consumerConfig, new 
StringDeserializer, new StringDeserializer)
     testConsumer.assign(List(tp).asJava)
     testConsumer.seek(tp, 0)
 
@@ -876,22 +876,22 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val records = consumeRecords(testConsumer, numRecords)
     for (i <- 0 until numRecords) {
       val record = records(i)
-      assertEquals(s"key $i", new String(record.key()))
-      assertEquals(s"value $i$appendStr".toUpperCase(Locale.ROOT), new 
String(record.value()))
+      assertEquals(s"key $i", new String(record.key))
+      assertEquals(s"value $i$appendStr".toUpperCase(Locale.ROOT), new 
String(record.value))
     }
 
     // commit sync and verify onCommit is called
-    val commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()
+    val commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue
     testConsumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(2L))).asJava)
     assertEquals(2, testConsumer.committed(tp).offset)
-    assertEquals(commitCountBefore + 1, 
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue())
+    assertEquals(commitCountBefore + 1, 
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue)
 
     // commit async and verify onCommit is called
     val commitCallback = new CountConsumerCommitCallback()
     testConsumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(5L))).asJava, commitCallback)
     awaitCommitCallback(testConsumer, commitCallback)
     assertEquals(5, testConsumer.committed(tp).offset)
-    assertEquals(commitCountBefore + 2, 
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue())
+    assertEquals(commitCountBefore + 2, 
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue)
 
     testConsumer.close()
     testProducer.close()

Reply via email to