ijuma commented on code in PR #14529: URL: https://github.com/apache/kafka/pull/14529#discussion_r1356893064
########## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ########## @@ -0,0 +1,873 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.attribute.FileTime; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.Timer; +import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.FileRecords.LogOffsetPosition; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.metrics.KafkaMetricsGroup; +import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +import static java.util.Arrays.asList; + +/** + * A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing + * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each + * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in + * any previous segment. + * + * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file. + * + * This class is not thread-safe. + */ +public class LogSegment { + private static final Logger LOGGER = LoggerFactory.getLogger(LogSegment.class); + private static final Timer LOG_FLUSH_TIMER; + + static { + KafkaMetricsGroup logFlushStatsMetricsGroup = new KafkaMetricsGroup(LogSegment.class) { + @Override + public MetricName metricName(String name, Map<String, String> tags) { + // Override the group and type names for compatibility + return KafkaMetricsGroup.explicitMetricName("kafka.log", "LogFlushStats", name, tags); + } + }; + LOG_FLUSH_TIMER = logFlushStatsMetricsGroup.newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS); + } + + private final FileRecords log; + private final LazyIndex<OffsetIndex> lazyOffsetIndex; + private final LazyIndex<TimeIndex> lazyTimeIndex; + private final TransactionIndex txnIndex; + private final long baseOffset; + private final int indexIntervalBytes; + private final long rollJitterMs; + private final Time time; + + // The timestamp we used for time based log rolling and for ensuring max compaction delay + // volatile for LogCleaner to see the update + private volatile OptionalLong rollingBasedTimestamp = OptionalLong.empty(); + + /* The maximum timestamp and offset we see so far */ + private volatile TimestampOffset maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN; + + private long created; + + /* the number of bytes since we last added an entry in the offset index */ + private int bytesSinceLastIndexEntry = 0; + + /** + * Create a LogSegment with the provided parameters. + * + * @param log The file records containing log entries + * @param lazyOffsetIndex The offset index + * @param lazyTimeIndex The timestamp index + * @param txnIndex The transaction index + * @param baseOffset A lower bound on the offsets in this segment + * @param indexIntervalBytes The approximate number of bytes between entries in the index + * @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time + * @param time The time instance + */ + public LogSegment(FileRecords log, + LazyIndex<OffsetIndex> lazyOffsetIndex, + LazyIndex<TimeIndex> lazyTimeIndex, + TransactionIndex txnIndex, + long baseOffset, + int indexIntervalBytes, + long rollJitterMs, + Time time) { + this.log = log; + this.lazyOffsetIndex = lazyOffsetIndex; + this.lazyTimeIndex = lazyTimeIndex; + this.txnIndex = txnIndex; + this.baseOffset = baseOffset; + this.indexIntervalBytes = indexIntervalBytes; + this.rollJitterMs = rollJitterMs; + this.time = time; + this.created = time.milliseconds(); + } + + // Visible for testing + public LogSegment(LogSegment segment) { + this(segment.log, segment.lazyOffsetIndex, segment.lazyTimeIndex, segment.txnIndex, segment.baseOffset, + segment.indexIntervalBytes, segment.rollJitterMs, segment.time); + } + + public OffsetIndex offsetIndex() throws IOException { + return lazyOffsetIndex.get(); + } + + public File offsetIndexFile() { + return lazyOffsetIndex.file(); + } + + public TimeIndex timeIndex() throws IOException { + return lazyTimeIndex.get(); + } + + public File timeIndexFile() { + return lazyTimeIndex.file(); + } + + public long baseOffset() { + return baseOffset; + } + + public FileRecords log() { + return log; + } + + public long rollJitterMs() { + return rollJitterMs; + } + + public TransactionIndex txnIndex() { + return txnIndex; + } + + public boolean shouldRoll(RollParams rollParams) throws IOException { + boolean reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs; + int size = size(); + return size > rollParams.maxSegmentBytes - rollParams.messagesSize || + (size > 0 && reachedRollMs) || + offsetIndex().isFull() || timeIndex().isFull() || !canConvertToRelativeOffset(rollParams.maxOffsetInMessages); + } + + public void resizeIndexes(int size) throws IOException { + offsetIndex().resize(size); + timeIndex().resize(size); + } + + public void sanityCheck(boolean timeIndexFileNewlyCreated) throws IOException { + if (offsetIndexFile().exists()) { + // Resize the time index file to 0 if it is newly created. + if (timeIndexFileNewlyCreated) + timeIndex().resize(0); + // Sanity checks for time index and offset index are skipped because + // we will recover the segments above the recovery point in recoverLog() + // in any case so sanity checking them here is redundant. + txnIndex.sanityCheck(); + } else + throw new NoSuchFileException("Offset index file " + offsetIndexFile().getAbsolutePath() + " does not exist"); + } + + public TimestampOffset maxTimestampAndOffsetSoFar() throws IOException { + if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN) + maxTimestampAndOffsetSoFar = timeIndex().lastEntry(); + return maxTimestampAndOffsetSoFar; + } + + /* The maximum timestamp we see so far */ + public long maxTimestampSoFar() { + return maxTimestampAndOffsetSoFar.timestamp; + } + + public long offsetOfMaxTimestampSoFar() { + return maxTimestampAndOffsetSoFar.offset; + } + + /* Return the size in bytes of this log segment */ + public int size() { + return log.sizeInBytes(); + } + + /** + * checks that the argument offset can be represented as an integer offset relative to the baseOffset. + */ + public boolean canConvertToRelativeOffset(long offset) throws IOException { + return offsetIndex().canAppendOffset(offset); + } + + /** + * Append the given messages starting with the given offset. Add + * an entry to the index if needed. + * + * It is assumed this method is being called from within a lock, it is not thread-safe otherwise. + * + * @param largestOffset The last offset in the message set + * @param largestTimestampMs The largest timestamp in the message set. + * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append. + * @param records The log entries to append. + * @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow + */ + public void append(long largestOffset, + long largestTimestampMs, + long shallowOffsetOfMaxTimestamp, + MemoryRecords records) throws IOException { + if (records.sizeInBytes() > 0) { + LOGGER.trace("Inserting {} bytes at end offset {} at position {} with largest timestamp {} at shallow offset {}", + records.sizeInBytes(), largestOffset, log.sizeInBytes(), largestTimestampMs, shallowOffsetOfMaxTimestamp); + int physicalPosition = log.sizeInBytes(); + if (physicalPosition == 0) + rollingBasedTimestamp = OptionalLong.of(largestTimestampMs); + + ensureOffsetInRange(largestOffset); + + // append the messages + long appendedBytes = log.append(records); + LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log.file(), largestOffset); + // Update the in memory max timestamp and corresponding offset. + if (largestTimestampMs > maxTimestampSoFar()) { + maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp); + } + // append an entry to the index (if needed) + if (bytesSinceLastIndexEntry > indexIntervalBytes) { + offsetIndex().append(largestOffset, physicalPosition); + timeIndex().maybeAppend(maxTimestampSoFar(), offsetOfMaxTimestampSoFar()); + bytesSinceLastIndexEntry = 0; + } + bytesSinceLastIndexEntry += records.sizeInBytes(); + } + } + + private void ensureOffsetInRange(long offset) throws IOException { + if (!canConvertToRelativeOffset(offset)) + throw new LogSegmentOffsetOverflowException(this, offset); + } + + private int appendChunkFromFile(FileRecords records, int position, BufferSupplier bufferSupplier) throws IOException { + int bytesToAppend = 0; + long maxTimestamp = Long.MIN_VALUE; + long offsetOfMaxTimestamp = Long.MIN_VALUE; + long maxOffset = Long.MIN_VALUE; + ByteBuffer readBuffer = bufferSupplier.get(1024 * 1024); + + // find all batches that are valid to be appended to the current log segment and + // determine the maximum offset and timestamp + Iterator<FileChannelRecordBatch> nextBatches = records.batchesFrom(position).iterator(); + FileChannelRecordBatch batch; + while ((batch = nextAppendableBatch(nextBatches, readBuffer, bytesToAppend)) != null) { + if (batch.maxTimestamp() > maxTimestamp) { + maxTimestamp = batch.maxTimestamp(); + offsetOfMaxTimestamp = batch.lastOffset(); + } + maxOffset = batch.lastOffset(); + bytesToAppend += batch.sizeInBytes(); + } + + if (bytesToAppend > 0) { + // Grow buffer if needed to ensure we copy at least one batch + if (readBuffer.capacity() < bytesToAppend) + readBuffer = bufferSupplier.get(bytesToAppend); + + readBuffer.limit(bytesToAppend); + records.readInto(readBuffer, position); + + append(maxOffset, maxTimestamp, offsetOfMaxTimestamp, MemoryRecords.readableRecords(readBuffer)); + } + + bufferSupplier.release(readBuffer); + return bytesToAppend; + } + + private FileChannelRecordBatch nextAppendableBatch(Iterator<FileChannelRecordBatch> recordBatches, + ByteBuffer readBuffer, + int bytesToAppend) throws IOException { + if (recordBatches.hasNext()) { + FileChannelRecordBatch batch = recordBatches.next(); + if (canConvertToRelativeOffset(batch.lastOffset()) && + (bytesToAppend == 0 || bytesToAppend + batch.sizeInBytes() < readBuffer.capacity())) + return batch; + } + return null; + } + + /** + * Append records from a file beginning at the given position until either the end of the file + * is reached or an offset is found which is too large to convert to a relative offset for the indexes. + * + * @return the number of bytes appended to the log (may be less than the size of the input if an + * offset is encountered which would overflow this segment) + */ + public int appendFromFile(FileRecords records, int start) throws IOException { + int position = start; + BufferSupplier bufferSupplier = new BufferSupplier.GrowableBufferSupplier(); + while (position < start + records.sizeInBytes()) { + int bytesAppended = appendChunkFromFile(records, position, bufferSupplier); + if (bytesAppended == 0) + return position - start; + position += bytesAppended; + } + return position - start; + } + + /* not thread safe */ + public void updateTxnIndex(CompletedTxn completedTxn, long lastStableOffset) throws IOException { + if (completedTxn.isAborted) { + LOGGER.trace("Writing aborted transaction {} to transaction index, last stable offset is {}", completedTxn, lastStableOffset); + txnIndex.append(new AbortedTxn(completedTxn, lastStableOffset)); + } + } + + private void updateProducerState(ProducerStateManager producerStateManager, RecordBatch batch) throws IOException { + if (batch.hasProducerId()) { + long producerId = batch.producerId(); + ProducerAppendInfo appendInfo = producerStateManager.prepareUpdate(producerId, AppendOrigin.REPLICATION); + Optional<CompletedTxn> maybeCompletedTxn = appendInfo.append(batch, Optional.empty()); + producerStateManager.update(appendInfo); + if (maybeCompletedTxn.isPresent()) { + CompletedTxn completedTxn = maybeCompletedTxn.get(); + long lastStableOffset = producerStateManager.lastStableOffset(completedTxn); + updateTxnIndex(completedTxn, lastStableOffset); + producerStateManager.completeTxn(completedTxn); + } + } + producerStateManager.updateMapEndOffset(batch.lastOffset() + 1); + } + + /** + * Equivalent to {@code translateOffset(offset, 0)}. + * + * See {@link #translateOffset(long, int)} for details. + */ + public LogOffsetPosition translateOffset(long offset) throws IOException { + return translateOffset(offset, 0); + } + + /** + * Find the physical file position for the first message with offset >= the requested offset. + * + * The startingFilePosition argument is an optimization that can be used if we already know a valid starting position + * in the file higher than the greatest-lower-bound from the index. + * + * This method is thread-safe. + * + * @param offset The offset we want to translate + * @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and + * when omitted, the search will begin at the position in the offset index. + * @return The position in the log storing the message with the least offset >= the requested offset and the size of the + * message or null if no message meets this criteria. + */ + LogOffsetPosition translateOffset(long offset, int startingFilePosition) throws IOException { + OffsetPosition mapping = offsetIndex().lookup(offset); + return log.searchForOffsetWithSize(offset, Math.max(mapping.position, startingFilePosition)); + } + + /** + * Equivalent to {@code read(startOffset, maxSize, size())}. + * + * See {@link #read(long, int, long, boolean)} for details. + */ + public FetchDataInfo read(long startOffset, int maxSize) throws IOException { + return read(startOffset, maxSize, size()); + } + + /** + * Equivalent to {@code read(startOffset, maxSize, maxPosition, false)}. + * + * See {@link #read(long, int, long, boolean)} for details. + */ + public FetchDataInfo read(long startOffset, int maxSize, long maxPosition) throws IOException { + return read(startOffset, maxSize, maxPosition, false); + } + + /** + * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include + * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified. + * + * This method is thread-safe. + * + * @param startOffset A lower bound on the first offset to include in the message set we read + * @param maxSize The maximum number of bytes to include in the message set we read + * @param maxPosition The maximum position in the log segment that should be exposed for read + * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxSize` (if one exists) + * + * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset, + * or null if the startOffset is larger than the largest offset in this log + */ + public FetchDataInfo read(long startOffset, int maxSize, long maxPosition, boolean minOneMessage) throws IOException { + if (maxSize < 0) + throw new IllegalArgumentException("Invalid max size " + maxSize + " for log read from segment " + log); + + LogOffsetPosition startOffsetAndSize = translateOffset(startOffset); + + // if the start position is already off the end of the log, return null + if (startOffsetAndSize == null) + return null; + + int startPosition = startOffsetAndSize.position; + LogOffsetMetadata offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition); + + int adjustedMaxSize = maxSize; + if (minOneMessage) + adjustedMaxSize = Math.max(maxSize, startOffsetAndSize.size); + + // return a log segment but with zero size in the case below + if (adjustedMaxSize == 0) + return new FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY); + + // calculate the length of the message set to read based on whether or not they gave us a maxOffset + int fetchSize = Math.min((int) (maxPosition - startPosition), adjustedMaxSize); + + return new FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize), + adjustedMaxSize < startOffsetAndSize.size, Optional.empty()); + } + + public OptionalLong fetchUpperBoundOffset(OffsetPosition startOffsetPosition, int fetchSize) throws IOException { + Optional<OffsetPosition> position = offsetIndex().fetchUpperBoundOffset(startOffsetPosition, fetchSize); + if (position.isPresent()) + return OptionalLong.of(position.get().offset); + return OptionalLong.empty(); + } + + /** + * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes + * from the end of the log and index. + * + * This method is not thread-safe. + * + * @param producerStateManager Producer state corresponding to the segment's base offset. This is needed to recover + * the transaction index. + * @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery. + * @return The number of bytes truncated from the log + * @throws LogSegmentOffsetOverflowException if the log segment contains an offset that causes the index offset to overflow + */ + public int recover(ProducerStateManager producerStateManager, Optional<LeaderEpochFileCache> leaderEpochCache) throws IOException { + offsetIndex().reset(); + timeIndex().reset(); + txnIndex.reset(); + int validBytes = 0; + int lastIndexEntry = 0; + maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN; + try { + for (RecordBatch batch : log.batches()) { + batch.ensureValid(); + ensureOffsetInRange(batch.lastOffset()); + + // The max timestamp is exposed at the batch level, so no need to iterate the records + if (batch.maxTimestamp() > maxTimestampSoFar()) { + maxTimestampAndOffsetSoFar = new TimestampOffset(batch.maxTimestamp(), batch.lastOffset()); + } + + // Build offset index + if (validBytes - lastIndexEntry > indexIntervalBytes) { + offsetIndex().append(batch.lastOffset(), validBytes); + timeIndex().maybeAppend(maxTimestampSoFar(), offsetOfMaxTimestampSoFar()); + lastIndexEntry = validBytes; + } + validBytes += batch.sizeInBytes(); + + if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2) { + leaderEpochCache.ifPresent(cache -> { + if (batch.partitionLeaderEpoch() >= 0 && + (!cache.latestEpoch().isPresent() || batch.partitionLeaderEpoch() > cache.latestEpoch().getAsInt())) + cache.assign(batch.partitionLeaderEpoch(), batch.baseOffset()); + }); + updateProducerState(producerStateManager, batch); + } + } + } catch (CorruptRecordException | InvalidRecordException e) { + LOGGER.warn("Found invalid messages in log segment {} at byte offset {}: {}. {}", log.file().getAbsolutePath(), + validBytes, e.getMessage(), e.getCause()); + } + int truncated = log.sizeInBytes() - validBytes; + if (truncated > 0) + LOGGER.debug("Truncated {} invalid bytes at the end of segment {} during recovery", truncated, log.file().getAbsolutePath()); + + log.truncateTo(validBytes); + offsetIndex().trimToValidSize(); + // A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well. + timeIndex().maybeAppend(maxTimestampSoFar(), offsetOfMaxTimestampSoFar(), true); + timeIndex().trimToValidSize(); + return truncated; + } + + /** + * Check whether the last offset of the last batch in this segment overflows the indexes. + */ + public boolean hasOverflow() throws IOException { + long nextOffset = readNextOffset(); + return nextOffset > baseOffset && !canConvertToRelativeOffset(nextOffset - 1); + } + + public TxnIndexSearchResult collectAbortedTxns(long fetchOffset, long upperBoundOffset) { + return txnIndex.collectAbortedTxns(fetchOffset, upperBoundOffset); + } + + @Override + public String toString() { + return "LogSegment(baseOffset=" + baseOffset + + ", size=" + size() + + ", lastModifiedTime=" + lastModified() + + ", largestRecordTimestamp=" + largestRecordTimestamp() + + ")"; + } + + /** + * Truncate off all index and log entries with offsets >= the given offset. + * If the given offset is larger than the largest message in this segment, do nothing. + * + * This method is not thread-safe. + * + * @param offset The offset to truncate to + * @return The number of log bytes truncated + */ + public int truncateTo(long offset) throws IOException { + // Do offset translation before truncating the index to avoid needless scanning + // in case we truncate the full index + LogOffsetPosition mapping = translateOffset(offset); + OffsetIndex offsetIndex = offsetIndex(); + TimeIndex timeIndex = timeIndex(); + + offsetIndex.truncateTo(offset); + timeIndex.truncateTo(offset); + txnIndex.truncateTo(offset); + + // After truncation, reset and allocate more space for the (new currently active) index + offsetIndex.resize(offsetIndex.maxIndexSize()); + timeIndex.resize(timeIndex.maxIndexSize()); + + int bytesTruncated; + if (mapping == null) + bytesTruncated = 0; + else + bytesTruncated = log.truncateTo(mapping.position); + + if (log.sizeInBytes() == 0) { + created = time.milliseconds(); + rollingBasedTimestamp = OptionalLong.empty(); + } + + bytesSinceLastIndexEntry = 0; + if (maxTimestampSoFar() >= 0) + maxTimestampAndOffsetSoFar = readLargestTimestamp(); + + return bytesTruncated; + } + + private TimestampOffset readLargestTimestamp() throws IOException { + // Get the last time index entry. If the time index is empty, it will return (-1, baseOffset) + TimestampOffset lastTimeIndexEntry = timeIndex().lastEntry(); + OffsetPosition offsetPosition = offsetIndex().lookup(lastTimeIndexEntry.offset); + + // Scan the rest of the messages to see if there is a larger timestamp after the last time index entry. + FileRecords.TimestampAndOffset maxTimestampOffsetAfterLastEntry = log.largestTimestampAfter(offsetPosition.position); + if (maxTimestampOffsetAfterLastEntry.timestamp > lastTimeIndexEntry.timestamp) + return new TimestampOffset(maxTimestampOffsetAfterLastEntry.timestamp, maxTimestampOffsetAfterLastEntry.offset); + + return lastTimeIndexEntry; + } + + /** + * Calculate the offset that would be used for the next message to be append to this segment. + * Note that this is expensive. + * + * This method is thread-safe. + */ + public long readNextOffset() throws IOException { + FetchDataInfo fetchData = read(offsetIndex().lastOffset(), log.sizeInBytes()); + if (fetchData == null) + return baseOffset; + else + return fetchData.records.lastBatch() + .map(batch -> batch.nextOffset()) + .orElse(baseOffset); + } + + /** + * Flush this log segment to disk. + * + * This method is thread-safe. + */ + public void flush() throws IOException { + try { + LOG_FLUSH_TIMER.time(new Callable<Void>() { + // lambdas cannot declare a more specific exception type, so we use an anonymous inner class + @Override + public Void call() throws IOException { + log.flush(); + offsetIndex().flush(); + timeIndex().flush(); + txnIndex.flush(); + return null; + } + }); + } catch (Exception e) { + if (e instanceof IOException) + throw (IOException) e; + else if (e instanceof RuntimeException) + throw (RuntimeException) e; + else + throw new IllegalStateException("Unexpected exception thrown: " + e, e); Review Comment: Not really, this is a change to make the compiler happy, but we never throw a checked exception that is not an IOException. If the code ever changes to do that, we'll catch it here. I think it's fine as it is. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org