Repository: kafka
Updated Branches:
  refs/heads/trunk 1d291a421 -> 1cd86284e


KAFKA-5700; Producer should not drop header information when splitting batches

Producer should not drop header information when splitting batches.  This PR 
also corrects a minor typo in Sender.java, where `spitting and retrying` should 
be `splitting and retrying`.

Author: huxihx <huxi...@hotmail.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Jiangjie Qin <becket....@gmail.com>

Closes #3620 from huxihx/KAFKA-5700


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1cd86284
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1cd86284
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1cd86284

Branch: refs/heads/trunk
Commit: 1cd86284e808e2846e94b312bb55141f6d216d51
Parents: 1d291a4
Author: huxihx <huxi...@hotmail.com>
Authored: Sun Aug 6 22:25:52 2017 -0700
Committer: Jiangjie Qin <becket....@gmail.com>
Committed: Sun Aug 6 22:25:52 2017 -0700

----------------------------------------------------------------------
 .../producer/internals/ProducerBatch.java       |  2 +-
 .../clients/producer/internals/Sender.java      |  2 +-
 .../producer/internals/ProducerBatchTest.java   | 40 ++++++++++++++++++++
 3 files changed, 42 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd86284/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 53563ba..ee7d21a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -128,7 +128,7 @@ public final class ProducerBatch {
             return false;
         } else {
             // No need to get the CRC.
-            this.recordsBuilder.append(timestamp, key, value);
+            this.recordsBuilder.append(timestamp, key, value, headers);
             this.maxRecordSize = Math.max(this.maxRecordSize, 
AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                     recordsBuilder.compressionType(), key, value, headers));
             FutureRecordMetadata future = new 
FutureRecordMetadata(this.produceFuture, this.recordCount,

http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd86284/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index e6d8bc5..8519c4a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -498,7 +498,7 @@ public class Sender implements Runnable {
                 (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || 
batch.isCompressed())) {
             // If the batch is too large, we split the batch and send the 
split batches again. We do not decrement
             // the retry attempts in this case.
-            log.warn("Got error produce response in correlation id {} on 
topic-partition {}, spitting and retrying ({} attempts left). Error: {}",
+            log.warn("Got error produce response in correlation id {} on 
topic-partition {}, splitting and retrying ({} attempts left). Error: {}",
                      correlationId,
                      batch.topicPartition,
                      this.retries - batch.attempts(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/1cd86284/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
index 2c7e4f9..41aa5c6 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
@@ -19,6 +19,8 @@ package org.apache.kafka.clients.producer.internals;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.LegacyRecord;
 import org.apache.kafka.common.record.MemoryRecords;
@@ -31,6 +33,7 @@ import org.junit.Test;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Deque;
+import java.util.Iterator;
 import java.util.concurrent.ExecutionException;
 
 import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V0;
@@ -139,6 +142,43 @@ public class ProducerBatchTest {
     }
 
     @Test
+    public void testSplitPreservesHeaders() {
+        for (CompressionType compressionType : CompressionType.values()) {
+            MemoryRecordsBuilder builder = MemoryRecords.builder(
+                    ByteBuffer.allocate(1024),
+                    MAGIC_VALUE_V2,
+                    compressionType,
+                    TimestampType.CREATE_TIME,
+                    0L);
+            ProducerBatch batch = new ProducerBatch(new 
TopicPartition("topic", 1), builder, now);
+            Header header = new RecordHeader("header-key", 
"header-value".getBytes());
+
+            while (true) {
+                FutureRecordMetadata future = batch.tryAppend(
+                        now, "hi".getBytes(), "there".getBytes(),
+                        new Header[]{header}, null, now);
+                if (future == null) {
+                    break;
+                }
+            }
+            Deque<ProducerBatch> batches = batch.split(200);
+            assertTrue("This batch should be split to multiple small 
batches.", batches.size() >= 2);
+
+            for (ProducerBatch splitProducerBatch : batches) {
+                for (RecordBatch splitBatch : 
splitProducerBatch.records().batches()) {
+                    Iterator<Record> iter = splitBatch.iterator();
+                    while (iter.hasNext()) {
+                        Record record = iter.next();
+                        assertTrue("Header size should be 1.", 
record.headers().length == 1);
+                        assertTrue("Header key should be 'header-key'.", 
record.headers()[0].key().equals("header-key"));
+                        assertTrue("Header value should be 'header-value'.", 
new String(record.headers()[0].value()).equals("header-value"));
+                    }
+                }
+            }
+        }
+    }
+
+    @Test
     public void testSplitPreservesMagicAndCompressionType() {
         for (byte magic : Arrays.asList(MAGIC_VALUE_V0, MAGIC_VALUE_V1, 
MAGIC_VALUE_V2)) {
             for (CompressionType compressionType : CompressionType.values()) {

Reply via email to