This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit bdc75b2d24b7838c3f12870f3cdf2a4e8957db57
Author: Ismael Juma <[email protected]>
AuthorDate: Fri Jul 23 13:43:31 2021 -0700

    KAFKA-13116: Fix message_format_change_test and 
compatibility_test_new_broker_test failures (#11108)
    
    These failures were caused by a46b82bea9abbd08e5. Details for each test:
    
    * message_format_change_test: use IBP 2.8 so that we can write in older 
message
    formats.
    * compatibility_test_new_broker_test: fix down-conversion path to 
handle
    empty record batches correctly. The record scan in the old code ensured that
    empty record batches were never down-converted, which hid this bug.
    * upgrade_test: set the IBP to 2.8 when message format is < 0.11 to ensure we 
are
    actually writing with the old message format even though the test was 
passing
    without the change.
    
    Verified with ducker that some variants of these tests failed without these 
changes
    and passed with them. Also added a unit test for the down-conversion bug 
fix.
    
    Reviewers: Jason Gustafson <[email protected]>
---
 .../kafka/common/record/LazyDownConversionRecords.java    | 15 ++++++++-------
 .../org/apache/kafka/common/record/FileRecordsTest.java   | 10 ++++++++++
 core/src/main/scala/kafka/server/KafkaApis.scala          |  4 ++--
 .../kafkatest/tests/client/message_format_change_test.py  |  6 ++++--
 tests/kafkatest/tests/core/upgrade_test.py                |  5 ++++-
 5 files changed, 28 insertions(+), 12 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
 
b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
index c359850..56ef8e1 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
@@ -55,19 +55,20 @@ public class LazyDownConversionRecords implements 
BaseRecords {
         this.firstOffset = firstOffset;
         this.time = Objects.requireNonNull(time);
 
-        // Kafka consumers expect at least one full batch of messages for 
every topic-partition. To guarantee this, we
-        // need to make sure that we are able to accommodate one full batch of 
down-converted messages. The way we achieve
-        // this is by having sizeInBytes method factor in the size of the 
first down-converted batch and return at least
-        // its size.
+        // To make progress, kafka consumers require at least one full record 
batch per partition, i.e. we need to
+        // ensure we can accommodate one full batch of down-converted 
messages. We achieve this by having `sizeInBytes`
+        // factor in the size of the first down-converted batch and we return 
at least that many bytes.
         java.util.Iterator<ConvertedRecords<?>> it = iterator(0);
         if (it.hasNext()) {
             firstConvertedBatch = it.next();
             sizeInBytes = Math.max(records.sizeInBytes(), 
firstConvertedBatch.records().sizeInBytes());
         } else {
-            // If there are no messages we got after down-conversion, make 
sure we are able to send at least an overflow
-            // message to the consumer. Typically, the consumer would need to 
increase the fetch size in such cases.
+            // If there are messages before down-conversion and no messages 
after down-conversion,
+            // make sure we are able to send at least an overflow message to 
the consumer so that it can throw
+            // a RecordTooLargeException. Typically, the consumer would need 
to increase the fetch size in such cases.
+            // If there are no messages before down-conversion, we return an 
empty record batch.
             firstConvertedBatch = null;
-            sizeInBytes = 
LazyDownConversionRecordsSend.MIN_OVERFLOW_MESSAGE_LENGTH;
+            sizeInBytes = records.batches().iterator().hasNext() ? 
LazyDownConversionRecordsSend.MIN_OVERFLOW_MESSAGE_LENGTH : 0;
         }
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java 
b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index c32359b..2e9ff33 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -431,6 +431,16 @@ public class FileRecordsTest {
     }
 
     @Test
+    public void testFormatConversionWithNoMessages() throws IOException {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        LazyDownConversionRecords lazyRecords = new 
LazyDownConversionRecords(tp, MemoryRecords.EMPTY, RecordBatch.MAGIC_VALUE_V0,
+            0, Time.SYSTEM);
+        assertEquals(0, lazyRecords.sizeInBytes());
+        Iterator<ConvertedRecords<?>> it = lazyRecords.iterator(16 * 1024L);
+        assertFalse(it.hasNext(), "No messages should be returned");
+    }
+
+    @Test
     public void testSearchForTimestamp() throws IOException {
         for (RecordVersion version : RecordVersion.values()) {
             testSearchForTimestamp(version);
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 84e7758..bb56df2 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -747,8 +747,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         // supported by the fetch request version.
         // If the inter-broker protocol version is `3.0` or higher, the log 
config message format version is
         // always `3.0` (i.e. magic value is `v2`). As a result, we always go 
through the down-conversion
-        // path if the fetch version is 3 or lower (in rare cases the 
down-conversion may not be needed,
-        // but it's not worth optimizing for them).
+        // path if the fetch version is 3 or lower (in rare cases the 
down-conversion may not be needed, but
+        // it's not worth optimizing for them).
         // If the inter-broker protocol version is lower than `3.0`, we rely 
on the log config message format
         // version as a proxy for the on-disk magic value to maintain the 
long-standing behavior originally
         // introduced in Kafka 0.10.0. An important implication is that it's 
unsafe to downgrade the message
diff --git a/tests/kafkatest/tests/client/message_format_change_test.py 
b/tests/kafkatest/tests/client/message_format_change_test.py
index 0cd9a21..cb6cf72 100644
--- a/tests/kafkatest/tests/client/message_format_change_test.py
+++ b/tests/kafkatest/tests/client/message_format_change_test.py
@@ -15,12 +15,12 @@ from ducktape.utils.util import wait_until
 from ducktape.mark.resource import cluster
 
 from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.kafka import KafkaService, quorum
+from kafkatest.services.kafka import config_property, KafkaService, quorum
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
-from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_11, 
DEV_BRANCH, KafkaVersion
+from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_11, V_2_8_0, 
DEV_BRANCH, KafkaVersion
 
 
 class MessageFormatChangeTest(ProduceConsumeValidateTest):
@@ -82,6 +82,8 @@ class MessageFormatChangeTest(ProduceConsumeValidateTest):
                                                                     
"replication-factor": 3,
                                                                     'configs': 
{"min.insync.replicas": 2}}},
                                                                     
controller_num_nodes_override=1)
+        for node in self.kafka.nodes:
+            node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = 
str(V_2_8_0) # required for writing old message formats
 
         self.kafka.start()
         self.logger.info("First format change to 0.9.0")
diff --git a/tests/kafkatest/tests/core/upgrade_test.py 
b/tests/kafkatest/tests/core/upgrade_test.py
index 39e4fa2..7fd4ab8 100644
--- a/tests/kafkatest/tests/core/upgrade_test.py
+++ b/tests/kafkatest/tests/core/upgrade_test.py
@@ -24,7 +24,7 @@ from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
 from kafkatest.utils.remote_account import java_version
-from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, 
LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, 
LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, 
LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, V_0_11_0_0, V_2_8_0, 
DEV_BRANCH, KafkaVersion
+from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, 
LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, 
LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, 
LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, V_0_11_0_0, V_2_8_0, V_3_0_0, 
DEV_BRANCH, KafkaVersion
 from kafkatest.services.kafka.util import new_jdk_not_supported
 
 class TestUpgrade(ProduceConsumeValidateTest):
@@ -83,6 +83,9 @@ class TestUpgrade(ProduceConsumeValidateTest):
             if to_message_format_version is None:
                 del node.config[config_property.MESSAGE_FORMAT_VERSION]
             else:
+                # older message formats are not supported with IBP 3.0 or 
higher
+                if to_message_format_version < V_0_11_0_0:
+                    node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] 
= str(V_2_8_0)
                 node.config[config_property.MESSAGE_FORMAT_VERSION] = 
to_message_format_version
             self.kafka.start_node(node)
             self.wait_until_rejoin()

Reply via email to