divijvaidya commented on code in PR #12228:
URL: https://github.com/apache/kafka/pull/12228#discussion_r900281799


##########
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java:
##########
@@ -162,79 +163,82 @@ private static FilterResult filterTo(TopicPartition 
partition, Iterable<MutableR
                                          RecordFilter filter, ByteBuffer 
destinationBuffer, int maxRecordBatchSize,
                                          BufferSupplier 
decompressionBufferSupplier) {
         FilterResult filterResult = new FilterResult(destinationBuffer);
-        ByteBufferOutputStream bufferOutputStream = new 
ByteBufferOutputStream(destinationBuffer);
-        for (MutableRecordBatch batch : batches) {
-            final BatchRetentionResult batchRetentionResult = 
filter.checkBatchRetention(batch);
-            final boolean containsMarkerForEmptyTxn = 
batchRetentionResult.containsMarkerForEmptyTxn;
-            final BatchRetention batchRetention = 
batchRetentionResult.batchRetention;
-
-            filterResult.bytesRead += batch.sizeInBytes();
-
-            if (batchRetention == BatchRetention.DELETE)
-                continue;
-
-            // We use the absolute offset to decide whether to retain the 
message or not. Due to KAFKA-4298, we have to
-            // allow for the possibility that a previous version corrupted the 
log by writing a compressed record batch
-            // with a magic value not matching the magic of the records (magic 
< 2). This will be fixed as we
-            // recopy the messages to the destination buffer.
-            byte batchMagic = batch.magic();
-            List<Record> retainedRecords = new ArrayList<>();
-
-            final BatchFilterResult iterationResult = filterBatch(batch, 
decompressionBufferSupplier, filterResult, filter,
-                    batchMagic, true, retainedRecords);
-            boolean containsTombstones = iterationResult.containsTombstones;
-            boolean writeOriginalBatch = iterationResult.writeOriginalBatch;
-            long maxOffset = iterationResult.maxOffset;
-
-            if (!retainedRecords.isEmpty()) {
-                // we check if the delete horizon should be set to a new value
-                // in which case, we need to reset the base timestamp and 
overwrite the timestamp deltas
-                // if the batch does not contain tombstones, then we don't 
need to overwrite batch
-                boolean needToSetDeleteHorizon = batch.magic() >= 
RecordBatch.MAGIC_VALUE_V2 && (containsTombstones || containsMarkerForEmptyTxn)
-                    && !batch.deleteHorizonMs().isPresent();
-                if (writeOriginalBatch && !needToSetDeleteHorizon) {
-                    batch.writeTo(bufferOutputStream);
-                    filterResult.updateRetainedBatchMetadata(batch, 
retainedRecords.size(), false);
-                } else {
-                    final MemoryRecordsBuilder builder;
-                    long deleteHorizonMs;
-                    if (needToSetDeleteHorizon)
-                        deleteHorizonMs = filter.currentTime + 
filter.deleteRetentionMs;
-                    else
-                        deleteHorizonMs = 
batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP);
-                    builder = buildRetainedRecordsInto(batch, retainedRecords, 
bufferOutputStream, deleteHorizonMs);
-
-                    MemoryRecords records = builder.build();
-                    int filteredBatchSize = records.sizeInBytes();
-                    if (filteredBatchSize > batch.sizeInBytes() && 
filteredBatchSize > maxRecordBatchSize)
-                        log.warn("Record batch from {} with last offset {} 
exceeded max record batch size {} after cleaning " +
-                                        "(new size is {}). Consumers with 
version earlier than 0.10.1.0 may need to " +
-                                        "increase their fetch sizes.",
-                                partition, batch.lastOffset(), 
maxRecordBatchSize, filteredBatchSize);
-
-                    MemoryRecordsBuilder.RecordsInfo info = builder.info();
-                    
filterResult.updateRetainedBatchMetadata(info.maxTimestamp, 
info.shallowOffsetOfMaxTimestamp,
-                            maxOffset, retainedRecords.size(), 
filteredBatchSize);
+        try (ByteBufferOutputStream bufferOutputStream = new 
ByteBufferOutputStream(destinationBuffer)) {

Review Comment:
   Yep! Reader of the code may not realize that the 
ByteBufferOutputStream.Close() is a NoOp (without looking at the code) but with 
the try-resources block, reader won't have to think about resource leaks and 
also, it's a nice hint to signify the lifecycle of the object (which is within 
the try block).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to