Repository: kafka
Updated Branches:
  refs/heads/trunk 0f3f2f56a -> 6c92fc557


KAFKA-5429; Ignore produce response if batch was previously aborted

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Apurva Mehta <apu...@confluent.io>, Ismael Juma <ism...@juma.me.uk>

Closes #3300 from hachikuji/KAFKA-5429


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6c92fc55
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6c92fc55
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6c92fc55

Branch: refs/heads/trunk
Commit: 6c92fc557682e853a0a3ed5684c13174fff45acf
Parents: 0f3f2f5
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Jun 12 16:29:29 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Jun 12 16:29:29 2017 -0700

----------------------------------------------------------------------
 .../producer/internals/ProducerBatch.java       | 53 ++++++++++++----
 .../producer/internals/RecordAccumulator.java   |  8 +--
 .../producer/internals/ProducerBatchTest.java   | 67 ++++++++++++++++++++
 3 files changed, 111 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6c92fc55/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 7f3ba15..b33e080 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -41,13 +41,12 @@ import java.util.ArrayList;
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2;
 import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP;
 
-
 /**
  * A batch of records that is or will be sent.
  *
@@ -57,22 +56,24 @@ public final class ProducerBatch {
 
     private static final Logger log = LoggerFactory.getLogger(ProducerBatch.class);
 
+    private enum FinalState { ABORTED, FAILED, SUCCEEDED };
+
     final long createdMs;
     final TopicPartition topicPartition;
     final ProduceRequestResult produceFuture;
 
     private final List<Thunk> thunks = new ArrayList<>();
     private final MemoryRecordsBuilder recordsBuilder;
-
     private final AtomicInteger attempts = new AtomicInteger(0);
     private final boolean isSplitBatch;
+    private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
+
     int recordCount;
     int maxRecordSize;
     private long lastAttemptMs;
     private long lastAppendTime;
     private long drainedMs;
     private String expiryErrorMessage;
-    private AtomicBoolean completed;
     private boolean retry;
 
     public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) {
@@ -86,7 +87,6 @@ public final class ProducerBatch {
         this.topicPartition = tp;
         this.lastAppendTime = createdMs;
         this.produceFuture = new ProduceRequestResult(topicPartition);
-        this.completed = new AtomicBoolean();
         this.retry = false;
         this.isSplitBatch = isSplitBatch;
         float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
@@ -143,7 +143,20 @@ public final class ProducerBatch {
     }
 
     /**
-     * Complete the request.
+     * Abort the batch and complete the future and callbacks.
+     *
+     * @param exception The exception to use to complete the future and awaiting callbacks.
+     */
+    public void abort(RuntimeException exception) {
+        if (!finalState.compareAndSet(null, FinalState.ABORTED))
+            throw new IllegalStateException("Batch has already been completed in final state " + finalState.get());
+
+        log.trace("Aborting batch for partition {}", topicPartition, exception);
+        completeFutureAndFireCallbacks(ProduceResponse.INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, exception);
+    }
+
+    /**
+     * Complete the request. If the batch was previously aborted, this is a no-op.
      *
      * @param baseOffset The base offset of the messages assigned by the server
      * @param logAppendTime The log append time or -1 if CreateTime is being used
@@ -152,10 +165,20 @@ public final class ProducerBatch {
     public void done(long baseOffset, long logAppendTime, RuntimeException exception) {
         log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.",
                   topicPartition, baseOffset, exception);
+        FinalState finalState = exception != null ? FinalState.FAILED : FinalState.SUCCEEDED;
+        if (!this.finalState.compareAndSet(null, finalState)) {
+            if (this.finalState.get() == FinalState.ABORTED) {
+                log.debug("ProduceResponse returned for {} after batch had already been aborted.", topicPartition);
+                return;
+            } else {
+                throw new IllegalStateException("Batch has already been completed in final state " + this.finalState.get());
+            }
+        }
 
-        if (completed.getAndSet(true))
-            throw new IllegalStateException("Batch has already been completed");
+        completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
+    }
 
+    private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
         // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
         produceFuture.set(baseOffset, logAppendTime, exception);
 
@@ -275,7 +298,7 @@ public final class ProducerBatch {
 
         boolean expired = expiryErrorMessage != null;
         if (expired)
-            abort();
+            abortRecordAppends();
         return expired;
     }
 
@@ -366,7 +389,14 @@ public final class ProducerBatch {
         }
     }
 
-    public void abort() {
+    /**
+     * Abort the record builder and reset the state of the underlying buffer. This is used prior to aborting
+     * the batch with {@link #abort(RuntimeException)} and ensures that no record previously appended can be
+     * read. This is used in scenarios where we want to ensure a batch ultimately gets aborted, but in which
+     * it is not safe to invoke the completion callbacks (e.g. because we are holding a lock,
+     * {@link RecordAccumulator#abortBatches()}).
+     */
+    public void abortRecordAppends() {
         recordsBuilder.abort();
     }
 
@@ -390,9 +420,6 @@ public final class ProducerBatch {
         return recordsBuilder.magic();
     }
 
-    /**
-     * Return the ProducerId (Pid) of the current batch.
-     */
     public long producerId() {
         return recordsBuilder.producerId();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6c92fc55/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 7237b6d..76cc6bd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -614,10 +614,10 @@ public final class RecordAccumulator {
         for (ProducerBatch batch : incomplete.all()) {
             Deque<ProducerBatch> dq = getDeque(batch.topicPartition);
             synchronized (dq) {
-                batch.abort();
+                batch.abortRecordAppends();
                 dq.remove(batch);
             }
-            batch.done(-1L, RecordBatch.NO_TIMESTAMP, reason);
+            batch.abort(reason);
             deallocate(batch);
         }
     }
@@ -629,12 +629,12 @@ public final class RecordAccumulator {
             synchronized (dq) {
                 if (!batch.isClosed()) {
                     aborted = true;
-                    batch.abort();
+                    batch.abortRecordAppends();
                     dq.remove(batch);
                 }
             }
             if (aborted) {
-                batch.done(-1L, RecordBatch.NO_TIMESTAMP, reason);
+                batch.abort(reason);
                 deallocate(batch);
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6c92fc55/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
index 6d2d2f7..02989f0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.LegacyRecord;
@@ -29,6 +31,7 @@ import org.junit.Test;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Deque;
+import java.util.concurrent.ExecutionException;
 
 import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V0;
 import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V1;
@@ -38,6 +41,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class ProducerBatchTest {
 
@@ -55,6 +59,69 @@ public class ProducerBatchTest {
     }
 
     @Test
+    public void testBatchAbort() throws Exception {
+        ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
+        FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now);
+
+        KafkaException exception = new KafkaException();
+        batch.abort(exception);
+        assertTrue(future.isDone());
+
+        // subsequent completion should be ignored
+        batch.done(500L, 2342342341L, null);
+        batch.done(-1, -1, new KafkaException());
+
+        assertTrue(future.isDone());
+        try {
+            future.get();
+            fail("Future should have thrown");
+        } catch (ExecutionException e) {
+            assertEquals(exception, e.getCause());
+        }
+    }
+
+    @Test
+    public void testBatchCannotAbortTwice() throws Exception {
+        ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
+        FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now);
+        KafkaException exception = new KafkaException();
+        batch.abort(exception);
+
+        try {
+            batch.abort(new KafkaException());
+            fail("Expected exception from abort");
+        } catch (IllegalStateException e) {
+            // expected
+        }
+
+        assertTrue(future.isDone());
+        try {
+            future.get();
+            fail("Future should have thrown");
+        } catch (ExecutionException e) {
+            assertEquals(exception, e.getCause());
+        }
+    }
+
+    @Test
+    public void testBatchCannotCompleteTwice() throws Exception {
+        ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
+        FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now);
+        batch.done(500L, 10L, null);
+
+        try {
+            batch.done(1000L, 20L, null);
+            fail("Expected exception from done");
+        } catch (IllegalStateException e) {
+            // expected
+        }
+
+        RecordMetadata recordMetadata = future.get();
+        assertEquals(500L, recordMetadata.offset());
+        assertEquals(10L, recordMetadata.timestamp());
+    }
+
+    @Test
     public void testAppendedChecksumMagicV0AndV1() {
         for (byte magic : Arrays.asList(MAGIC_VALUE_V0, MAGIC_VALUE_V1)) {
             MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(128), magic,

Reply via email to