[ https://issues.apache.org/jira/browse/KAFKA-6739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424336#comment-16424336 ]
ASF GitHub Bot commented on KAFKA-6739: --------------------------------------- hachikuji closed pull request #4813: KAFKA-6739: Ignore the presence of headers when down-converting from V2 to V1/V0 URL: https://github.com/apache/kafka/pull/4813 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java index 2452798d485..89a5413e00c 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java @@ -130,8 +130,13 @@ private MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, R MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(), timestampType, recordBatchAndRecords.baseOffset, logAppendTime); - for (Record record : recordBatchAndRecords.records) - builder.append(record); + for (Record record : recordBatchAndRecords.records) { + // Down-convert this record. Ignore headers when down-converting to V0 and V1 since they are not supported + if (magic > RecordBatch.MAGIC_VALUE_V1) + builder.append(record); + else + builder.appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value()); + } builder.close(); return builder; diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index 53ac2003586..fdd3ede16cc 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -17,6 +17,8 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -40,6 +42,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.junit.Assert.assertArrayEquals; public class FileRecordsTest { @@ -358,6 +361,11 @@ public void testConversion() throws IOException { private void doTestConversion(CompressionType compressionType, byte toMagic) throws IOException { List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L); + + Header[] headers = {new RecordHeader("headerKey1", "headerValue1".getBytes()), + new RecordHeader("headerKey2", "headerValue2".getBytes()), + new RecordHeader("headerKey3", "headerValue3".getBytes())}; + List<SimpleRecord> records = asList( new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()), new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()), @@ -366,9 +374,10 @@ private void doTestConversion(CompressionType compressionType, byte toMagic) thr new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()), new SimpleRecord(6L, "k6".getBytes(), "I sense indecision".getBytes()), new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()), - new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes()), + new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes(), headers), new SimpleRecord(9L, "k9".getBytes(), "ok, almost done".getBytes()), - new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes())); + new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), headers)); + assertEquals("incorrect test setup", offsets.size(), records.size()); ByteBuffer buffer = ByteBuffer.allocate(1024); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, @@ -452,6 +461,7 @@ private void verifyConvertedRecords(List<SimpleRecord> initialRecords, assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(), record.timestamp()); assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME)); assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE)); + assertArrayEquals("Headers should not change", initialRecords.get(i).headers(), record.headers()); } i += 1; } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Broker receives error when handling request with > java.lang.IllegalArgumentException: Magic v0 does not support record headers > ----------------------------------------------------------------------------------------------------------------------------- > > Key: KAFKA-6739 > URL: https://issues.apache.org/jira/browse/KAFKA-6739 > Project: Kafka > Issue Type: Bug > Components: core > Affects Versions: 1.0.0 > Reporter: Koelli Mungee > Assignee: Dhruvil Shah > Priority: Critical > Fix For: 1.2.0, 1.1.1 > > > A broker running at 1.0.0 with the following properties > > {code:java} > log.message.format.version=1.0 > inter.broker.protocol.version=1.0 > {code} > receives this ERROR while handling fetch request for a message with a header > {code:java} > [2018-03-23 01:48:03,093] ERROR [KafkaApi-1] Error when handling request > {replica_id=-1,max_wait_time=100,min_bytes=1,topics=[{topic=test=[{partition=11,fetch_offset=20645,max_bytes=1048576}]}]} > (kafka.server.KafkaApis) java.lang.IllegalArgumentException: Magic v0 does > not support record headers > at > org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:403) > > at > org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:586) > > at > org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:134) > > at > org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:109) > > at > org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:520) > > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:518) > > at scala.Option.map(Option.scala:146) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:518) > > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:508) > > at scala.Option.flatMap(Option.scala:171) > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:508) > > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:556) > > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:555) > > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:555) > > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569) > > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569) > > at > kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2034) > > at > kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:52) > > at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2033) > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:569) > > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:588) > > at > kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:175) > > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:587) > > at > kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604) > > at > kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604) > > at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820) > at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:596) > at kafka.server.KafkaApis.handle(KafkaApis.scala:100) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65) > at java.lang.Thread.run(Thread.java:745) > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)