This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d71d0639d9d KAFKA-15046: Get rid of unnecessary fsyncs inside 
UnifiedLog.lock to stabilize performance (#14242)
d71d0639d9d is described below

commit d71d0639d9d5ae39153d3e1f52f8ba8bd2ae8044
Author: Okada Haruki <[email protected]>
AuthorDate: Thu Nov 30 02:43:44 2023 +0900

    KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to 
stabilize performance (#14242)
    
    Any blocking operation performed while holding UnifiedLog.lock can lead 
to serious performance (even availability) issues, yet there are currently 
several paths that call fsync(2) inside the lock.
    While the lock is held, all subsequent produce requests against the 
partition may block.
    When disk performance is bad, this easily causes all request handlers to become busy.
    Even worse, when a disk experiences a glitch lasting tens of seconds (not 
rare with spinning drives), the broker becomes unable to process any requests 
while remaining unfenced from the cluster (i.e. a "zombie"-like state).
    This PR gets rid of 4 cases of essentially-unnecessary fsync(2) calls 
performed under the lock:
    (1) ProducerStateManager.takeSnapshot at UnifiedLog.roll
    I moved the fsync(2) call to the scheduler thread as part of the existing 
"flush-log" job (before incrementing the recovery point).
    Since it's still ensured that the snapshot is flushed before incrementing 
the recovery point, this change shouldn't cause any problems.
    (2) ProducerStateManager.removeAndMarkSnapshotForDeletion as part of log 
segment deletion
    This method calls Utils.atomicMoveWithFallback with needFlushParentDir = 
true internally, which calls fsync.
    I changed it to call Utils.atomicMoveWithFallback with needFlushParentDir = 
false (which is consistent with index file deletion, which also doesn't flush 
the parent dir).
    This change shouldn't cause problems either.
    (3) LeaderEpochFileCache.truncateFromStart when incrementing 
log-start-offset
    This path is called from deleteRecords on request-handler threads.
    Here, we don't need fsync(2) either actually.
    On unclean shutdown, a few stale leader epochs might remain in the file, but 
they will be handled by LogLoader on start-up, so this is not a problem.
    (4) LeaderEpochFileCache.truncateFromEnd as part of log truncation
    Likewise, we don't need fsync(2) here, since any epochs that remain 
untruncated after an unclean shutdown will be handled by the log loading procedure.
    
    Reviewers: Luke Chen <[email protected]>, Divij Vaidya <[email protected]>, 
Justine Olshan <[email protected]>, Jun Rao <[email protected]>
---
 .../java/org/apache/kafka/common/utils/Utils.java  | 13 +++++++-
 core/src/main/scala/kafka/log/UnifiedLog.scala     | 18 +++++++++--
 .../server/checkpoints/OffsetCheckpointFile.scala  |  2 +-
 .../kafka/log/remote/RemoteLogManagerTest.java     |  2 +-
 .../test/scala/unit/kafka/log/LogSegmentTest.scala |  2 +-
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 24 ++++++++++++--
 ...ffsetCheckpointFileWithFailureHandlerTest.scala |  2 +-
 .../server/epoch/LeaderEpochFileCacheTest.scala    |  2 +-
 .../apache/kafka/server/common/CheckpointFile.java |  8 +++--
 .../metadata/storage/CommittedOffsetsFile.java     |  4 +--
 .../CheckpointFileWithFailureHandler.java          |  4 +--
 .../checkpoint/InMemoryLeaderEpochCheckpoint.java  |  4 +--
 .../checkpoint/LeaderEpochCheckpoint.java          |  7 +++-
 .../checkpoint/LeaderEpochCheckpointFile.java      |  8 +++--
 .../internals/epoch/LeaderEpochFileCache.java      | 37 ++++++++++++----------
 .../internals/log/ProducerStateManager.java        | 22 ++++++++++---
 .../kafka/storage/internals/log/SnapshotFile.java  |  8 ++---
 17 files changed, 118 insertions(+), 49 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 6a0913d3c2d..6e1c3cefbb1 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -1021,6 +1021,17 @@ public final class Utils {
         }
     }
 
+    /**
+     * Flushes dirty file with swallowing {@link NoSuchFileException}
+     */
+    public static void flushFileIfExists(Path path) throws IOException {
+        try (FileChannel fileChannel = FileChannel.open(path, 
StandardOpenOption.READ)) {
+            fileChannel.force(true);
+        } catch (NoSuchFileException e) {
+            log.warn("Failed to flush file {}", path, e);
+        }
+    }
+
     /**
      * Closes all the provided closeables.
      * @throws IOException if any of the close methods throws an IOException.
@@ -1543,7 +1554,7 @@ public final class Utils {
      * Checks if a string is null, empty or whitespace only.
      * @param str a string to be checked
      * @return true if the string is null, empty or whitespace only; 
otherwise, return false.
-     */    
+     */
     public static boolean isBlank(String str) {
         return str == null || str.trim().isEmpty();
     }
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala 
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 0691cff51d0..8ca58ad20f0 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -44,7 +44,7 @@ import 
org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
 import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, 
LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, 
LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, 
LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, 
ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, 
RollParams, VerificationGuard}
 
 import java.io.{File, IOException}
-import java.nio.file.Files
+import java.nio.file.{Files, Path}
 import java.util
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import java.util.stream.Collectors
@@ -1656,10 +1656,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     // may actually be ahead of the current producer state end offset (which 
corresponds to the log end offset),
     // we manually override the state offset here prior to taking the snapshot.
     producerStateManager.updateMapEndOffset(newSegment.baseOffset)
-    producerStateManager.takeSnapshot()
+    // We avoid potentially-costly fsync call, since we acquire 
UnifiedLog#lock here
+    // which could block subsequent produces in the meantime.
+    // flush is done in the scheduler thread along with segment flushing below
+    val maybeSnapshot = producerStateManager.takeSnapshot(false)
     updateHighWatermarkWithLogEndOffset()
     // Schedule an asynchronous flush of the old segment
-    scheduler.scheduleOnce("flush-log", () => 
flushUptoOffsetExclusive(newSegment.baseOffset))
+    scheduler.scheduleOnce("flush-log", () => {
+      maybeSnapshot.ifPresent(f => flushProducerStateSnapshot(f.toPath))
+      flushUptoOffsetExclusive(newSegment.baseOffset)
+    })
     newSegment
   }
 
@@ -1742,6 +1748,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     producerStateManager.mapEndOffset
   }
 
+  private[log] def flushProducerStateSnapshot(snapshot: Path): Unit = {
+    maybeHandleIOException(s"Error while deleting producer state snapshot 
$snapshot for $topicPartition in dir ${dir.getParent}") {
+      Utils.flushFileIfExists(snapshot)
+    }
+  }
+
   /**
    * Truncate this log so that it ends with the greatest offset < targetOffset.
    *
diff --git 
a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala 
b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
index 084e46c5ef2..de3283d21fd 100644
--- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
@@ -68,7 +68,7 @@ class OffsetCheckpointFile(val file: File, 
logDirFailureChannel: LogDirFailureCh
   def write(offsets: Map[TopicPartition, Long]): Unit = {
     val list: java.util.List[(TopicPartition, Long)] = new 
java.util.ArrayList[(TopicPartition, Long)](offsets.size)
     offsets.foreach(x => list.add(x))
-    checkpoint.write(list)
+    checkpoint.write(list, true)
   }
 
   def read(): Map[TopicPartition, Long] = {
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 0e2ac1b8f06..de1245f5569 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -184,7 +184,7 @@ public class RemoteLogManagerTest {
         List<EpochEntry> epochs = Collections.emptyList();
 
         @Override
-        public void write(Collection<EpochEntry> epochs) {
+        public void write(Collection<EpochEntry> epochs, boolean ignored) {
             this.epochs = new ArrayList<>(epochs);
         }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 
b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index aacab5b624e..11fff517b43 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -404,7 +404,7 @@ class LogSegmentTest {
     val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
       private var epochs = Seq.empty[EpochEntry]
 
-      override def write(epochs: util.Collection[EpochEntry]): Unit = {
+      override def write(epochs: util.Collection[EpochEntry], ignored: 
Boolean): Unit = {
         this.epochs = epochs.asScala.toSeq
       }
 
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 9f9acaf33c7..1a4ffaa57d0 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -42,8 +42,8 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.EnumSource
 import org.mockito.ArgumentMatchers
-import org.mockito.ArgumentMatchers.anyLong
-import org.mockito.Mockito.{mock, when}
+import org.mockito.ArgumentMatchers.{any, anyLong}
+import org.mockito.Mockito.{doThrow, mock, spy, when}
 
 import java.io._
 import java.nio.ByteBuffer
@@ -3625,7 +3625,7 @@ class UnifiedLogTest {
         val records = TestUtils.singletonRecords(value = s"test$i".getBytes)
         log.appendAsLeader(records, leaderEpoch = 0)
       }
-      
+
       log.updateHighWatermark(90L)
       log.maybeIncrementLogStartOffset(20L, 
LogStartOffsetIncrementReason.SegmentDeletion)
       assertEquals(20, log.logStartOffset)
@@ -3911,6 +3911,24 @@ class UnifiedLogTest {
     log.appendAsLeader(transactionalRecords, leaderEpoch = 0, 
verificationGuard = verificationGuard)
   }
 
+  @Test
+  def testRecoveryPointNotIncrementedOnProducerStateSnapshotFlushFailure(): 
Unit = {
+    val logConfig = LogTestUtils.createLogConfig()
+    val log = spy(createLog(logDir, logConfig))
+
+    doThrow(new KafkaStorageException("Injected 
exception")).when(log).flushProducerStateSnapshot(any())
+
+    log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 
0)
+    try {
+      log.roll(Some(1L))
+    } catch {
+      case _: KafkaStorageException => // ignore
+    }
+
+    // check that the recovery point isn't incremented
+    assertEquals(0L, log.recoveryPoint)
+  }
+
   @Test
   def testDeletableSegmentsFilter(): Unit = {
     val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
diff --git 
a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala
 
b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala
index ddbf58d884e..a7e370d7f40 100644
--- 
a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala
@@ -97,7 +97,7 @@ class OffsetCheckpointFileWithFailureHandlerTest extends 
Logging {
     val logDirFailureChannel = new LogDirFailureChannel(10)
     val checkpointFile = new CheckpointFileWithFailureHandler(file, 
OffsetCheckpointFile.CurrentVersion + 1,
       OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent)
-    checkpointFile.write(Collections.singletonList(new TopicPartition("foo", 
5) -> 10L))
+    checkpointFile.write(Collections.singletonList(new TopicPartition("foo", 
5) -> 10L), true)
     assertThrows(classOf[KafkaStorageException], () => new 
OffsetCheckpointFile(checkpointFile.file, logDirFailureChannel).read())
   }
 
diff --git 
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala 
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
index a2de160a0ed..a47c902024a 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
@@ -38,7 +38,7 @@ class LeaderEpochFileCacheTest {
   val tp = new TopicPartition("TestTopic", 5)
   private val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
     private var epochs: Seq[EpochEntry] = Seq()
-    override def write(epochs: java.util.Collection[EpochEntry]): Unit = 
this.epochs = epochs.asScala.toSeq
+    override def write(epochs: java.util.Collection[EpochEntry], ignored: 
Boolean): Unit = this.epochs = epochs.asScala.toSeq
     override def read(): java.util.List[EpochEntry] = this.epochs.asJava
   }
 
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
index 6efbaa136e0..9c115881328 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
@@ -72,7 +72,7 @@ public class CheckpointFile<T> {
         tempPath = Paths.get(absolutePath + ".tmp");
     }
 
-    public void write(Collection<T> entries) throws IOException {
+    public void write(Collection<T> entries, boolean sync) throws IOException {
         synchronized (lock) {
             // write to temp file and then swap with the existing file
             try (FileOutputStream fileOutputStream = new 
FileOutputStream(tempPath.toFile());
@@ -80,10 +80,12 @@ public class CheckpointFile<T> {
                 CheckpointWriteBuffer<T> checkpointWriteBuffer = new 
CheckpointWriteBuffer<>(writer, version, formatter);
                 checkpointWriteBuffer.write(entries);
                 writer.flush();
-                fileOutputStream.getFD().sync();
+                if (sync) {
+                    fileOutputStream.getFD().sync();
+                }
             }
 
-            Utils.atomicMoveWithFallback(tempPath, absolutePath);
+            Utils.atomicMoveWithFallback(tempPath, absolutePath, sync);
         }
     }
 
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java
index 1eddc0b788b..a08e0f30507 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java
@@ -68,7 +68,7 @@ public class CommittedOffsetsFile {
     }
 
     public synchronized void writeEntries(Map<Integer, Long> committedOffsets) 
throws IOException {
-        checkpointFile.write(committedOffsets.entrySet());
+        checkpointFile.write(committedOffsets.entrySet(), true);
     }
 
     public synchronized Map<Integer, Long> readEntries() throws IOException {
@@ -83,4 +83,4 @@ public class CommittedOffsetsFile {
 
         return partitionToOffsets;
     }
-}
\ No newline at end of file
+}
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java
index f780ced9b04..35abfb5a984 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java
@@ -41,9 +41,9 @@ public class CheckpointFileWithFailureHandler<T> {
         checkpointFile = new CheckpointFile<>(file, version, formatter);
     }
 
-    public void write(Collection<T> entries) {
+    public void write(Collection<T> entries, boolean sync) {
         try {
-            checkpointFile.write(entries);
+            checkpointFile.write(entries, sync);
         } catch (IOException e) {
             String msg = "Error while writing to checkpoint file " + 
file.getAbsolutePath();
             logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e);
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java
index 3ef30b2502a..386712b330f 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java
@@ -42,7 +42,7 @@ import java.util.List;
 public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint {
     private List<EpochEntry> epochs = Collections.emptyList();
 
-    public void write(Collection<EpochEntry> epochs) {
+    public void write(Collection<EpochEntry> epochs, boolean ignored) {
         this.epochs = new ArrayList<>(epochs);
     }
 
@@ -60,4 +60,4 @@ public class InMemoryLeaderEpochCheckpoint implements 
LeaderEpochCheckpoint {
 
         return ByteBuffer.wrap(stream.toByteArray());
     }
-}
\ No newline at end of file
+}
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java
index 8cf55195126..28ffae03df0 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java
@@ -22,8 +22,13 @@ import java.util.Collection;
 import java.util.List;
 
 public interface LeaderEpochCheckpoint {
+    // in file-backed checkpoint implementation, the content should be
+    // synced to the device if `sync` is true
+    void write(Collection<EpochEntry> epochs, boolean sync);
 
-    void write(Collection<EpochEntry> epochs);
+    default void write(Collection<EpochEntry> epochs) {
+        write(epochs, true);
+    }
 
     List<EpochEntry> read();
 }
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java
index 81527c6377a..3472182aeea 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java
@@ -53,7 +53,11 @@ public class LeaderEpochCheckpointFile implements 
LeaderEpochCheckpoint {
     }
 
     public void write(Collection<EpochEntry> epochs) {
-        checkpoint.write(epochs);
+        write(epochs, true);
+    }
+
+    public void write(Collection<EpochEntry> epochs, boolean sync) {
+        checkpoint.write(epochs, sync);
     }
 
     public List<EpochEntry> read() {
@@ -75,4 +79,4 @@ public class LeaderEpochCheckpointFile implements 
LeaderEpochCheckpoint {
             return (strings.length == 2) ? Optional.of(new 
EpochEntry(Integer.parseInt(strings[0]), Long.parseLong(strings[1]))) : 
Optional.empty();
         }
     }
-}
\ No newline at end of file
+}
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
index a2c4ae7e0fc..03df6cc0dce 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
@@ -73,7 +73,7 @@ public class LeaderEpochFileCache {
         EpochEntry entry = new EpochEntry(epoch, startOffset);
         if (assign(entry)) {
             log.debug("Appended new epoch entry {}. Cache now contains {} 
entries.", entry, epochs.size());
-            flush();
+            writeToFile(true);
         }
     }
 
@@ -83,7 +83,7 @@ public class LeaderEpochFileCache {
                 log.debug("Appended new epoch entry {}. Cache now contains {} 
entries.", entry, epochs.size());
             }
         });
-        if (!entries.isEmpty()) flush();
+        if (!entries.isEmpty()) writeToFile(true);
     }
 
     private boolean isUpdateNeeded(EpochEntry entry) {
@@ -152,11 +152,6 @@ public class LeaderEpochFileCache {
         return removedEpochs;
     }
 
-    public LeaderEpochFileCache 
cloneWithLeaderEpochCheckpoint(LeaderEpochCheckpoint leaderEpochCheckpoint) {
-        flushTo(leaderEpochCheckpoint);
-        return new LeaderEpochFileCache(this.topicPartition, 
leaderEpochCheckpoint);
-    }
-
     public boolean nonEmpty() {
         lock.readLock().lock();
         try {
@@ -318,7 +313,14 @@ public class LeaderEpochFileCache {
             if (endOffset >= 0 && epochEntry.isPresent() && 
epochEntry.get().startOffset >= endOffset) {
                 List<EpochEntry> removedEntries = removeFromEnd(x -> 
x.startOffset >= endOffset);
 
-                flush();
+                // We intentionally don't force flushing change to the device 
here because:
+                // - To avoid fsync latency
+                //   * fsync latency could be huge on a disk glitch, which is 
not rare in spinning drives
+                //   * This method is called by ReplicaFetcher threads, which 
could block replica fetching
+                //     then causing ISR shrink or high produce response time 
degradation in remote scope on high fsync latency.
+                // - Even when stale epochs remained in LeaderEpoch file due 
to the unclean shutdown, it will be handled by
+                //   another truncateFromEnd call on log loading procedure so 
it won't be a problem
+                writeToFile(false);
 
                 log.debug("Cleared entries {} from epoch cache after 
truncating to end offset {}, leaving {} entries in the cache.", removedEntries, 
endOffset, epochs.size());
             }
@@ -345,7 +347,14 @@ public class LeaderEpochFileCache {
                 EpochEntry updatedFirstEntry = new 
EpochEntry(firstBeforeStartOffset.epoch, startOffset);
                 epochs.put(updatedFirstEntry.epoch, updatedFirstEntry);
 
-                flush();
+                // We intentionally don't force flushing change to the device 
here because:
+                // - To avoid fsync latency
+                //   * fsync latency could be huge on a disk glitch, which is 
not rare in spinning drives
+                //   * This method is called as part of deleteRecords with 
holding UnifiedLog#lock.
+                //      - Meanwhile all produces against the partition will be 
blocked, which causes req-handlers to exhaust
+                // - Even when stale epochs remained in LeaderEpoch file due 
to the unclean shutdown, it will be recovered by
+                //   another truncateFromStart call on log loading procedure 
so it won't be a problem
+                writeToFile(false);
 
                 log.debug("Cleared entries {} and rewrote first entry {} after 
truncating to start offset {}, leaving {} in the cache.", removedEntries, 
updatedFirstEntry, startOffset, epochs.size());
             }
@@ -394,7 +403,7 @@ public class LeaderEpochFileCache {
         lock.writeLock().lock();
         try {
             epochs.clear();
-            flush();
+            writeToFile(true);
         } finally {
             lock.writeLock().unlock();
         }
@@ -431,16 +440,12 @@ public class LeaderEpochFileCache {
         }
     }
 
-    private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint) {
+    private void writeToFile(boolean sync) {
         lock.readLock().lock();
         try {
-            leaderEpochCheckpoint.write(epochs.values());
+            checkpoint.write(epochs.values(), sync);
         } finally {
             lock.readLock().unlock();
         }
     }
-
-    private void flush() {
-        flushTo(this.checkpoint);
-    }
 }
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
index d3e48ef7057..6bcafd2d607 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
@@ -462,14 +462,21 @@ public class ProducerStateManager {
     }
 
     /**
-     * Take a snapshot at the current end offset if one does not already exist.
+     * Take a snapshot at the current end offset if one does not already exist 
with syncing the change to the device
      */
     public void takeSnapshot() throws IOException {
+        takeSnapshot(true);
+    }
+
+    /**
+     * Take a snapshot at the current end offset if one does not already 
exist, then return the snapshot file if taken.
+     */
+    public Optional<File> takeSnapshot(boolean sync) throws IOException {
         // If not a new offset, then it is not worth taking another snapshot
         if (lastMapOffset > lastSnapOffset) {
             SnapshotFile snapshotFile = new 
SnapshotFile(LogFileUtils.producerSnapshotFile(logDir, lastMapOffset));
             long start = time.hiResClockMs();
-            writeSnapshot(snapshotFile.file(), producers);
+            writeSnapshot(snapshotFile.file(), producers, sync);
             log.info("Wrote producer snapshot at offset {} with {} producer 
ids in {} ms.", lastMapOffset,
                     producers.size(), time.hiResClockMs() - start);
 
@@ -477,7 +484,10 @@ public class ProducerStateManager {
 
             // Update the last snap offset according to the serialized map
             lastSnapOffset = lastMapOffset;
+
+            return Optional.of(snapshotFile.file());
         }
+        return Optional.empty();
     }
 
     /**
@@ -635,7 +645,7 @@ public class ProducerStateManager {
             // deletion, so ignoring the exception here just means that the 
intended operation was
             // already completed.
             try {
-                snapshotFile.renameTo(LogFileUtils.DELETED_FILE_SUFFIX);
+                snapshotFile.renameToDelete();
                 return Optional.of(snapshotFile);
             } catch (NoSuchFileException ex) {
                 log.info("Failed to rename producer state snapshot {} with 
deletion suffix because it was already deleted", 
snapshotFile.file().getAbsoluteFile());
@@ -684,7 +694,7 @@ public class ProducerStateManager {
         }
     }
 
-    private static void writeSnapshot(File file, Map<Long, ProducerStateEntry> 
entries) throws IOException {
+    private static void writeSnapshot(File file, Map<Long, ProducerStateEntry> 
entries, boolean sync) throws IOException {
         Struct struct = new Struct(PID_SNAPSHOT_MAP_SCHEMA);
         struct.set(VERSION_FIELD, PRODUCER_SNAPSHOT_VERSION);
         struct.set(CRC_FIELD, 0L); // we'll fill this after writing the entries
@@ -716,7 +726,9 @@ public class ProducerStateManager {
 
         try (FileChannel fileChannel = FileChannel.open(file.toPath(), 
StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
             fileChannel.write(buffer);
-            fileChannel.force(true);
+            if (sync) {
+                fileChannel.force(true);
+            }
         }
     }
 
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java
index be496ab2998..61fdae79950 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java
@@ -60,10 +60,10 @@ public class SnapshotFile {
         return file;
     }
 
-    public void renameTo(String newSuffix) throws IOException {
-        File renamed = new File(Utils.replaceSuffix(file.getPath(), "", 
newSuffix));
+    public void renameToDelete() throws IOException {
+        File renamed = new File(Utils.replaceSuffix(file.getPath(), "", 
LogFileUtils.DELETED_FILE_SUFFIX));
         try {
-            Utils.atomicMoveWithFallback(file.toPath(), renamed.toPath());
+            Utils.atomicMoveWithFallback(file.toPath(), renamed.toPath(), 
false);
         } finally {
             file = renamed;
         }
@@ -76,4 +76,4 @@ public class SnapshotFile {
                 ", file=" + file +
                 ')';
     }
-}
\ No newline at end of file
+}

Reply via email to