[ 
https://issues.apache.org/jira/browse/KAFKA-6739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424336#comment-16424336
 ] 

ASF GitHub Bot commented on KAFKA-6739:
---------------------------------------

hachikuji closed pull request #4813: KAFKA-6739: Ignore the presence of headers when down-converting from V2 to V1/V0
URL: https://github.com/apache/kafka/pull/4813
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java 
b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 2452798d485..89a5413e00c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -130,8 +130,13 @@ private MemoryRecordsBuilder convertRecordBatch(byte 
magic, ByteBuffer buffer, R
 
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, 
batch.compressionType(),
                 timestampType, recordBatchAndRecords.baseOffset, 
logAppendTime);
-        for (Record record : recordBatchAndRecords.records)
-            builder.append(record);
+        for (Record record : recordBatchAndRecords.records) {
+            // Down-convert this record. Ignore headers when down-converting 
to V0 and V1 since they are not supported
+            if (magic > RecordBatch.MAGIC_VALUE_V1)
+                builder.append(record);
+            else
+                builder.appendWithOffset(record.offset(), record.timestamp(), 
record.key(), record.value());
+        }
 
         builder.close();
         return builder;
diff --git 
a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java 
b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 53ac2003586..fdd3ede16cc 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -40,6 +42,7 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assert.assertArrayEquals;
 
 public class FileRecordsTest {
 
@@ -358,6 +361,11 @@ public void testConversion() throws IOException {
 
     private void doTestConversion(CompressionType compressionType, byte 
toMagic) throws IOException {
         List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 
24L);
+
+        Header[] headers = {new RecordHeader("headerKey1", 
"headerValue1".getBytes()),
+                            new RecordHeader("headerKey2", 
"headerValue2".getBytes()),
+                            new RecordHeader("headerKey3", 
"headerValue3".getBytes())};
+
         List<SimpleRecord> records = asList(
                 new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
                 new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
@@ -366,9 +374,10 @@ private void doTestConversion(CompressionType 
compressionType, byte toMagic) thr
                 new SimpleRecord(5L, "k5".getBytes(), "hello 
again".getBytes()),
                 new SimpleRecord(6L, "k6".getBytes(), "I sense 
indecision".getBytes()),
                 new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()),
-                new SimpleRecord(8L, "k8".getBytes(), "running 
out".getBytes()),
+                new SimpleRecord(8L, "k8".getBytes(), "running 
out".getBytes(), headers),
                 new SimpleRecord(9L, "k9".getBytes(), "ok, almost 
done".getBytes()),
-                new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes()));
+                new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), 
headers));
+        assertEquals("incorrect test setup", offsets.size(), records.size());
 
         ByteBuffer buffer = ByteBuffer.allocate(1024);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
RecordBatch.MAGIC_VALUE_V0, compressionType,
@@ -452,6 +461,7 @@ private void verifyConvertedRecords(List<SimpleRecord> 
initialRecords,
                     assertEquals("Timestamp should not change", 
initialRecords.get(i).timestamp(), record.timestamp());
                     
assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
                     
assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
+                    assertArrayEquals("Headers should not change", 
initialRecords.get(i).headers(), record.headers());
                 }
                 i += 1;
             }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Broker receives error when handling request with java.lang.IllegalArgumentException: Magic v0 does not support record headers
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6739
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6739
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.0.0
>            Reporter: Koelli Mungee
>            Assignee: Dhruvil Shah
>            Priority: Critical
>             Fix For: 1.2.0, 1.1.1
>
>
> A broker running at 1.0.0 with the following properties 
>  
> {code:java}
> log.message.format.version=1.0
> inter.broker.protocol.version=1.0
> {code}
> receives this ERROR while handling a fetch request for a message with a header:
> {code:java}
> [2018-03-23 01:48:03,093] ERROR [KafkaApi-1] Error when handling request 
> {replica_id=-1,max_wait_time=100,min_bytes=1,topics=[{topic=test=[{partition=11,fetch_offset=20645,max_bytes=1048576}]}]}
>  (kafka.server.KafkaApis) java.lang.IllegalArgumentException: Magic v0 does 
> not support record headers 
> at 
> org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:403)
>  
> at 
> org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:586)
>  
> at 
> org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:134)
>  
> at 
> org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:109)
>  
> at 
> org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253) 
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:520)
>  
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:518)
>  
> at scala.Option.map(Option.scala:146) 
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:518)
>  
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:508)
>  
> at scala.Option.flatMap(Option.scala:171) 
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:508)
>  
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:556)
>  
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:555)
>  
> at scala.collection.Iterator$class.foreach(Iterator.scala:891) 
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:555)
>  
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
>  
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
>  
> at 
> kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2034)
>  
> at 
> kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:52)
>  
> at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2033) 
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:569)
>  
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:588)
>  
> at 
> kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:175)
>  
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:587)
>  
> at 
> kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
>  
> at 
> kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
>  
> at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820) 
> at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:596) 
> at kafka.server.KafkaApis.handle(KafkaApis.scala:100) 
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65) 
> at java.lang.Thread.run(Thread.java:745)
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to