ijuma commented on code in PR #14529:
URL: https://github.com/apache/kafka/pull/14529#discussion_r1356886567


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##########
@@ -0,0 +1,873 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.attribute.FileTime;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import 
org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.FileRecords.LogOffsetPosition;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import static java.util.Arrays.asList;
+
+/**
+ * A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing
+ * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each
+ * segment has a base offset which is an offset {@code <=} the least offset of any message in this segment and
+ * {@code >} any offset in any previous segment.
+ *
+ * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a
+ * [base_offset].log file.
+ *
+ * This class is not thread-safe.
+ */
+public class LogSegment {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LogSegment.class);
+    private static final Timer LOG_FLUSH_TIMER;
+
+    // Registers the log flush timer once, when the class is initialized.
+    static {
+        KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(LogSegment.class) {
+            // Report under the explicit "kafka.log" / "LogFlushStats" group and type names (kept for
+            // compatibility) instead of names derived from LogSegment.class.
+            @Override
+            public MetricName metricName(String name, Map<String, String> tags) {
+                return KafkaMetricsGroup.explicitMetricName("kafka.log", "LogFlushStats", name, tags);
+            }
+        };
+        LOG_FLUSH_TIMER = metricsGroup.newTimer(
+            "LogFlushRateAndTimeMs",
+            TimeUnit.MILLISECONDS,
+            TimeUnit.SECONDS);
+    }
+
+    private final FileRecords log;
+    private final LazyIndex<OffsetIndex> lazyOffsetIndex;
+    private final LazyIndex<TimeIndex> lazyTimeIndex;
+    private final TransactionIndex txnIndex;
+    private final long baseOffset;
+    private final int indexIntervalBytes;
+    private final long rollJitterMs;
+    private final Time time;
+
+    // The timestamp used for time-based log rolling and for enforcing the max compaction delay.
+    // Declared volatile so that the LogCleaner sees updates to it.
+    private volatile OptionalLong rollingBasedTimestamp = OptionalLong.empty();
+
+    /* The maximum timestamp and offset we see so far */
+    private volatile TimestampOffset maxTimestampAndOffsetSoFar = 
TimestampOffset.UNKNOWN;
+
+    private long created;
+
+    /* the number of bytes since we last added an entry in the offset index */
+    private int bytesSinceLastIndexEntry = 0;
+
+    /**
+     * Builds a segment from its already-constructed components. The creation time of the segment is
+     * taken from {@code time.milliseconds()} at construction.
+     *
+     * @param log The file records containing log entries
+     * @param lazyOffsetIndex The offset index
+     * @param lazyTimeIndex The timestamp index
+     * @param txnIndex The transaction index
+     * @param baseOffset A lower bound on the offsets in this segment
+     * @param indexIntervalBytes The approximate number of bytes between entries in the index
+     * @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time
+     * @param time The time instance
+     */
+    public LogSegment(FileRecords log,
+                      LazyIndex<OffsetIndex> lazyOffsetIndex,
+                      LazyIndex<TimeIndex> lazyTimeIndex,
+                      TransactionIndex txnIndex,
+                      long baseOffset,
+                      int indexIntervalBytes,
+                      long rollJitterMs,
+                      Time time) {
+        // Record data and its indexes
+        this.log = log;
+        this.lazyOffsetIndex = lazyOffsetIndex;
+        this.lazyTimeIndex = lazyTimeIndex;
+        this.txnIndex = txnIndex;
+
+        // Segment configuration
+        this.baseOffset = baseOffset;
+        this.indexIntervalBytes = indexIntervalBytes;
+        this.rollJitterMs = rollJitterMs;
+
+        // Clock, and the creation timestamp read from it
+        this.time = time;
+        this.created = time.milliseconds();
+    }
+
+    /**
+     * Visible for testing. Creates a new segment that shares the given segment's log, indexes and
+     * configuration by delegating to the primary constructor; the remaining state (creation time and
+     * the fields with initializers) starts fresh.
+     *
+     * @param segment The segment whose components are reused
+     */
+    public LogSegment(LogSegment segment) {
+        this(
+            segment.log,
+            segment.lazyOffsetIndex,
+            segment.lazyTimeIndex,
+            segment.txnIndex,
+            segment.baseOffset,
+            segment.indexIntervalBytes,
+            segment.rollJitterMs,
+            segment.time
+        );
+    }
+
+    /**
+     * Returns the offset index of this segment, as provided by {@code lazyOffsetIndex.get()}.
+     *
+     * @return the offset index
+     * @throws IOException propagated from {@code LazyIndex.get()}
+     */
+    public OffsetIndex offsetIndex() throws IOException {
+        return this.lazyOffsetIndex.get();
+    }
+
+    /**
+     * Returns the file reported by the lazy offset index.
+     *
+     * @return the offset index file
+     */
+    public File offsetIndexFile() {
+        return this.lazyOffsetIndex.file();
+    }
+
+    /**
+     * Returns the time index of this segment, as provided by {@code lazyTimeIndex.get()}.
+     *
+     * @return the time index
+     * @throws IOException propagated from {@code LazyIndex.get()}
+     */
+    public TimeIndex timeIndex() throws IOException {
+        return this.lazyTimeIndex.get();
+    }
+
+    /**
+     * Returns the file reported by the lazy time index.
+     *
+     * @return the time index file
+     */
+    public File timeIndexFile() {
+        return this.lazyTimeIndex.file();
+    }
+
+    /**
+     * Returns the base offset this segment was constructed with.
+     *
+     * @return the base offset
+     */
+    public long baseOffset() {
+        return this.baseOffset;
+    }
+
+    /**
+     * Returns the file records holding this segment's log entries.
+     *
+     * @return the underlying {@code FileRecords}
+     */
+    public FileRecords log() {
+        return this.log;
+    }
+
+    /**
+     * Returns the roll jitter, in milliseconds, this segment was constructed with.
+     *
+     * @return the roll jitter in milliseconds
+     */
+    public long rollJitterMs() {
+        return this.rollJitterMs;
+    }
+
+    /**
+     * Returns the transaction index of this segment.
+     *
+     * @return the transaction index
+     */
+    public TransactionIndex txnIndex() {
+        return this.txnIndex;
+    }
+
+    /**
+     * Decides whether this segment should be rolled before appending the messages described by
+     * {@code rollParams}. The segment should roll when any of the following holds, checked in this order:
+     * the pending messages would push the size past {@code maxSegmentBytes}; the segment is non-empty and
+     * the time waited exceeds {@code maxSegmentMs} minus the roll jitter; the offset index is full; the
+     * time index is full; or {@code maxOffsetInMessages} cannot be converted to a relative offset.
+     *
+     * @param rollParams The parameters describing the pending append and the roll limits
+     * @return true if the segment should be rolled
+     * @throws IOException propagated from the index accessors
+     */
+    public boolean shouldRoll(RollParams rollParams) throws IOException {
+        boolean rollTimeReached =
+            timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs;
+        int currentSize = size();
+
+        if (currentSize > rollParams.maxSegmentBytes - rollParams.messagesSize) {
+            return true;
+        }
+        if (currentSize > 0 && rollTimeReached) {
+            return true;
+        }
+        if (offsetIndex().isFull()) {
+            return true;
+        }
+        if (timeIndex().isFull()) {
+            return true;
+        }
+        return !canConvertToRelativeOffset(rollParams.maxOffsetInMessages);
+    }
+
+    public void resizeIndexes(int size) throws IOException {
+        offsetIndex().resize(size);
+        timeIndex().resize(size);
+    }
+
+    /**
+     * Performs a sanity check of this segment. The offset index file must exist; if the time index file was
+     * newly created it is resized to 0; finally the transaction index is sanity checked.
+     *
+     * @param timeIndexFileNewlyCreated Whether the time index file was just created
+     * @throws NoSuchFileException if the offset index file does not exist
+     * @throws IOException propagated from the time index accessor or {@code resize}
+     */
+    public void sanityCheck(boolean timeIndexFileNewlyCreated) throws IOException {
+        if (!offsetIndexFile().exists()) {
+            throw new NoSuchFileException("Offset index file " + offsetIndexFile().getAbsolutePath() + " does not exist");
+        }
+
+        // Resize the time index file to 0 if it is newly created.
+        if (timeIndexFileNewlyCreated) {
+            timeIndex().resize(0);
+        }
+
+        // Sanity checks for time index and offset index are skipped because
+        // we will recover the segments above the recovery point in recoverLog()
+        // in any case so sanity checking them here is redundant.
+        txnIndex.sanityCheck();
+    }
+
+    /**
+     * Returns the maximum timestamp and its offset seen so far. While the cached value is still
+     * {@code TimestampOffset.UNKNOWN}, it is first populated from the last entry of the time index.
+     *
+     * @return the cached maximum timestamp and corresponding offset
+     * @throws IOException propagated from the time index accessor
+     */
+    public TimestampOffset maxTimestampAndOffsetSoFar() throws IOException {
+        if (maxTimestampAndOffsetSoFar != TimestampOffset.UNKNOWN) {
+            return maxTimestampAndOffsetSoFar;
+        }
+        maxTimestampAndOffsetSoFar = timeIndex().lastEntry();
+        return maxTimestampAndOffsetSoFar;
+    }
+
+    /**
+     * Returns the maximum timestamp seen so far in this segment.
+     *
+     * The value is read through {@link #maxTimestampAndOffsetSoFar()} rather than directly from the field, so
+     * that a value still at {@code TimestampOffset.UNKNOWN} is first populated from the last entry of the time
+     * index. Reading the raw field would report the UNKNOWN timestamp for a segment whose cached value has
+     * not been loaded yet.
+     *
+     * The method signature is kept free of checked exceptions for existing callers, so an
+     * {@code IOException} raised while loading the time index is rethrown unchecked.
+     *
+     * @return the maximum timestamp seen so far
+     * @throws java.io.UncheckedIOException if loading the time index fails
+     */
+    public long maxTimestampSoFar() {
+        try {
+            return maxTimestampAndOffsetSoFar().timestamp;
+        } catch (IOException e) {
+            throw new java.io.UncheckedIOException(e);
+        }
+    }
+
+    /**
+     * Returns the offset associated with the maximum timestamp seen so far in this segment.
+     *
+     * The value is read through {@link #maxTimestampAndOffsetSoFar()} rather than directly from the field, so
+     * that a value still at {@code TimestampOffset.UNKNOWN} is first populated from the last entry of the time
+     * index. Reading the raw field would report the UNKNOWN offset for a segment whose cached value has not
+     * been loaded yet.
+     *
+     * The method signature is kept free of checked exceptions for existing callers, so an
+     * {@code IOException} raised while loading the time index is rethrown unchecked.
+     *
+     * @return the offset of the maximum timestamp seen so far
+     * @throws java.io.UncheckedIOException if loading the time index fails
+     */
+    public long offsetOfMaxTimestampSoFar() {
+        try {
+            return maxTimestampAndOffsetSoFar().offset;
+        } catch (IOException e) {
+            throw new java.io.UncheckedIOException(e);
+        }
+    }
+
+    /**
+     * Returns the size of this log segment in bytes, as reported by the underlying file records.
+     *
+     * @return the size in bytes
+     */
+    public int size() {
+        return this.log.sizeInBytes();
+    }
+
+    /**
+     * checks that the argument offset can be represented as an integer offset 
relative to the baseOffset.
+     */
+    public boolean canConvertToRelativeOffset(long offset) throws IOException {
+        return offsetIndex().canAppendOffset(offset);
+    }
+
+    /**
+     * Appends the given records to the end of this segment and, once more than {@code indexIntervalBytes}
+     * bytes have accumulated since the previous index entry, adds an entry to the offset index and offers one
+     * to the time index. An empty record set is ignored.
+     *
+     * It is assumed this method is being called from within a lock, it is not thread-safe otherwise.
+     *
+     * @param largestOffset The last offset in the message set
+     * @param largestTimestampMs The largest timestamp in the message set.
+     * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
+     * @param records The log entries to append.
+     * @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow
+     * @throws IOException propagated from the log or index operations
+     */
+    public void append(long largestOffset,
+                       long largestTimestampMs,
+                       long shallowOffsetOfMaxTimestamp,
+                       MemoryRecords records) throws IOException {
+        // Nothing to do for an empty record set
+        if (records.sizeInBytes() <= 0) {
+            return;
+        }
+
+        LOGGER.trace("Inserting {} bytes at end offset {} at position {} with largest timestamp {} at shallow offset {}",
+            records.sizeInBytes(), largestOffset, log.sizeInBytes(), largestTimestampMs, shallowOffsetOfMaxTimestamp);
+
+        // File position at which the new records will start
+        int startPosition = log.sizeInBytes();
+        boolean segmentWasEmpty = startPosition == 0;
+        if (segmentWasEmpty) {
+            rollingBasedTimestamp = OptionalLong.of(largestTimestampMs);
+        }
+
+        ensureOffsetInRange(largestOffset);
+
+        // Write the records to the log
+        long bytesWritten = log.append(records);
+        LOGGER.trace("Appended {} to {} at end offset {}", bytesWritten, log.file(), largestOffset);
+
+        // Track the in-memory max timestamp and the offset it belongs to
+        if (largestTimestampMs > maxTimestampSoFar()) {
+            maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp);
+        }
+
+        // Add index entries once enough bytes have gone by since the last one
+        if (bytesSinceLastIndexEntry > indexIntervalBytes) {
+            offsetIndex().append(largestOffset, startPosition);
+            timeIndex().maybeAppend(maxTimestampSoFar(), offsetOfMaxTimestampSoFar());
+            bytesSinceLastIndexEntry = 0;
+        }
+        bytesSinceLastIndexEntry += records.sizeInBytes();
+    }
+
+    /**
+     * Verifies that the given offset can be converted to an offset relative to this segment's base offset.
+     *
+     * @param offset The absolute offset to validate
+     * @throws LogSegmentOffsetOverflowException if the offset cannot be converted
+     * @throws IOException propagated from the offset index accessor
+     */
+    private void ensureOffsetInRange(long offset) throws IOException {
+        if (canConvertToRelativeOffset(offset)) {
+            return;
+        }
+        throw new LogSegmentOffsetOverflowException(this, offset);
+    }
+
+    /**
+     * Appends one chunk of batches from {@code records}, starting at {@code position}, to this segment.
+     *
+     * Batches are first scanned to find how many consecutive ones can be appended (see
+     * {@link #nextAppendableBatch}) and to collect the largest timestamp, its offset and the last offset.
+     * The selected bytes are then read into a buffer obtained from {@code bufferSupplier} and appended in a
+     * single call to {@link #append}.
+     *
+     * The buffer is handed back to the supplier in a {@code finally} block, so it is released even when
+     * scanning, reading or appending throws.
+     *
+     * @param records The file records to copy from
+     * @param position The position in {@code records} at which to start
+     * @param bufferSupplier The supplier of the read buffer
+     * @return the number of bytes appended, or 0 if no batch could be appended
+     * @throws IOException propagated from reading the source or appending to this segment
+     */
+    private int appendChunkFromFile(FileRecords records, int position, BufferSupplier bufferSupplier) throws IOException {
+        int bytesToAppend = 0;
+        long maxTimestamp = Long.MIN_VALUE;
+        long offsetOfMaxTimestamp = Long.MIN_VALUE;
+        long maxOffset = Long.MIN_VALUE;
+        ByteBuffer readBuffer = bufferSupplier.get(1024 * 1024);
+
+        try {
+            // find all batches that are valid to be appended to the current log segment and
+            // determine the maximum offset and timestamp
+            Iterator<FileChannelRecordBatch> nextBatches = records.batchesFrom(position).iterator();
+            FileChannelRecordBatch batch;
+            while ((batch = nextAppendableBatch(nextBatches, readBuffer, bytesToAppend)) != null) {
+                if (batch.maxTimestamp() > maxTimestamp) {
+                    maxTimestamp = batch.maxTimestamp();
+                    offsetOfMaxTimestamp = batch.lastOffset();
+                }
+                maxOffset = batch.lastOffset();
+                bytesToAppend += batch.sizeInBytes();
+            }
+
+            if (bytesToAppend > 0) {
+                // Grow buffer if needed to ensure we copy at least one batch
+                if (readBuffer.capacity() < bytesToAppend)
+                    readBuffer = bufferSupplier.get(bytesToAppend);
+
+                readBuffer.limit(bytesToAppend);
+                records.readInto(readBuffer, position);
+
+                append(maxOffset, maxTimestamp, offsetOfMaxTimestamp, MemoryRecords.readableRecords(readBuffer));
+            }
+
+            return bytesToAppend;
+        } finally {
+            // Release whichever buffer is current (the grown one, if it was replaced above)
+            bufferSupplier.release(readBuffer);
+        }
+    }
+
+    /**
+     * Takes the next batch from the iterator and returns it if it can be added to the current chunk.
+     *
+     * A batch qualifies when its last offset can be converted to a relative offset and either nothing has
+     * been selected yet ({@code bytesToAppend == 0}) or adding it keeps the total strictly below the read
+     * buffer's capacity. The iterator is advanced even when the batch is rejected.
+     *
+     * @param recordBatches The iterator over candidate batches
+     * @param readBuffer The buffer whose capacity bounds the chunk
+     * @param bytesToAppend The number of bytes already selected for the chunk
+     * @return the next appendable batch, or null if the iterator is exhausted or the batch does not qualify
+     * @throws IOException propagated from the offset index accessor
+     */
+    private FileChannelRecordBatch nextAppendableBatch(Iterator<FileChannelRecordBatch> recordBatches,
+                                                       ByteBuffer readBuffer,
+                                                       int bytesToAppend) throws IOException {
+        if (!recordBatches.hasNext()) {
+            return null;
+        }
+
+        FileChannelRecordBatch candidate = recordBatches.next();
+        if (!canConvertToRelativeOffset(candidate.lastOffset())) {
+            return null;
+        }
+
+        boolean fitsInBuffer = bytesToAppend == 0 || bytesToAppend + candidate.sizeInBytes() < readBuffer.capacity();
+        return fitsInBuffer ? candidate : null;
+    }
+
+    /**
+     * Copies records from a file into this segment, chunk by chunk, beginning at the given position. Copying
+     * stops when the loop bound is reached or when a chunk appends nothing, which happens when the next batch
+     * has an offset too large to convert to a relative offset for the indexes or when no batches remain.
+     *
+     * @param records The file records to copy from
+     * @param start The position in {@code records} at which to begin
+     * @return the number of bytes appended to the log (may be less than the size of the input if an
+     *         offset is encountered which would overflow this segment)
+     * @throws IOException propagated from reading the source or appending to this segment
+     */
+    public int appendFromFile(FileRecords records, int start) throws IOException {
+        BufferSupplier bufferSupplier = new BufferSupplier.GrowableBufferSupplier();
+        int position = start;
+        while (position < start + records.sizeInBytes()) {
+            int chunkSize = appendChunkFromFile(records, position, bufferSupplier);
+            if (chunkSize == 0) {
+                break;
+            }
+            position += chunkSize;
+        }
+        return position - start;
+    }
+
+    /* not thread safe */
+    public void updateTxnIndex(CompletedTxn completedTxn, long 
lastStableOffset) throws IOException {

Review Comment:
   No, `UnifiedLog` is in a different package right now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to