Repository: kafka Updated Branches: refs/heads/0.10.2 8812d151a -> 29bc8905a
KAFKA-4699; Invoke producer callbacks before completing the future This behaviour was changed in 8b3c6c0, but it caused interceptor test failures (which rely on callbacks) and since we're so close to code freeze, it's better to be conservative. Author: Ismael Juma <ism...@juma.me.uk> Reviewers: Jason Gustafson <ja...@confluent.io> Closes #2440 from ijuma/kafka-4699-callbacks-invoked-before-future-is-completed (cherry picked from commit 254e3b77d656a610f19efd1124802e073dfda4b8) Signed-off-by: Jason Gustafson <ja...@confluent.io> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/29bc8905 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/29bc8905 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/29bc8905 Branch: refs/heads/0.10.2 Commit: 29bc8905acea32c8a34b889edb17107132b80037 Parents: 8812d15 Author: Ismael Juma <ism...@juma.me.uk> Authored: Thu Jan 26 09:50:25 2017 -0800 Committer: Jason Gustafson <ja...@confluent.io> Committed: Thu Jan 26 10:02:52 2017 -0800 ---------------------------------------------------------------------- .../kafka/clients/producer/MockProducer.java | 3 ++- .../internals/ProduceRequestResult.java | 14 +++++++++--- .../clients/producer/internals/RecordBatch.java | 5 ++-- .../kafka/clients/producer/RecordSendTest.java | 6 +++-- .../runtime/SourceTaskOffsetCommitterTest.java | 2 +- .../kafka/api/PlaintextConsumerTest.scala | 24 ++++++++++---------- 6 files changed, 33 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/29bc8905/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 008ca70..165437d 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -245,13 +245,14 @@ public class MockProducer<K, V> implements Producer<K, V> { } public void complete(RuntimeException e) { - result.done(e == null ? offset : -1L, Record.NO_TIMESTAMP, e); + result.set(e == null ? offset : -1L, Record.NO_TIMESTAMP, e); if (callback != null) { if (e == null) callback.onCompletion(metadata, null); else callback.onCompletion(null, e); } + result.done(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/29bc8905/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java index 41c204d..a98b28e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java @@ -34,7 +34,7 @@ public final class ProduceRequestResult { private final CountDownLatch latch = new CountDownLatch(1); private final TopicPartition topicPartition; - private volatile long baseOffset = -1L; + private volatile Long baseOffset = null; private volatile long logAppendTime = Record.NO_TIMESTAMP; private volatile RuntimeException error; @@ -48,16 +48,24 @@ public final class ProduceRequestResult { } /** - * Mark this request as complete and unblock any threads waiting on its completion. + * Set the result of the produce request. 
* * @param baseOffset The base offset assigned to the record * @param logAppendTime The log append time or -1 if CreateTime is being used * @param error The error that occurred if there was one, or null */ - public void done(long baseOffset, long logAppendTime, RuntimeException error) { + public void set(long baseOffset, long logAppendTime, RuntimeException error) { this.baseOffset = baseOffset; this.logAppendTime = logAppendTime; this.error = error; + } + + /** + * Mark this request as complete and unblock any threads waiting on its completion. + */ + public void done() { + if (baseOffset == null) + throw new IllegalStateException("The method `set` must be invoked before this method."); this.latch.countDown(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/29bc8905/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index 1b751bf..c8eddd5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -93,8 +93,8 @@ public final class RecordBatch { log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.", topicPartition, baseOffset, exception); - // Complete the future before invoking the callbacks as we rely on its state for the `onCompletion` call - produceFuture.done(baseOffset, logAppendTime, exception); + // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call + produceFuture.set(baseOffset, logAppendTime, exception); // execute callbacks for (Thunk thunk : thunks) { @@ -110,6 +110,7 @@ public final class RecordBatch { } } + produceFuture.done(); } /** 
http://git-wip-us.apache.org/repos/asf/kafka/blob/29bc8905/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java index a420d61..bc8105b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java @@ -54,7 +54,8 @@ public class RecordSendTest { } catch (TimeoutException e) { /* this is good */ } - request.done(baseOffset, Record.NO_TIMESTAMP, null); + request.set(baseOffset, Record.NO_TIMESTAMP, null); + request.done(); assertTrue(future.isDone()); assertEquals(baseOffset + relOffset, future.get().offset()); } @@ -86,7 +87,8 @@ public class RecordSendTest { public void run() { try { sleep(timeout); - request.done(baseOffset, Record.NO_TIMESTAMP, error); + request.set(baseOffset, Record.NO_TIMESTAMP, error); + request.done(); } catch (InterruptedException e) { } } }; http://git-wip-us.apache.org/repos/asf/kafka/blob/29bc8905/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java index 45125cc..d0fa1c8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java @@ -165,7 +165,7 @@ public class SourceTaskOffsetCommitterTest extends ThreadedTest { EasyMock.expect(task.cancel(eq(false))).andReturn(false); 
EasyMock.expect(task.isDone()).andReturn(false); EasyMock.expect(task.get()).andThrow(new CancellationException()); - mockLog.trace(EasyMock.anyString(), EasyMock.anyObject()); + mockLog.trace(EasyMock.anyString(), EasyMock.<Object>anyObject()); PowerMock.expectLastCall(); PowerMock.replayAll(); http://git-wip-us.apache.org/repos/asf/kafka/blob/29bc8905/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 4fa1462..50941ce 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -847,28 +847,28 @@ class PlaintextConsumerTest extends BaseConsumerTest { producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockProducerInterceptor") producerProps.put("mock.interceptor.append", appendStr) - val testProducer = new KafkaProducer[String, String](producerProps, new StringSerializer, new StringSerializer) + val testProducer = new KafkaProducer(producerProps, new StringSerializer, new StringSerializer) // produce records val numRecords = 10 (0 until numRecords).map { i => - testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key $i", s"value $i")) + testProducer.send(new ProducerRecord(tp.topic, tp.partition, s"key $i", s"value $i")) }.foreach(_.get) - assertEquals(numRecords, MockProducerInterceptor.ONSEND_COUNT.intValue()) - assertEquals(numRecords, MockProducerInterceptor.ON_SUCCESS_COUNT.intValue()) + assertEquals(numRecords, MockProducerInterceptor.ONSEND_COUNT.intValue) + assertEquals(numRecords, MockProducerInterceptor.ON_SUCCESS_COUNT.intValue) // send invalid record try { - testProducer.send(null, null) + 
testProducer.send(null) fail("Should not allow sending a null record") } catch { case _: Throwable => - assertEquals("Interceptor should be notified about exception", 1, MockProducerInterceptor.ON_ERROR_COUNT.intValue()) + assertEquals("Interceptor should be notified about exception", 1, MockProducerInterceptor.ON_ERROR_COUNT.intValue) assertEquals("Interceptor should not receive metadata with an exception when record is null", 0, MockProducerInterceptor.ON_ERROR_WITH_METADATA_COUNT.intValue()) } // create consumer with interceptor this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor") - val testConsumer = new KafkaConsumer[String, String](this.consumerConfig, new StringDeserializer(), new StringDeserializer()) + val testConsumer = new KafkaConsumer(this.consumerConfig, new StringDeserializer, new StringDeserializer) testConsumer.assign(List(tp).asJava) testConsumer.seek(tp, 0) @@ -876,22 +876,22 @@ class PlaintextConsumerTest extends BaseConsumerTest { val records = consumeRecords(testConsumer, numRecords) for (i <- 0 until numRecords) { val record = records(i) - assertEquals(s"key $i", new String(record.key())) - assertEquals(s"value $i$appendStr".toUpperCase(Locale.ROOT), new String(record.value())) + assertEquals(s"key $i", new String(record.key)) + assertEquals(s"value $i$appendStr".toUpperCase(Locale.ROOT), new String(record.value)) } // commit sync and verify onCommit is called - val commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() + val commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue testConsumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(2L))).asJava) assertEquals(2, testConsumer.committed(tp).offset) - assertEquals(commitCountBefore + 1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()) + assertEquals(commitCountBefore + 1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue) // commit async and verify onCommit 
is called val commitCallback = new CountConsumerCommitCallback() testConsumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(5L))).asJava, commitCallback) awaitCommitCallback(testConsumer, commitCallback) assertEquals(5, testConsumer.committed(tp).offset) - assertEquals(commitCountBefore + 2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()) + assertEquals(commitCountBefore + 2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue) testConsumer.close() testProducer.close()