This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit bdc75b2d24b7838c3f12870f3cdf2a4e8957db57 Author: Ismael Juma <[email protected]> AuthorDate: Fri Jul 23 13:43:31 2021 -0700 KAFKA-13116: Fix message_format_change_test and compatibility_test_new_broker_test failures (#11108) These failures were caused by a46b82bea9abbd08e5. Details for each test: * message_format_change_test: use IBP 2.8 so that we can write in older message formats. * compatibility_test_new_broker_test: fix down-conversion path to handle empty record batches correctly. The record scan in the old code ensured that empty record batches were never down-converted, which hid this bug. * upgrade_test: set the IBP to 2.8 when message format is < 0.11 to ensure we are actually writing with the old message format even though the test was passing without the change. Verified with ducker that some variants of these tests failed without these changes and passed with them. Also added a unit test for the down-conversion bug fix. Reviewers: Jason Gustafson <[email protected]> --- .../kafka/common/record/LazyDownConversionRecords.java | 15 ++++++++------- .../org/apache/kafka/common/record/FileRecordsTest.java | 10 ++++++++++ core/src/main/scala/kafka/server/KafkaApis.scala | 4 ++-- .../kafkatest/tests/client/message_format_change_test.py | 6 ++++-- tests/kafkatest/tests/core/upgrade_test.py | 5 ++++- 5 files changed, 28 insertions(+), 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java index c359850..56ef8e1 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java @@ -55,19 +55,20 @@ public class LazyDownConversionRecords implements BaseRecords { this.firstOffset = firstOffset; this.time = Objects.requireNonNull(time); - // Kafka consumers expect at least one full batch of 
messages for every topic-partition. To guarantee this, we - // need to make sure that we are able to accommodate one full batch of down-converted messages. The way we achieve - // this is by having sizeInBytes method factor in the size of the first down-converted batch and return at least - // its size. + // To make progress, kafka consumers require at least one full record batch per partition, i.e. we need to + // ensure we can accommodate one full batch of down-converted messages. We achieve this by having `sizeInBytes` + // factor in the size of the first down-converted batch and we return at least that many bytes. java.util.Iterator<ConvertedRecords<?>> it = iterator(0); if (it.hasNext()) { firstConvertedBatch = it.next(); sizeInBytes = Math.max(records.sizeInBytes(), firstConvertedBatch.records().sizeInBytes()); } else { - // If there are no messages we got after down-conversion, make sure we are able to send at least an overflow - // message to the consumer. Typically, the consumer would need to increase the fetch size in such cases. + // If there are messages before down-conversion and no messages after down-conversion, + // make sure we are able to send at least an overflow message to the consumer so that it can throw + // a RecordTooLargeException. Typically, the consumer would need to increase the fetch size in such cases. + // If there are no messages before down-conversion, we return an empty record batch. firstConvertedBatch = null; - sizeInBytes = LazyDownConversionRecordsSend.MIN_OVERFLOW_MESSAGE_LENGTH; + sizeInBytes = records.batches().iterator().hasNext() ? 
LazyDownConversionRecordsSend.MIN_OVERFLOW_MESSAGE_LENGTH : 0; } } diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index c32359b..2e9ff33 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -431,6 +431,16 @@ public class FileRecordsTest { } @Test + public void testFormatConversionWithNoMessages() throws IOException { + TopicPartition tp = new TopicPartition("topic-1", 0); + LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(tp, MemoryRecords.EMPTY, RecordBatch.MAGIC_VALUE_V0, + 0, Time.SYSTEM); + assertEquals(0, lazyRecords.sizeInBytes()); + Iterator<ConvertedRecords<?>> it = lazyRecords.iterator(16 * 1024L); + assertFalse(it.hasNext(), "No messages should be returned"); + } + + @Test public void testSearchForTimestamp() throws IOException { for (RecordVersion version : RecordVersion.values()) { testSearchForTimestamp(version); diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 84e7758..bb56df2 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -747,8 +747,8 @@ class KafkaApis(val requestChannel: RequestChannel, // supported by the fetch request version. // If the inter-broker protocol version is `3.0` or higher, the log config message format version is // always `3.0` (i.e. magic value is `v2`). As a result, we always go through the down-conversion - // path if the fetch version is 3 or lower (in rare cases the down-conversion may not be needed, - // but it's not worth optimizing for them). + // path if the fetch version is 3 or lower (in rare cases the down-conversion may not be needed, but + // it's not worth optimizing for them). 
// If the inter-broker protocol version is lower than `3.0`, we rely on the log config message format // version as a proxy for the on-disk magic value to maintain the long-standing behavior originally // introduced in Kafka 0.10.0. An important implication is that it's unsafe to downgrade the message diff --git a/tests/kafkatest/tests/client/message_format_change_test.py b/tests/kafkatest/tests/client/message_format_change_test.py index 0cd9a21..cb6cf72 100644 --- a/tests/kafkatest/tests/client/message_format_change_test.py +++ b/tests/kafkatest/tests/client/message_format_change_test.py @@ -15,12 +15,12 @@ from ducktape.utils.util import wait_until from ducktape.mark.resource import cluster from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.services.kafka import KafkaService, quorum +from kafkatest.services.kafka import config_property, KafkaService, quorum from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.utils import is_int -from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_11, DEV_BRANCH, KafkaVersion +from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_11, V_2_8_0, DEV_BRANCH, KafkaVersion class MessageFormatChangeTest(ProduceConsumeValidateTest): @@ -82,6 +82,8 @@ class MessageFormatChangeTest(ProduceConsumeValidateTest): "replication-factor": 3, 'configs': {"min.insync.replicas": 2}}}, controller_num_nodes_override=1) + for node in self.kafka.nodes: + node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = str(V_2_8_0) # required for writing old message formats self.kafka.start() self.logger.info("First format change to 0.9.0") diff --git a/tests/kafkatest/tests/core/upgrade_test.py b/tests/kafkatest/tests/core/upgrade_test.py index 39e4fa2..7fd4ab8 100644 --- a/tests/kafkatest/tests/core/upgrade_test.py +++ 
b/tests/kafkatest/tests/core/upgrade_test.py @@ -24,7 +24,7 @@ from kafkatest.services.zookeeper import ZookeeperService from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.utils import is_int from kafkatest.utils.remote_account import java_version -from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, V_0_11_0_0, V_2_8_0, DEV_BRANCH, KafkaVersion +from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, V_0_11_0_0, V_2_8_0, V_3_0_0, DEV_BRANCH, KafkaVersion from kafkatest.services.kafka.util import new_jdk_not_supported class TestUpgrade(ProduceConsumeValidateTest): @@ -83,6 +83,9 @@ class TestUpgrade(ProduceConsumeValidateTest): if to_message_format_version is None: del node.config[config_property.MESSAGE_FORMAT_VERSION] else: + # older message formats are not supported with IBP 3.0 or higher + if to_message_format_version < V_0_11_0_0: + node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = str(V_2_8_0) node.config[config_property.MESSAGE_FORMAT_VERSION] = to_message_format_version self.kafka.start_node(node) self.wait_until_rejoin()
