This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new c469e8b159d KAFKA-18445 Remove LazyDownConversionRecords and LazyDownConversionRecordsSend (#18445)
c469e8b159d is described below

commit c469e8b159dca96a8df531380ecf05f083ab34d4
Author: xijiu <[email protected]>
AuthorDate: Fri Jan 10 00:22:56 2025 +0800

    KAFKA-18445 Remove LazyDownConversionRecords and LazyDownConversionRecordsSend (#18445)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../common/record/LazyDownConversionRecords.java   | 190 ----------------
 .../record/LazyDownConversionRecordsSend.java      | 108 ---------
 .../kafka/common/record/MultiRecordsSend.java      |  26 ---
 .../kafka/common/requests/FetchResponse.java       |   4 -
 .../kafka/common/requests/ShareFetchResponse.java  |   4 -
 .../kafka/common/record/FileRecordsTest.java       |  34 ---
 .../record/LazyDownConversionRecordsTest.java      | 245 ---------------------
 7 files changed, 611 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
deleted file mode 100644
index 50a8f27f42c..00000000000
--- a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.record;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.AbstractIterator;
-import org.apache.kafka.common.utils.Time;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * Encapsulation for holding records that require down-conversion in a lazy, chunked manner (KIP-283). See
- * {@link LazyDownConversionRecordsSend} for the actual chunked send implementation.
- */
-public class LazyDownConversionRecords implements BaseRecords {
-    private final TopicPartition topicPartition;
-    private final Records records;
-    private final byte toMagic;
-    private final long firstOffset;
-    private ConvertedRecords<?> firstConvertedBatch;
-    private final int sizeInBytes;
-    private final Time time;
-
-    /**
-     * @param topicPartition The topic-partition to which records belong
-     * @param records Records to lazily down-convert
-     * @param toMagic Magic version to down-convert to
-     * @param firstOffset The starting offset for down-converted records. This only impacts some cases. See
-     *                    {@link RecordsUtil#downConvert(Iterable, byte, long, Time)} for an explanation.
-     * @param time The time instance to use
-     *
-     * @throws org.apache.kafka.common.errors.UnsupportedCompressionTypeException If the first batch to down-convert
-     *    has a compression type which we do not support down-conversion for.
-     */
-    public LazyDownConversionRecords(TopicPartition topicPartition, Records records, byte toMagic, long firstOffset, Time time) {
-        this.topicPartition = Objects.requireNonNull(topicPartition);
-        this.records = Objects.requireNonNull(records);
-        this.toMagic = toMagic;
-        this.firstOffset = firstOffset;
-        this.time = Objects.requireNonNull(time);
-
-        // To make progress, kafka consumers require at least one full record batch per partition, i.e. we need to
-        // ensure we can accommodate one full batch of down-converted messages. We achieve this by having `sizeInBytes`
-        // factor in the size of the first down-converted batch and we return at least that many bytes.
-        java.util.Iterator<ConvertedRecords<?>> it = iterator(0);
-        if (it.hasNext()) {
-            firstConvertedBatch = it.next();
-            sizeInBytes = Math.max(records.sizeInBytes(), firstConvertedBatch.records().sizeInBytes());
-        } else {
-            // If there are messages before down-conversion and no messages after down-conversion,
-            // make sure we are able to send at least an overflow message to the consumer so that it can throw
-            // a RecordTooLargeException. Typically, the consumer would need to increase the fetch size in such cases.
-            // If there are no messages before down-conversion, we return an empty record batch.
-            firstConvertedBatch = null;
-            sizeInBytes = records.batches().iterator().hasNext() ? LazyDownConversionRecordsSend.MIN_OVERFLOW_MESSAGE_LENGTH : 0;
-        }
-    }
-
-    @Override
-    public int sizeInBytes() {
-        return sizeInBytes;
-    }
-
-    @Override
-    public LazyDownConversionRecordsSend toSend() {
-        return new LazyDownConversionRecordsSend(this);
-    }
-
-    public TopicPartition topicPartition() {
-        return topicPartition;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o instanceof LazyDownConversionRecords) {
-            LazyDownConversionRecords that = (LazyDownConversionRecords) o;
-            return toMagic == that.toMagic &&
-                    firstOffset == that.firstOffset &&
-                    topicPartition.equals(that.topicPartition) &&
-                    records.equals(that.records);
-        }
-        return false;
-    }
-
-    @Override
-    public int hashCode() {
-        int result = toMagic;
-        result = 31 * result + Long.hashCode(firstOffset);
-        result = 31 * result + topicPartition.hashCode();
-        result = 31 * result + records.hashCode();
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        return "LazyDownConversionRecords(size=" + sizeInBytes +
-                ", underlying=" + records +
-                ", toMagic=" + toMagic +
-                ", firstOffset=" + firstOffset +
-                ")";
-    }
-
-    public final java.util.Iterator<ConvertedRecords<?>> iterator(long maximumReadSize) {
-        // We typically expect only one iterator instance to be created, so null out the first converted batch after
-        // first use to make it available for GC.
-        ConvertedRecords<?> firstBatch = firstConvertedBatch;
-        firstConvertedBatch = null;
-        return new Iterator(records, maximumReadSize, firstBatch);
-    }
-
-    /**
-     * Implementation for being able to iterate over down-converted records. Goal of this implementation is to keep
-     * it as memory-efficient as possible by not having to maintain all down-converted records in-memory. Maintains
-     * a view into batches of down-converted records.
-     */
-    private class Iterator extends AbstractIterator<ConvertedRecords<?>> {
-        private final AbstractIterator<? extends RecordBatch> batchIterator;
-        private final long maximumReadSize;
-        private ConvertedRecords<?> firstConvertedBatch;
-
-        /**
-         * @param recordsToDownConvert Records that require down-conversion
-         * @param maximumReadSize Maximum possible size of underlying records that will be down-converted in each call to
-         *                        {@link #makeNext()}. This is a soft limit as {@link #makeNext()} will always convert
-         *                        and return at least one full message batch.
-         */
-        private Iterator(Records recordsToDownConvert, long maximumReadSize, ConvertedRecords<?> firstConvertedBatch) {
-            this.batchIterator = recordsToDownConvert.batchIterator();
-            this.maximumReadSize = maximumReadSize;
-            this.firstConvertedBatch = firstConvertedBatch;
-            // If we already have the first down-converted batch, advance the underlying records iterator to next batch
-            if (firstConvertedBatch != null)
-                this.batchIterator.next();
-        }
-
-        /**
-         * Make next set of down-converted records
-         * @return Down-converted records
-         */
-        @Override
-        protected ConvertedRecords<?> makeNext() {
-            // If we have cached the first down-converted batch, return that now
-            if (firstConvertedBatch != null) {
-                ConvertedRecords<?> convertedBatch = firstConvertedBatch;
-                firstConvertedBatch = null;
-                return convertedBatch;
-            }
-
-            while (batchIterator.hasNext()) {
-                final List<RecordBatch> batches = new ArrayList<>();
-                boolean isFirstBatch = true;
-                long sizeSoFar = 0;
-
-                // Figure out batches we should down-convert based on the size constraints
-                while (batchIterator.hasNext() &&
-                        (isFirstBatch || (batchIterator.peek().sizeInBytes() + sizeSoFar) <= maximumReadSize)) {
-                    RecordBatch currentBatch = batchIterator.next();
-                    batches.add(currentBatch);
-                    sizeSoFar += currentBatch.sizeInBytes();
-                    isFirstBatch = false;
-                }
-
-                ConvertedRecords<MemoryRecords> convertedRecords = RecordsUtil.downConvert(batches, toMagic, firstOffset, time);
-                // During conversion, it is possible that we drop certain batches because they do not have an equivalent
-                // representation in the message format we want to convert to. For example, V0 and V1 message formats
-                // have no notion of transaction markers which were introduced in V2 so they get dropped during conversion.
-                // We return converted records only when we have at least one valid batch of messages after conversion.
-                if (convertedRecords.records().sizeInBytes() > 0)
-                    return convertedRecords;
-            }
-            return allDone();
-        }
-    }
-}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
deleted file mode 100644
index 1bced605579..00000000000
--- a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.record;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
-import org.apache.kafka.common.network.TransferableChannel;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-/**
- * Encapsulation for {@link RecordsSend} for {@link LazyDownConversionRecords}. Records are down-converted in batches and
- * on-demand when {@link #writeTo} method is called.
- */
-public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownConversionRecords> {
-    private static final Logger log = LoggerFactory.getLogger(LazyDownConversionRecordsSend.class);
-    private static final int MAX_READ_SIZE = 128 * 1024;
-    static final int MIN_OVERFLOW_MESSAGE_LENGTH = Records.LOG_OVERHEAD;
-
-    private final RecordValidationStats recordValidationStats;
-    private final Iterator<ConvertedRecords<?>> convertedRecordsIterator;
-
-    private RecordsSend<MemoryRecords> convertedRecordsWriter;
-
-    public LazyDownConversionRecordsSend(LazyDownConversionRecords records) {
-        super(records, records.sizeInBytes());
-        convertedRecordsWriter = null;
-        recordValidationStats = new RecordValidationStats();
-        convertedRecordsIterator = records().iterator(MAX_READ_SIZE);
-    }
-
-    private MemoryRecords buildOverflowBatch(int remaining) {
-        // We do not have any records left to down-convert. Construct an overflow message for the length remaining.
-        // This message will be ignored by the consumer because its length will be past the length of maximum
-        // possible response size.
-        // DefaultRecordBatch =>
-        //      BaseOffset => Int64
-        //      Length => Int32
-        //      ...
-        ByteBuffer overflowMessageBatch = ByteBuffer.allocate(
-                Math.max(MIN_OVERFLOW_MESSAGE_LENGTH, Math.min(remaining + 1, MAX_READ_SIZE)));
-        overflowMessageBatch.putLong(-1L);
-
-        // Fill in the length of the overflow batch. A valid batch must be at least as long as the minimum batch
-        // overhead.
-        overflowMessageBatch.putInt(Math.max(remaining + 1, DefaultRecordBatch.RECORD_BATCH_OVERHEAD));
-        log.debug("Constructed overflow message batch for partition {} with length={}", topicPartition(), remaining);
-        return MemoryRecords.readableRecords(overflowMessageBatch);
-    }
-
-    @Override
-    public int writeTo(TransferableChannel channel, int previouslyWritten, int remaining) throws IOException {
-        if (convertedRecordsWriter == null || convertedRecordsWriter.completed()) {
-            MemoryRecords convertedRecords;
-
-            try {
-                // Check if we have more chunks left to down-convert
-                if (convertedRecordsIterator.hasNext()) {
-                    // Get next chunk of down-converted messages
-                    ConvertedRecords<?> recordsAndStats = convertedRecordsIterator.next();
-                    convertedRecords = (MemoryRecords) recordsAndStats.records();
-                    recordValidationStats.add(recordsAndStats.recordConversionStats());
-                    log.debug("Down-converted records for partition {} with length={}", topicPartition(), convertedRecords.sizeInBytes());
-                } else {
-                    convertedRecords = buildOverflowBatch(remaining);
-                }
-            } catch (UnsupportedCompressionTypeException e) {
-                // We have encountered a compression type which does not support down-conversion (e.g. zstd).
-                // Since we have already sent at least one batch and we have committed to the fetch size, we
-                // send an overflow batch. The consumer will read the first few records and then fetch from the
-                // offset of the batch which has the unsupported compression type. At that time, we will
-                // send back the UNSUPPORTED_COMPRESSION_TYPE error which will allow the consumer to fail gracefully.
-                convertedRecords = buildOverflowBatch(remaining);
-            }
-
-            convertedRecordsWriter = new DefaultRecordsSend<>(convertedRecords, Math.min(convertedRecords.sizeInBytes(), remaining));
-        }
-        // safe to cast to int since `remaining` is an int
-        return (int) convertedRecordsWriter.writeTo(channel);
-    }
-
-    public RecordValidationStats recordConversionStats() {
-        return recordValidationStats;
-    }
-
-    public TopicPartition topicPartition() {
-        return records().topicPartition();
-    }
-}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
index 929b16467c1..ab9c892bac3 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.network.TransferableChannel;
 
@@ -25,8 +24,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Queue;
 
 /**
@@ -37,7 +34,6 @@ public class MultiRecordsSend implements Send {
 
     private final Queue<Send> sendQueue;
     private final long size;
-    private Map<TopicPartition, RecordValidationStats> recordConversionStats;
 
     private long totalWritten = 0;
     private Send current;
@@ -94,7 +90,6 @@ public class MultiRecordsSend implements Send {
             totalWrittenPerCall += written;
             sendComplete = current.completed();
             if (sendComplete) {
-                updateRecordConversionStats(current);
                 current = sendQueue.poll();
             }
         } while (!completed() && sendComplete);
@@ -110,14 +105,6 @@ public class MultiRecordsSend implements Send {
         return totalWrittenPerCall;
     }
 
-    /**
-     * Get any statistics that were recorded as part of executing this {@link MultiRecordsSend}.
-     * @return Records processing statistics (could be null if no statistics were collected)
-     */
-    public Map<TopicPartition, RecordValidationStats> recordConversionStats() {
-        return recordConversionStats;
-    }
-
     @Override
     public String toString() {
         return "MultiRecordsSend(" +
@@ -125,17 +112,4 @@ public class MultiRecordsSend implements Send {
             ", totalWritten=" + totalWritten +
             ')';
     }
-
-    private void updateRecordConversionStats(Send completedSend) {
-        // The underlying send might have accumulated statistics that need to be recorded. For example,
-        // LazyDownConversionRecordsSend accumulates statistics related to the number of bytes down-converted, the amount
-        // of temporary memory used for down-conversion, etc. Pull out any such statistics from the underlying send
-        // and fold it up appropriately.
-        if (completedSend instanceof LazyDownConversionRecordsSend) {
-            if (recordConversionStats == null)
-                recordConversionStats = new HashMap<>();
-            LazyDownConversionRecordsSend lazyRecordsSend = (LazyDownConversionRecordsSend) completedSend;
-            recordConversionStats.put(lazyRecordsSend.topicPartition(), lazyRecordsSend.recordConversionStats());
-        }
-    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index a9ab8a7f4e9..fb2a5a3c87b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -202,10 +202,6 @@ public class FetchResponse extends AbstractResponse {
     /**
      * Returns `partition.records` as `Records` (instead of `BaseRecords`). If `records` is `null`, returns `MemoryRecords.EMPTY`.
      *
-     * If this response was deserialized after a fetch, this method should never fail. An example where this would
-     * fail is a down-converted response (e.g. LazyDownConversionRecords) on the broker (before it's serialized and
-     * sent on the wire).
-     *
      * @param partition partition data
      * @return Records or empty record if the records in PartitionData is null.
      */
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
index 58ce62f6c14..619e740029d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
@@ -123,10 +123,6 @@ public class ShareFetchResponse extends AbstractResponse {
     /**
      * Returns `partition.records` as `Records` (instead of `BaseRecords`). If `records` is `null`, returns `MemoryRecords.EMPTY`.
      *
-     * <p>If this response was deserialized after a share fetch, this method should never fail. An example where this would
-     * fail is a down-converted response (e.g. LazyDownConversionRecords) on the broker (before it's serialized and
-     * sent on the wire).
-     *
      * @param partition partition data
      * @return Records or empty record if the records in PartitionData is null.
      */
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 832d276a02d..4461108713c 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.compress.GzipCompression;
 import org.apache.kafka.common.header.Header;
@@ -425,22 +424,6 @@ public class FileRecordsTest {
         Records messageV0 = slice.downConvert(RecordBatch.MAGIC_VALUE_V0, 0, time).records();
         assertTrue(batches(messageV0).isEmpty(), "No message should be there");
         assertEquals(size - 1, messageV0.sizeInBytes(), "There should be " + (size - 1) + " bytes");
-
-        // Lazy down-conversion will not return any messages for a partial input batch
-        TopicPartition tp = new TopicPartition("topic-1", 0);
-        LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(tp, slice, RecordBatch.MAGIC_VALUE_V0, 0, Time.SYSTEM);
-        Iterator<ConvertedRecords<?>> it = lazyRecords.iterator(16 * 1024L);
-        assertFalse(it.hasNext(), "No messages should be returned");
-    }
-
-    @Test
-    public void testFormatConversionWithNoMessages() {
-        TopicPartition tp = new TopicPartition("topic-1", 0);
-        LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(tp, MemoryRecords.EMPTY, RecordBatch.MAGIC_VALUE_V0,
-            0, Time.SYSTEM);
-        assertEquals(0, lazyRecords.sizeInBytes());
-        Iterator<ConvertedRecords<?>> it = lazyRecords.iterator(16 * 1024L);
-        assertFalse(it.hasNext(), "No messages should be returned");
     }
 
     @Test
@@ -637,23 +620,6 @@ public class FileRecordsTest {
         convertedRecords.add(fileRecords.downConvert(toMagic, firstOffset, time).records());
         verifyConvertedRecords(initialRecords, initialOffsets, convertedRecords, compression, toMagic);
         convertedRecords.clear();
-
-        // Test the lazy down-conversion path
-        List<Long> maximumReadSize = asList(16L * 1024L,
-                (long) fileRecords.sizeInBytes(),
-                (long) fileRecords.sizeInBytes() - 1,
-                (long) fileRecords.sizeInBytes() / 4,
-                maxBatchSize + 1,
-                1L);
-        for (long readSize : maximumReadSize) {
-            TopicPartition tp = new TopicPartition("topic-1", 0);
-            LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(tp, fileRecords, toMagic, firstOffset, Time.SYSTEM);
-            Iterator<ConvertedRecords<?>> it = lazyRecords.iterator(readSize);
-            while (it.hasNext())
-                convertedRecords.add(it.next().records());
-            verifyConvertedRecords(initialRecords, initialOffsets, convertedRecords, compression, toMagic);
-            convertedRecords.clear();
-        }
     }
 
     private void verifyConvertedRecords(List<SimpleRecord> initialRecords,
diff --git a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java
deleted file mode 100644
index 171ac4b7047..00000000000
--- a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.record;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.compress.Compression;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.internals.RecordHeader;
-import org.apache.kafka.common.network.TransferableChannel;
-import org.apache.kafka.common.utils.Time;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import static java.util.Arrays.asList;
-import static org.apache.kafka.common.utils.Utils.utf8;
-import static org.apache.kafka.test.TestUtils.tempFile;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class LazyDownConversionRecordsTest {
-
-    /**
-     * Test the lazy down-conversion path in the presence of commit markers. When converting to V0 or V1, these batches
-     * are dropped. If there happen to be no more batches left to convert, we must get an overflow message batch after
-     * conversion.
-     */
-    @Test
-    public void testConversionOfCommitMarker() throws IOException {
-        MemoryRecords recordsToConvert = MemoryRecords.withEndTransactionMarker(0, Time.SYSTEM.milliseconds(), RecordBatch.NO_PARTITION_LEADER_EPOCH,
-                1, (short) 1, new EndTransactionMarker(ControlRecordType.COMMIT, 0));
-        MemoryRecords convertedRecords = convertRecords(recordsToConvert, (byte) 1, recordsToConvert.sizeInBytes());
-        ByteBuffer buffer = convertedRecords.buffer();
-
-        // read the offset and the batch length
-        buffer.getLong();
-        int sizeOfConvertedRecords = buffer.getInt();
-
-        // assert we got an overflow message batch
-        assertTrue(sizeOfConvertedRecords > buffer.limit());
-        assertFalse(convertedRecords.batchIterator().hasNext());
-    }
-
-    private static Collection<Arguments> parameters() {
-        List<Arguments> arguments = new ArrayList<>();
-        for (byte toMagic = RecordBatch.MAGIC_VALUE_V0; toMagic <= RecordBatch.CURRENT_MAGIC_VALUE; toMagic++) {
-            for (boolean overflow : asList(true, false)) {
-                arguments.add(Arguments.of(CompressionType.NONE, toMagic, overflow));
-                arguments.add(Arguments.of(CompressionType.GZIP, toMagic, overflow));
-            }
-        }
-        return arguments;
-    }
-
-    /**
-     * Test the lazy down-conversion path.
-     *
-     * If `overflow` is true, the number of bytes we want to convert is much larger
-     * than the number of bytes we get after conversion. This causes overflow message batch(es) to be appended towards the
-     * end of the converted output.
-     */
-    @ParameterizedTest
-    @MethodSource("parameters")
-    public void testConversion(CompressionType compressionType, byte toMagic, boolean overflow) throws IOException {
-        doTestConversion(compressionType, toMagic, overflow);
-    }
-
-    private void doTestConversion(CompressionType compressionType, byte toMagic, boolean testConversionOverflow) throws IOException {
-        List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L);
-
-        Header[] headers = {new RecordHeader("headerKey1", "headerValue1".getBytes()),
-            new RecordHeader("headerKey2", "headerValue2".getBytes()),
-            new RecordHeader("headerKey3", "headerValue3".getBytes())};
-
-        List<SimpleRecord> records = asList(
-            new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
-            new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
-            new SimpleRecord(3L, "k3".getBytes(), "hello again".getBytes()),
-            new SimpleRecord(4L, "k4".getBytes(), "goodbye for now".getBytes()),
-            new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()),
-            new SimpleRecord(6L, "k6".getBytes(), "I sense indecision".getBytes()),
-            new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()),
-            new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes(), headers),
-            new SimpleRecord(9L, "k9".getBytes(), "ok, almost done".getBytes()),
-            new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), headers));
-        assertEquals(offsets.size(), records.size(), "incorrect test setup");
-
-        ByteBuffer buffer = ByteBuffer.allocate(1024);
-        Compression compression = Compression.of(compressionType).build();
-        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compression,
-                TimestampType.CREATE_TIME, 0L);
-        for (int i = 0; i < 3; i++)
-            builder.appendWithOffset(offsets.get(i), records.get(i));
-        builder.close();
-
-        builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compression, TimestampType.CREATE_TIME,
-                0L);
-        for (int i = 3; i < 6; i++)
-            builder.appendWithOffset(offsets.get(i), records.get(i));
-        builder.close();
-
-        builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compression, TimestampType.CREATE_TIME,
-                0L);
-        for (int i = 6; i < 10; i++)
-            builder.appendWithOffset(offsets.get(i), records.get(i));
-        builder.close();
-        buffer.flip();
-
-        MemoryRecords recordsToConvert = MemoryRecords.readableRecords(buffer);
-        int numBytesToConvert = recordsToConvert.sizeInBytes();
-        if (testConversionOverflow)
-            numBytesToConvert *= 2;
-
-        MemoryRecords convertedRecords = convertRecords(recordsToConvert, toMagic, numBytesToConvert);
-        verifyDownConvertedRecords(records, offsets, convertedRecords, compressionType, toMagic);
-    }
-
-    private static MemoryRecords convertRecords(MemoryRecords recordsToConvert, byte toMagic, int bytesToConvert) throws IOException {
-        try (FileRecords inputRecords = FileRecords.open(tempFile())) {
-            inputRecords.append(recordsToConvert);
-            inputRecords.flush();
-
-            LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(new TopicPartition("test", 1),
-                    inputRecords, toMagic, 0L, Time.SYSTEM);
-            LazyDownConversionRecordsSend lazySend = lazyRecords.toSend();
-            File outputFile = tempFile();
-            ByteBuffer convertedRecordsBuffer;
-            try (TransferableChannel channel = toTransferableChannel(FileChannel.open(outputFile.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE))) {
-                int written = 0;
-                while (written < bytesToConvert) written += lazySend.writeTo(channel, written, bytesToConvert - written);
-                try (FileRecords convertedRecords = FileRecords.open(outputFile, true, written, false)) {
-                    convertedRecordsBuffer = ByteBuffer.allocate(convertedRecords.sizeInBytes());
-                    convertedRecords.readInto(convertedRecordsBuffer, 0);
-                }
-            }
-            return MemoryRecords.readableRecords(convertedRecordsBuffer);
-        }
-    }
-
-    private static TransferableChannel toTransferableChannel(FileChannel channel) {
-        return new TransferableChannel() {
-
-            @Override
-            public boolean hasPendingWrites() {
-                return false;
-            }
-
-            @Override
-            public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
-                return fileChannel.transferTo(position, count, channel);
-            }
-
-            @Override
-            public boolean isOpen() {
-                return channel.isOpen();
-            }
-
-            @Override
-            public void close() throws IOException {
-                channel.close();
-            }
-
-            @Override
-            public int write(ByteBuffer src) throws IOException {
-                return channel.write(src);
-            }
-
-            @Override
-            public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
-                return channel.write(srcs, offset, length);
-            }
-
-            @Override
-            public long write(ByteBuffer[] srcs) throws IOException {
-                return channel.write(srcs);
-            }
-        };
-    }
-
-    private static void verifyDownConvertedRecords(List<SimpleRecord> initialRecords,
-                                                   List<Long> initialOffsets,
-                                                   MemoryRecords downConvertedRecords,
-                                                   CompressionType compressionType,
-                                                   byte toMagic) {
-        int i = 0;
-        for (RecordBatch batch : downConvertedRecords.batches()) {
-            assertTrue(batch.magic() <= toMagic, "Magic byte should be lower than or equal to " + toMagic);
-            if (batch.magic() == RecordBatch.MAGIC_VALUE_V0)
-                assertEquals(TimestampType.NO_TIMESTAMP_TYPE, batch.timestampType());
-            else
-                assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
-            assertEquals(compressionType, batch.compressionType(), "Compression type should not be affected by conversion");
-            for (Record record : batch) {
-                assertTrue(record.hasMagic(batch.magic()), "Inner record should have magic " + toMagic);
-                assertEquals(initialOffsets.get(i).longValue(), record.offset(), "Offset should not change");
-                assertEquals(utf8(initialRecords.get(i).key()), utf8(record.key()), "Key should not change");
-                assertEquals(utf8(initialRecords.get(i).value()), utf8(record.value()), "Value should not change");
-                assertFalse(record.hasTimestampType(TimestampType.LOG_APPEND_TIME));
-                if (batch.magic() == RecordBatch.MAGIC_VALUE_V0) {
-                    assertEquals(RecordBatch.NO_TIMESTAMP, record.timestamp());
-                    assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
-                    assertTrue(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
-                } else if (batch.magic() == RecordBatch.MAGIC_VALUE_V1) {
-                    assertEquals(initialRecords.get(i).timestamp(), record.timestamp(), "Timestamp should not change");
-                    assertTrue(record.hasTimestampType(TimestampType.CREATE_TIME));
-                    assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
-                } else {
-                    assertEquals(initialRecords.get(i).timestamp(), record.timestamp(), "Timestamp should not change");
-                    assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
-                    assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
-                    assertArrayEquals(initialRecords.get(i).headers(), record.headers(), "Headers should not change");
-                }
-                i += 1;
-            }
-        }
-        assertEquals(initialOffsets.size(), i);
-    }
-}

