This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 8433ac4d31a KAFKA-19221 Propagate IOException on LogSegment#close (#20072)
8433ac4d31a is described below

commit 8433ac4d31aa4154b7a7fe3b290c4e2d8d847068
Author: Gaurav Narula <gaurav_naru...@apple.com>
AuthorDate: Sat Jul 5 20:05:34 2025 +0100

    KAFKA-19221 Propagate IOException on LogSegment#close (#20072)
    
    Log segment closure results in right-sizing the segment on disk along
    with the associated index files.
    
    This is especially important for TimeIndexes, where a failure to
    right-size may eventually cause log roll failures, leading to
    under-replication and log cleaner failures.
    
    This change uses `Utils.closeAll` which propagates exceptions, resulting
    in an "unclean" shutdown. That would then cause the broker to attempt to
    recover the log segment and the index on next startup, thereby avoiding
    the failures described above.
    
    Reviewers: Omnia Ibrahim <o.g.h.ibra...@gmail.com>, Jun Rao
     <jun...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com>
---
 .../apache/kafka/common/record/FileRecords.java    |  4 ++
 .../kafka/server/LogManagerIntegrationTest.java    | 66 ++++++++++++++++++++++
 .../kafka/storage/internals/log/AbstractIndex.java |  4 +-
 .../kafka/storage/internals/log/LogSegment.java    |  5 +-
 .../kafka/storage/internals/log/LogSegments.java   |  4 +-
 .../storage/internals/log/TransactionIndex.java    |  2 +-
 .../storage/internals/log/LogSegmentsTest.java     | 23 +++++++-
 .../storage/internals/log/OffsetIndexTest.java     |  1 -
 8 files changed, 99 insertions(+), 10 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 64dd73de412..ba5cb556cef 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -201,6 +201,10 @@ public class FileRecords extends AbstractRecords implements Closeable {
      * Close this record set
      */
     public void close() throws IOException {
+        if (!channel.isOpen()) {
+            return;
+        }
+
         flush();
         trim();
         channel.close();
diff --git a/core/src/test/java/kafka/server/LogManagerIntegrationTest.java b/core/src/test/java/kafka/server/LogManagerIntegrationTest.java
index 3d386283943..752f56fa418 100644
--- a/core/src/test/java/kafka/server/LogManagerIntegrationTest.java
+++ b/core/src/test/java/kafka/server/LogManagerIntegrationTest.java
@@ -34,9 +34,11 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler;
 import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile;
 import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -59,6 +61,70 @@ public class LogManagerIntegrationTest {
         this.cluster = cluster;
     }
 
+    @ClusterTest(types = {Type.KRAFT})
+    public void testIOExceptionOnLogSegmentCloseResultsInRecovery() throws 
IOException, InterruptedException, ExecutionException {
+        try (Admin admin = cluster.admin()) {
+            admin.createTopics(List.of(new NewTopic("foo", 1, (short) 
1))).all().get();
+        }
+        cluster.waitForTopic("foo", 1);
+
+        // Produce some data into the topic
+        Map<String, Object> producerConfigs = Map.of(
+                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers(),
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName(),
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName()
+        );
+
+        try (Producer<String, String> producer = new 
KafkaProducer<>(producerConfigs)) {
+            producer.send(new ProducerRecord<>("foo", 0, null, "bar")).get();
+            producer.flush();
+        }
+
+        var broker = cluster.brokers().get(0);
+
+        File timeIndexFile = broker.logManager()
+                .getLog(new TopicPartition("foo", 0), false).get()
+                .activeSegment()
+                .timeIndexFile();
+
+        // Set read only so that we throw an IOException on shutdown
+        assertTrue(timeIndexFile.exists());
+        assertTrue(timeIndexFile.setReadOnly());
+
+        broker.shutdown();
+
+        assertEquals(1, broker.config().logDirs().size());
+        String logDir = broker.config().logDirs().head();
+        CleanShutdownFileHandler cleanShutdownFileHandler = new 
CleanShutdownFileHandler(logDir);
+        assertFalse(cleanShutdownFileHandler.exists(), "Did not expect the 
clean shutdown file to exist");
+
+        // Ensure we have a corrupt index on broker shutdown
+        long maxIndexSize = broker.config().logIndexSizeMaxBytes();
+        long expectedIndexSize = 12 * (maxIndexSize / 12);
+        assertEquals(expectedIndexSize, timeIndexFile.length());
+
+        // Allow write permissions before startup
+        assertTrue(timeIndexFile.setWritable(true));
+
+        broker.startup();
+        // make sure there is no error during load logs
+        assertTrue(cluster.firstFatalException().isEmpty());
+        try (Admin admin = cluster.admin()) {
+            TestUtils.waitForCondition(() -> {
+                List<TopicPartitionInfo> partitionInfos = 
admin.describeTopics(List.of("foo"))
+                        .topicNameValues().get("foo").get().partitions();
+                return partitionInfos.get(0).leader().id() == 0;
+            }, "Partition does not have a leader assigned");
+        }
+
+        // Ensure that sanity check does not fail
+        broker.logManager()
+                .getLog(new TopicPartition("foo", 0), false).get()
+                .activeSegment()
+                .timeIndex()
+                .sanityCheck();
+    }
+
     @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 3)
     public void testRestartBrokerNoErrorIfMissingPartitionMetadata() throws 
IOException, ExecutionException, InterruptedException {
 
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
index 9c7c645c4a7..46ceb4801a2 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
@@ -263,7 +263,9 @@ public abstract class AbstractIndex implements Closeable {
     public void trimToValidSize() throws IOException {
         lock.lock();
         try {
-            resize(entrySize() * entries);
+            if (mmap != null) {
+                resize(entrySize() * entries);
+            }
         } finally {
             lock.unlock();
         }
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
index e07d8a2d6a6..b404f8d0796 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
@@ -763,10 +763,7 @@ public class LogSegment implements Closeable {
     public void close() throws IOException {
         if (maxTimestampAndOffsetSoFar != TimestampOffset.UNKNOWN)
             Utils.swallow(LOGGER, Level.WARN, "maybeAppend", () -> 
timeIndex().maybeAppend(maxTimestampSoFar(), 
shallowOffsetOfMaxTimestampSoFar(), true));
-        Utils.closeQuietly(lazyOffsetIndex, "offsetIndex", LOGGER);
-        Utils.closeQuietly(lazyTimeIndex, "timeIndex", LOGGER);
-        Utils.closeQuietly(log, "log", LOGGER);
-        Utils.closeQuietly(txnIndex, "txnIndex", LOGGER);
+        Utils.closeAll(lazyOffsetIndex, lazyTimeIndex, log, txnIndex);
     }
 
     /**
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegments.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegments.java
index 586d40e90b3..1987e99e984 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegments.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegments.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.storage.internals.log;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
 
 import java.io.Closeable;
 import java.io.File;
@@ -105,8 +106,7 @@ public class LogSegments implements Closeable {
      */
     @Override
     public void close() throws IOException {
-        for (LogSegment s : values())
-            s.close();
+        Utils.closeAll(values().toArray(new LogSegment[0]));
     }
 
     /**
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
index 8e089dc3cfc..16403126813 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
@@ -110,7 +110,7 @@ public class TransactionIndex implements Closeable {
 
     public void close() throws IOException {
         FileChannel channel = channelOrNull();
-        if (channel != null)
+        if (channel != null && channel.isOpen())
             channel.close();
         maybeChannel = Optional.empty();
     }
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentsTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentsTest.java
index 43da918b29d..97ffb7072bb 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentsTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentsTest.java
@@ -40,7 +40,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class LogSegmentsTest {
@@ -49,7 +52,7 @@ public class LogSegmentsTest {
 
     /* create a segment with the given base offset */
     private static LogSegment createSegment(Long offset) throws IOException {
-        return LogTestUtils.createSegment(offset, logDir, 10, Time.SYSTEM);
+        return spy(LogTestUtils.createSegment(offset, logDir, 10, 
Time.SYSTEM));
     }
 
     @BeforeEach
@@ -276,4 +279,22 @@ public class LogSegmentsTest {
         }
     }
 
+    @Test
+    public void testCloseClosesAllLogSegmentsOnExceptionWhileClosingOne() 
throws IOException {
+        LogSegment seg1 = createSegment(0L);
+        LogSegment seg2 = createSegment(100L);
+        LogSegment seg3 = createSegment(200L);
+        LogSegments segments = new LogSegments(topicPartition);
+        segments.add(seg1);
+        segments.add(seg2);
+        segments.add(seg3);
+
+        doThrow(new IOException("Failure")).when(seg2).close();
+
+        assertThrows(IOException.class, segments::close, "Expected IOException 
to be thrown");
+        verify(seg1).close();
+        verify(seg2).close();
+        verify(seg3).close();
+    }
+
 }
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java
index 918e9dd409c..ad7fa590852 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java
@@ -225,7 +225,6 @@ public class OffsetIndexTest {
         idx.forceUnmap();
         // mmap should be null after unmap causing lookup to throw a NPE
         assertThrows(NullPointerException.class, () -> idx.lookup(1));
-        assertThrows(NullPointerException.class, idx::close);
     }
 
     @Test

Reply via email to