ijuma commented on code in PR #12228:
URL: https://github.com/apache/kafka/pull/12228#discussion_r899648184

##########
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java:
##########
@@ -162,79 +163,82 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
                                          RecordFilter filter, ByteBuffer destinationBuffer,
                                          int maxRecordBatchSize,
                                          BufferSupplier decompressionBufferSupplier) {
         FilterResult filterResult = new FilterResult(destinationBuffer);
-        ByteBufferOutputStream bufferOutputStream = new ByteBufferOutputStream(destinationBuffer);
-        for (MutableRecordBatch batch : batches) {
-            final BatchRetentionResult batchRetentionResult = filter.checkBatchRetention(batch);
-            final boolean containsMarkerForEmptyTxn = batchRetentionResult.containsMarkerForEmptyTxn;
-            final BatchRetention batchRetention = batchRetentionResult.batchRetention;
-
-            filterResult.bytesRead += batch.sizeInBytes();
-
-            if (batchRetention == BatchRetention.DELETE)
-                continue;
-
-            // We use the absolute offset to decide whether to retain the message or not. Due to KAFKA-4298, we have to
-            // allow for the possibility that a previous version corrupted the log by writing a compressed record batch
-            // with a magic value not matching the magic of the records (magic < 2). This will be fixed as we
-            // recopy the messages to the destination buffer.
-            byte batchMagic = batch.magic();
-            List<Record> retainedRecords = new ArrayList<>();
-
-            final BatchFilterResult iterationResult = filterBatch(batch, decompressionBufferSupplier, filterResult, filter,
-                    batchMagic, true, retainedRecords);
-            boolean containsTombstones = iterationResult.containsTombstones;
-            boolean writeOriginalBatch = iterationResult.writeOriginalBatch;
-            long maxOffset = iterationResult.maxOffset;
-
-            if (!retainedRecords.isEmpty()) {
-                // we check if the delete horizon should be set to a new value
-                // in which case, we need to reset the base timestamp and overwrite the timestamp deltas
-                // if the batch does not contain tombstones, then we don't need to overwrite batch
-                boolean needToSetDeleteHorizon = batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && (containsTombstones || containsMarkerForEmptyTxn)
-                    && !batch.deleteHorizonMs().isPresent();
-                if (writeOriginalBatch && !needToSetDeleteHorizon) {
-                    batch.writeTo(bufferOutputStream);
-                    filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false);
-                } else {
-                    final MemoryRecordsBuilder builder;
-                    long deleteHorizonMs;
-                    if (needToSetDeleteHorizon)
-                        deleteHorizonMs = filter.currentTime + filter.deleteRetentionMs;
-                    else
-                        deleteHorizonMs = batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP);
-                    builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, deleteHorizonMs);
-
-                    MemoryRecords records = builder.build();
-                    int filteredBatchSize = records.sizeInBytes();
-                    if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize)
-                        log.warn("Record batch from {} with last offset {} exceeded max record batch size {} after cleaning " +
-                                "(new size is {}). Consumers with version earlier than 0.10.1.0 may need to " +
-                                "increase their fetch sizes.",
-                            partition, batch.lastOffset(), maxRecordBatchSize, filteredBatchSize);
-
-                    MemoryRecordsBuilder.RecordsInfo info = builder.info();
-                    filterResult.updateRetainedBatchMetadata(info.maxTimestamp, info.shallowOffsetOfMaxTimestamp,
-                        maxOffset, retainedRecords.size(), filteredBatchSize);
+        try (ByteBufferOutputStream bufferOutputStream = new ByteBufferOutputStream(destinationBuffer)) {

Review Comment:
   This is a no-op, so I assume it's just a hygiene thing?
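   For readers of the thread, a minimal standalone sketch (not Kafka's actual ByteBufferOutputStream, just a hypothetical buffer-backed stream) of why the try-with-resources is behaviorally a no-op here: a stream backed only by a ByteBuffer holds no OS resources, and if it does not override OutputStream.close(), the inherited close() does nothing, so the try block is purely hygiene/intent.

   ```java
   import java.io.OutputStream;
   import java.nio.ByteBuffer;

   // Hypothetical stand-in for a ByteBuffer-backed output stream.
   // It holds no native resources and does not override close(),
   // so the inherited OutputStream.close() is a no-op.
   class BufferBackedOutputStream extends OutputStream {
       private final ByteBuffer buffer;

       BufferBackedOutputStream(ByteBuffer buffer) {
           this.buffer = buffer;
       }

       @Override
       public void write(int b) {
           buffer.put((byte) b);
       }
       // No close() override: closing releases nothing.
   }

   public class TryWithResourcesNoOp {
       public static void main(String[] args) throws Exception {
           ByteBuffer destination = ByteBuffer.allocate(16);
           // Functionally identical to using the stream without try-with-resources;
           // the block only documents that the stream's lifetime ends here.
           try (BufferBackedOutputStream out = new BufferBackedOutputStream(destination)) {
               out.write(0x2A);
           }
           System.out.println("bytes written: " + destination.position());
       }
   }
   ```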