This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fcd98da9aee KAFKA-18445 Remove LazyDownConversionRecords and LazyDownConversionRecordsSend (#18445)
fcd98da9aee is described below
commit fcd98da9aee8f59fe563712f84a80e6454ba0896
Author: xijiu <[email protected]>
AuthorDate: Fri Jan 10 00:22:56 2025 +0800
KAFKA-18445 Remove LazyDownConversionRecords and LazyDownConversionRecordsSend (#18445)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../common/record/LazyDownConversionRecords.java | 190 ----------------
.../record/LazyDownConversionRecordsSend.java | 108 ---------
.../kafka/common/record/MultiRecordsSend.java | 26 ---
.../kafka/common/requests/FetchResponse.java | 4 -
.../kafka/common/requests/ShareFetchResponse.java | 4 -
.../kafka/common/record/FileRecordsTest.java | 34 ---
.../record/LazyDownConversionRecordsTest.java | 245 ---------------------
7 files changed, 611 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
deleted file mode 100644
index 50a8f27f42c..00000000000
--- a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.record;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.AbstractIterator;
-import org.apache.kafka.common.utils.Time;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * Encapsulation for holding records that require down-conversion in a lazy, chunked manner (KIP-283). See
- * {@link LazyDownConversionRecordsSend} for the actual chunked send implementation.
- */
-public class LazyDownConversionRecords implements BaseRecords {
- private final TopicPartition topicPartition;
- private final Records records;
- private final byte toMagic;
- private final long firstOffset;
- private ConvertedRecords<?> firstConvertedBatch;
- private final int sizeInBytes;
- private final Time time;
-
- /**
- * @param topicPartition The topic-partition to which records belong
- * @param records Records to lazily down-convert
- * @param toMagic Magic version to down-convert to
- * @param firstOffset The starting offset for down-converted records. This only impacts some cases. See
- * {@link RecordsUtil#downConvert(Iterable, byte, long, Time)} for an explanation.
- * @param time The time instance to use
- *
- * @throws org.apache.kafka.common.errors.UnsupportedCompressionTypeException If the first batch to down-convert
- * has a compression type which we do not support down-conversion for.
- */
- public LazyDownConversionRecords(TopicPartition topicPartition, Records records, byte toMagic, long firstOffset, Time time) {
- this.topicPartition = Objects.requireNonNull(topicPartition);
- this.records = Objects.requireNonNull(records);
- this.toMagic = toMagic;
- this.firstOffset = firstOffset;
- this.time = Objects.requireNonNull(time);
-
- // To make progress, kafka consumers require at least one full record batch per partition, i.e. we need to
- // ensure we can accommodate one full batch of down-converted messages. We achieve this by having `sizeInBytes`
- // factor in the size of the first down-converted batch and we return at least that many bytes.
- java.util.Iterator<ConvertedRecords<?>> it = iterator(0);
- if (it.hasNext()) {
- firstConvertedBatch = it.next();
- sizeInBytes = Math.max(records.sizeInBytes(), firstConvertedBatch.records().sizeInBytes());
- } else {
- // If there are messages before down-conversion and no messages after down-conversion,
- // make sure we are able to send at least an overflow message to the consumer so that it can throw
- // a RecordTooLargeException. Typically, the consumer would need to increase the fetch size in such cases.
- // If there are no messages before down-conversion, we return an empty record batch.
- firstConvertedBatch = null;
- sizeInBytes = records.batches().iterator().hasNext() ? LazyDownConversionRecordsSend.MIN_OVERFLOW_MESSAGE_LENGTH : 0;
- }
- }
-
- @Override
- public int sizeInBytes() {
- return sizeInBytes;
- }
-
- @Override
- public LazyDownConversionRecordsSend toSend() {
- return new LazyDownConversionRecordsSend(this);
- }
-
- public TopicPartition topicPartition() {
- return topicPartition;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof LazyDownConversionRecords) {
- LazyDownConversionRecords that = (LazyDownConversionRecords) o;
- return toMagic == that.toMagic &&
- firstOffset == that.firstOffset &&
- topicPartition.equals(that.topicPartition) &&
- records.equals(that.records);
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- int result = toMagic;
- result = 31 * result + Long.hashCode(firstOffset);
- result = 31 * result + topicPartition.hashCode();
- result = 31 * result + records.hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- return "LazyDownConversionRecords(size=" + sizeInBytes +
- ", underlying=" + records +
- ", toMagic=" + toMagic +
- ", firstOffset=" + firstOffset +
- ")";
- }
-
- public final java.util.Iterator<ConvertedRecords<?>> iterator(long maximumReadSize) {
- // We typically expect only one iterator instance to be created, so null out the first converted batch after
- // first use to make it available for GC.
- ConvertedRecords<?> firstBatch = firstConvertedBatch;
- firstConvertedBatch = null;
- return new Iterator(records, maximumReadSize, firstBatch);
- }
-
- /**
- * Implementation for being able to iterate over down-converted records. Goal of this implementation is to keep
- * it as memory-efficient as possible by not having to maintain all down-converted records in-memory. Maintains
- * a view into batches of down-converted records.
- */
- private class Iterator extends AbstractIterator<ConvertedRecords<?>> {
- private final AbstractIterator<? extends RecordBatch> batchIterator;
- private final long maximumReadSize;
- private ConvertedRecords<?> firstConvertedBatch;
-
- /**
- * @param recordsToDownConvert Records that require down-conversion
- * @param maximumReadSize Maximum possible size of underlying records that will be down-converted in each call to
- * {@link #makeNext()}. This is a soft limit as {@link #makeNext()} will always convert
- * and return at least one full message batch.
- */
- private Iterator(Records recordsToDownConvert, long maximumReadSize, ConvertedRecords<?> firstConvertedBatch) {
- this.batchIterator = recordsToDownConvert.batchIterator();
- this.maximumReadSize = maximumReadSize;
- this.firstConvertedBatch = firstConvertedBatch;
- // If we already have the first down-converted batch, advance the underlying records iterator to next batch
- if (firstConvertedBatch != null)
- this.batchIterator.next();
- }
-
- /**
- * Make next set of down-converted records
- * @return Down-converted records
- */
- @Override
- protected ConvertedRecords<?> makeNext() {
- // If we have cached the first down-converted batch, return that now
- if (firstConvertedBatch != null) {
- ConvertedRecords<?> convertedBatch = firstConvertedBatch;
- firstConvertedBatch = null;
- return convertedBatch;
- }
-
- while (batchIterator.hasNext()) {
- final List<RecordBatch> batches = new ArrayList<>();
- boolean isFirstBatch = true;
- long sizeSoFar = 0;
-
- // Figure out batches we should down-convert based on the size constraints
- while (batchIterator.hasNext() &&
- (isFirstBatch || (batchIterator.peek().sizeInBytes() + sizeSoFar) <= maximumReadSize)) {
- RecordBatch currentBatch = batchIterator.next();
- batches.add(currentBatch);
- sizeSoFar += currentBatch.sizeInBytes();
- isFirstBatch = false;
- }
-
- ConvertedRecords<MemoryRecords> convertedRecords = RecordsUtil.downConvert(batches, toMagic, firstOffset, time);
- // During conversion, it is possible that we drop certain batches because they do not have an equivalent
- // representation in the message format we want to convert to. For example, V0 and V1 message formats
- // have no notion of transaction markers which were introduced in V2 so they get dropped during conversion.
- // We return converted records only when we have at least one valid batch of messages after conversion.
- if (convertedRecords.records().sizeInBytes() > 0)
- return convertedRecords;
- }
- return allDone();
- }
- }
-}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
deleted file mode 100644
index 1bced605579..00000000000
--- a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.record;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
-import org.apache.kafka.common.network.TransferableChannel;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-/**
- * Encapsulation for {@link RecordsSend} for {@link LazyDownConversionRecords}. Records are down-converted in batches and
- * on-demand when {@link #writeTo} method is called.
- */
-public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownConversionRecords> {
- private static final Logger log = LoggerFactory.getLogger(LazyDownConversionRecordsSend.class);
- private static final int MAX_READ_SIZE = 128 * 1024;
- static final int MIN_OVERFLOW_MESSAGE_LENGTH = Records.LOG_OVERHEAD;
-
- private final RecordValidationStats recordValidationStats;
- private final Iterator<ConvertedRecords<?>> convertedRecordsIterator;
-
- private RecordsSend<MemoryRecords> convertedRecordsWriter;
-
- public LazyDownConversionRecordsSend(LazyDownConversionRecords records) {
- super(records, records.sizeInBytes());
- convertedRecordsWriter = null;
- recordValidationStats = new RecordValidationStats();
- convertedRecordsIterator = records().iterator(MAX_READ_SIZE);
- }
-
- private MemoryRecords buildOverflowBatch(int remaining) {
- // We do not have any records left to down-convert. Construct an overflow message for the length remaining.
- // This message will be ignored by the consumer because its length will be past the length of maximum
- // possible response size.
- // DefaultRecordBatch =>
- // BaseOffset => Int64
- // Length => Int32
- // ...
- ByteBuffer overflowMessageBatch = ByteBuffer.allocate(
- Math.max(MIN_OVERFLOW_MESSAGE_LENGTH, Math.min(remaining + 1, MAX_READ_SIZE)));
- overflowMessageBatch.putLong(-1L);
-
- // Fill in the length of the overflow batch. A valid batch must be at least as long as the minimum batch
- // overhead.
- overflowMessageBatch.putInt(Math.max(remaining + 1, DefaultRecordBatch.RECORD_BATCH_OVERHEAD));
- log.debug("Constructed overflow message batch for partition {} with length={}", topicPartition(), remaining);
- return MemoryRecords.readableRecords(overflowMessageBatch);
- }
-
- @Override
- public int writeTo(TransferableChannel channel, int previouslyWritten, int remaining) throws IOException {
- if (convertedRecordsWriter == null || convertedRecordsWriter.completed()) {
- MemoryRecords convertedRecords;
-
- try {
- // Check if we have more chunks left to down-convert
- if (convertedRecordsIterator.hasNext()) {
- // Get next chunk of down-converted messages
- ConvertedRecords<?> recordsAndStats = convertedRecordsIterator.next();
- convertedRecords = (MemoryRecords) recordsAndStats.records();
- recordValidationStats.add(recordsAndStats.recordConversionStats());
- log.debug("Down-converted records for partition {} with length={}", topicPartition(), convertedRecords.sizeInBytes());
- } else {
- convertedRecords = buildOverflowBatch(remaining);
- }
- } catch (UnsupportedCompressionTypeException e) {
- // We have encountered a compression type which does not support down-conversion (e.g. zstd).
- // Since we have already sent at least one batch and we have committed to the fetch size, we
- // send an overflow batch. The consumer will read the first few records and then fetch from the
- // offset of the batch which has the unsupported compression type. At that time, we will
- // send back the UNSUPPORTED_COMPRESSION_TYPE error which will allow the consumer to fail gracefully.
- convertedRecords = buildOverflowBatch(remaining);
- }
-
- convertedRecordsWriter = new DefaultRecordsSend<>(convertedRecords, Math.min(convertedRecords.sizeInBytes(), remaining));
- }
- // safe to cast to int since `remaining` is an int
- return (int) convertedRecordsWriter.writeTo(channel);
- }
-
- public RecordValidationStats recordConversionStats() {
- return recordValidationStats;
- }
-
- public TopicPartition topicPartition() {
- return records().topicPartition();
- }
-}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
index 929b16467c1..ab9c892bac3 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
@@ -17,7 +17,6 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.network.TransferableChannel;
@@ -25,8 +24,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Queue;
/**
@@ -37,7 +34,6 @@ public class MultiRecordsSend implements Send {
private final Queue<Send> sendQueue;
private final long size;
- private Map<TopicPartition, RecordValidationStats> recordConversionStats;
private long totalWritten = 0;
private Send current;
@@ -94,7 +90,6 @@ public class MultiRecordsSend implements Send {
totalWrittenPerCall += written;
sendComplete = current.completed();
if (sendComplete) {
- updateRecordConversionStats(current);
current = sendQueue.poll();
}
} while (!completed() && sendComplete);
@@ -110,14 +105,6 @@ public class MultiRecordsSend implements Send {
return totalWrittenPerCall;
}
- /**
- * Get any statistics that were recorded as part of executing this {@link MultiRecordsSend}.
- * @return Records processing statistics (could be null if no statistics were collected)
- */
- public Map<TopicPartition, RecordValidationStats> recordConversionStats() {
- return recordConversionStats;
- }
-
@Override
public String toString() {
return "MultiRecordsSend(" +
@@ -125,17 +112,4 @@ public class MultiRecordsSend implements Send {
", totalWritten=" + totalWritten +
')';
}
-
- private void updateRecordConversionStats(Send completedSend) {
- // The underlying send might have accumulated statistics that need to be recorded. For example,
- // LazyDownConversionRecordsSend accumulates statistics related to the number of bytes down-converted, the amount
- // of temporary memory used for down-conversion, etc. Pull out any such statistics from the underlying send
- // and fold it up appropriately.
- if (completedSend instanceof LazyDownConversionRecordsSend) {
- if (recordConversionStats == null)
- recordConversionStats = new HashMap<>();
- LazyDownConversionRecordsSend lazyRecordsSend = (LazyDownConversionRecordsSend) completedSend;
- recordConversionStats.put(lazyRecordsSend.topicPartition(), lazyRecordsSend.recordConversionStats());
- }
- }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index a9ab8a7f4e9..fb2a5a3c87b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -202,10 +202,6 @@ public class FetchResponse extends AbstractResponse {
/**
* Returns `partition.records` as `Records` (instead of `BaseRecords`). If `records` is `null`, returns `MemoryRecords.EMPTY`.
*
- * If this response was deserialized after a fetch, this method should never fail. An example where this would
- * fail is a down-converted response (e.g. LazyDownConversionRecords) on the broker (before it's serialized and
- * sent on the wire).
- *
* @param partition partition data
* @return Records or empty record if the records in PartitionData is null.
*/
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
index 58ce62f6c14..619e740029d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
@@ -123,10 +123,6 @@ public class ShareFetchResponse extends AbstractResponse {
/**
* Returns `partition.records` as `Records` (instead of `BaseRecords`). If `records` is `null`, returns `MemoryRecords.EMPTY`.
*
- * <p>If this response was deserialized after a share fetch, this method should never fail. An example where this would
- * fail is a down-converted response (e.g. LazyDownConversionRecords) on the broker (before it's serialized and
- * sent on the wire).
- *
* @param partition partition data
* @return Records or empty record if the records in PartitionData is null.
*/
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 832d276a02d..4461108713c 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.compress.GzipCompression;
import org.apache.kafka.common.header.Header;
@@ -425,22 +424,6 @@ public class FileRecordsTest {
Records messageV0 = slice.downConvert(RecordBatch.MAGIC_VALUE_V0, 0, time).records();
assertTrue(batches(messageV0).isEmpty(), "No message should be there");
assertEquals(size - 1, messageV0.sizeInBytes(), "There should be " + (size - 1) + " bytes");
-
- // Lazy down-conversion will not return any messages for a partial input batch
- TopicPartition tp = new TopicPartition("topic-1", 0);
- LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(tp, slice, RecordBatch.MAGIC_VALUE_V0, 0, Time.SYSTEM);
- Iterator<ConvertedRecords<?>> it = lazyRecords.iterator(16 * 1024L);
- assertFalse(it.hasNext(), "No messages should be returned");
- }
-
- @Test
- public void testFormatConversionWithNoMessages() {
- TopicPartition tp = new TopicPartition("topic-1", 0);
- LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(tp, MemoryRecords.EMPTY, RecordBatch.MAGIC_VALUE_V0,
- 0, Time.SYSTEM);
- assertEquals(0, lazyRecords.sizeInBytes());
- Iterator<ConvertedRecords<?>> it = lazyRecords.iterator(16 * 1024L);
- assertFalse(it.hasNext(), "No messages should be returned");
}
@Test
@@ -637,23 +620,6 @@ public class FileRecordsTest {
convertedRecords.add(fileRecords.downConvert(toMagic, firstOffset, time).records());
verifyConvertedRecords(initialRecords, initialOffsets, convertedRecords, compression, toMagic);
convertedRecords.clear();
-
- // Test the lazy down-conversion path
- List<Long> maximumReadSize = asList(16L * 1024L,
- (long) fileRecords.sizeInBytes(),
- (long) fileRecords.sizeInBytes() - 1,
- (long) fileRecords.sizeInBytes() / 4,
- maxBatchSize + 1,
- 1L);
- for (long readSize : maximumReadSize) {
- TopicPartition tp = new TopicPartition("topic-1", 0);
- LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(tp, fileRecords, toMagic, firstOffset, Time.SYSTEM);
- Iterator<ConvertedRecords<?>> it = lazyRecords.iterator(readSize);
- while (it.hasNext())
- convertedRecords.add(it.next().records());
- verifyConvertedRecords(initialRecords, initialOffsets, convertedRecords, compression, toMagic);
- convertedRecords.clear();
- }
}
private void verifyConvertedRecords(List<SimpleRecord> initialRecords,
diff --git a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java
deleted file mode 100644
index 171ac4b7047..00000000000
--- a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.record;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.compress.Compression;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.internals.RecordHeader;
-import org.apache.kafka.common.network.TransferableChannel;
-import org.apache.kafka.common.utils.Time;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import static java.util.Arrays.asList;
-import static org.apache.kafka.common.utils.Utils.utf8;
-import static org.apache.kafka.test.TestUtils.tempFile;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class LazyDownConversionRecordsTest {
-
- /**
- * Test the lazy down-conversion path in the presence of commit markers. When converting to V0 or V1, these batches
- * are dropped. If there happen to be no more batches left to convert, we must get an overflow message batch after
- * conversion.
- */
- @Test
- public void testConversionOfCommitMarker() throws IOException {
- MemoryRecords recordsToConvert = MemoryRecords.withEndTransactionMarker(0, Time.SYSTEM.milliseconds(), RecordBatch.NO_PARTITION_LEADER_EPOCH,
- 1, (short) 1, new EndTransactionMarker(ControlRecordType.COMMIT, 0));
- MemoryRecords convertedRecords = convertRecords(recordsToConvert, (byte) 1, recordsToConvert.sizeInBytes());
- ByteBuffer buffer = convertedRecords.buffer();
-
- // read the offset and the batch length
- buffer.getLong();
- int sizeOfConvertedRecords = buffer.getInt();
-
- // assert we got an overflow message batch
- assertTrue(sizeOfConvertedRecords > buffer.limit());
- assertFalse(convertedRecords.batchIterator().hasNext());
- }
-
- private static Collection<Arguments> parameters() {
- List<Arguments> arguments = new ArrayList<>();
- for (byte toMagic = RecordBatch.MAGIC_VALUE_V0; toMagic <= RecordBatch.CURRENT_MAGIC_VALUE; toMagic++) {
- for (boolean overflow : asList(true, false)) {
- arguments.add(Arguments.of(CompressionType.NONE, toMagic, overflow));
- arguments.add(Arguments.of(CompressionType.GZIP, toMagic, overflow));
- }
- }
- return arguments;
- }
-
- /**
- * Test the lazy down-conversion path.
- *
- * If `overflow` is true, the number of bytes we want to convert is much larger
- * than the number of bytes we get after conversion. This causes overflow message batch(es) to be appended towards the
- * end of the converted output.
- */
- @ParameterizedTest
- @MethodSource("parameters")
- public void testConversion(CompressionType compressionType, byte toMagic, boolean overflow) throws IOException {
- doTestConversion(compressionType, toMagic, overflow);
- }
-
- private void doTestConversion(CompressionType compressionType, byte toMagic, boolean testConversionOverflow) throws IOException {
- List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L);
-
- Header[] headers = {new RecordHeader("headerKey1", "headerValue1".getBytes()),
- new RecordHeader("headerKey2", "headerValue2".getBytes()),
- new RecordHeader("headerKey3", "headerValue3".getBytes())};
-
- List<SimpleRecord> records = asList(
- new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
- new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
- new SimpleRecord(3L, "k3".getBytes(), "hello again".getBytes()),
- new SimpleRecord(4L, "k4".getBytes(), "goodbye for now".getBytes()),
- new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()),
- new SimpleRecord(6L, "k6".getBytes(), "I sense indecision".getBytes()),
- new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()),
- new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes(), headers),
- new SimpleRecord(9L, "k9".getBytes(), "ok, almost done".getBytes()),
- new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), headers));
- assertEquals(offsets.size(), records.size(), "incorrect test setup");
-
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- Compression compression = Compression.of(compressionType).build();
- MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compression,
- TimestampType.CREATE_TIME, 0L);
- for (int i = 0; i < 3; i++)
- builder.appendWithOffset(offsets.get(i), records.get(i));
- builder.close();
-
- builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compression, TimestampType.CREATE_TIME,
- 0L);
- for (int i = 3; i < 6; i++)
- builder.appendWithOffset(offsets.get(i), records.get(i));
- builder.close();
-
- builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compression, TimestampType.CREATE_TIME,
- 0L);
- for (int i = 6; i < 10; i++)
- builder.appendWithOffset(offsets.get(i), records.get(i));
- builder.close();
- buffer.flip();
-
- MemoryRecords recordsToConvert = MemoryRecords.readableRecords(buffer);
- int numBytesToConvert = recordsToConvert.sizeInBytes();
- if (testConversionOverflow)
- numBytesToConvert *= 2;
-
- MemoryRecords convertedRecords = convertRecords(recordsToConvert, toMagic, numBytesToConvert);
- verifyDownConvertedRecords(records, offsets, convertedRecords, compressionType, toMagic);
- }
-
- private static MemoryRecords convertRecords(MemoryRecords recordsToConvert, byte toMagic, int bytesToConvert) throws IOException {
- try (FileRecords inputRecords = FileRecords.open(tempFile())) {
- inputRecords.append(recordsToConvert);
- inputRecords.flush();
-
- LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(new TopicPartition("test", 1),
- inputRecords, toMagic, 0L, Time.SYSTEM);
- LazyDownConversionRecordsSend lazySend = lazyRecords.toSend();
- File outputFile = tempFile();
- ByteBuffer convertedRecordsBuffer;
- try (TransferableChannel channel = toTransferableChannel(FileChannel.open(outputFile.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE))) {
- int written = 0;
- while (written < bytesToConvert) written += lazySend.writeTo(channel, written, bytesToConvert - written);
- try (FileRecords convertedRecords = FileRecords.open(outputFile, true, written, false)) {
- convertedRecordsBuffer = ByteBuffer.allocate(convertedRecords.sizeInBytes());
- convertedRecords.readInto(convertedRecordsBuffer, 0);
- }
- }
- return MemoryRecords.readableRecords(convertedRecordsBuffer);
- }
- }
-
- private static TransferableChannel toTransferableChannel(FileChannel channel) {
- return new TransferableChannel() {
-
- @Override
- public boolean hasPendingWrites() {
- return false;
- }
-
- @Override
- public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
- return fileChannel.transferTo(position, count, channel);
- }
-
- @Override
- public boolean isOpen() {
- return channel.isOpen();
- }
-
- @Override
- public void close() throws IOException {
- channel.close();
- }
-
- @Override
- public int write(ByteBuffer src) throws IOException {
- return channel.write(src);
- }
-
- @Override
- public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
- return channel.write(srcs, offset, length);
- }
-
- @Override
- public long write(ByteBuffer[] srcs) throws IOException {
- return channel.write(srcs);
- }
- };
- }
-
- private static void verifyDownConvertedRecords(List<SimpleRecord> initialRecords,
- List<Long> initialOffsets,
- MemoryRecords downConvertedRecords,
- CompressionType compressionType,
- byte toMagic) {
- int i = 0;
- for (RecordBatch batch : downConvertedRecords.batches()) {
- assertTrue(batch.magic() <= toMagic, "Magic byte should be lower than or equal to " + toMagic);
- if (batch.magic() == RecordBatch.MAGIC_VALUE_V0)
- assertEquals(TimestampType.NO_TIMESTAMP_TYPE, batch.timestampType());
- else
- assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
- assertEquals(compressionType, batch.compressionType(), "Compression type should not be affected by conversion");
- for (Record record : batch) {
- assertTrue(record.hasMagic(batch.magic()), "Inner record should have magic " + toMagic);
- assertEquals(initialOffsets.get(i).longValue(), record.offset(), "Offset should not change");
- assertEquals(utf8(initialRecords.get(i).key()), utf8(record.key()), "Key should not change");
- assertEquals(utf8(initialRecords.get(i).value()), utf8(record.value()), "Value should not change");
- assertFalse(record.hasTimestampType(TimestampType.LOG_APPEND_TIME));
- if (batch.magic() == RecordBatch.MAGIC_VALUE_V0) {
- assertEquals(RecordBatch.NO_TIMESTAMP, record.timestamp());
- assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
- assertTrue(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
- } else if (batch.magic() == RecordBatch.MAGIC_VALUE_V1) {
- assertEquals(initialRecords.get(i).timestamp(), record.timestamp(), "Timestamp should not change");
- assertTrue(record.hasTimestampType(TimestampType.CREATE_TIME));
- assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
- } else {
- assertEquals(initialRecords.get(i).timestamp(), record.timestamp(), "Timestamp should not change");
- assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
- assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
- assertArrayEquals(initialRecords.get(i).headers(), record.headers(), "Headers should not change");
- }
- i += 1;
- }
- }
- assertEquals(initialOffsets.size(), i);
- }
-}