junrao commented on code in PR #14242:
URL: https://github.com/apache/kafka/pull/14242#discussion_r1364689270


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1617,10 +1617,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     // may actually be ahead of the current producer state end offset (which corresponds to the log end offset),
     // we manually override the state offset here prior to taking the snapshot.
     producerStateManager.updateMapEndOffset(newSegment.baseOffset)
-    producerStateManager.takeSnapshot()
+    // We avoid potentially-costly fsync call, since we acquire UnifiedLog#lock here
+    // which could block subsequent produces in the meantime.
+    // flush is done in the scheduler thread along with segment flushing below
+    val maybeSnapshot = producerStateManager.takeSnapshot(false)
     updateHighWatermarkWithLogEndOffset()
     // Schedule an asynchronous flush of the old segment
-    scheduler.scheduleOnce("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset))
+    scheduler.scheduleOnce("flush-log", () => {
+      maybeSnapshot.ifPresent(f => Utils.flushFileQuietly(f.toPath, "producer-snapshot"))

Review Comment:
   If we fail to flush the snapshot, it seems that we should propagate the 
IOException to logDirFailureChannel, as flushUptoOffsetExclusive does. 
Otherwise, we could skip the recovery of producer state when it is actually 
needed. 
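
To illustrate the suggestion, here is a minimal Java sketch of reporting a flush failure instead of swallowing it. `FailureSink`, `IoAction`, and `flushOrReport` are hypothetical names invented for this example; they only stand in for the role that logDirFailureChannel plays and are not Kafka's API.

```java
import java.io.IOException;

// Hypothetical sketch: none of these names exist in Kafka.
final class FlushFailureSketch {
    /** Stand-in for a channel that marks a log directory as failed. */
    interface FailureSink {
        void report(String logDir, String message, IOException cause);
    }

    /** An I/O action, such as an fsync, that may fail. */
    interface IoAction {
        void run() throws IOException;
    }

    /**
     * Runs the flush. On IOException the error is handed to the sink
     * rather than being silently dropped. Returns true on success.
     */
    static boolean flushOrReport(String logDir, IoAction flush, FailureSink sink) {
        try {
            flush.run();
            return true;
        } catch (IOException e) {
            sink.report(logDir, "Error while flushing producer snapshot", e);
            return false;
        }
    }
}
```

The point of the shape is that the scheduled task can no longer complete "successfully" after a failed fsync without some component being told about it.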



##########
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##########
@@ -308,7 +308,14 @@ public void truncateFromEnd(long endOffset) {
             if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) {
                 List<EpochEntry> removedEntries = removeFromEnd(x -> x.startOffset >= endOffset);
 
-                flush();
+                // We intentionally don't force flushing change to the device here because:
+                // - To avoid fsync latency
+                //   * fsync latency could be huge on a disk glitch, which is not rare in spinning drives
+                //   * This method is called by ReplicaFetcher threads, which could block replica fetching
+                //     then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency.
+                // - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be handled by
+                //   another truncateFromEnd call on log loading procedure so it won't be a problem
+                flush(false);

Review Comment:
   It's kind of weird to call `flush` with sync = false since the only thing 
that `flush` does is to sync. Could we just avoid calling `flush`?



##########
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##########
@@ -152,7 +152,7 @@ private List<EpochEntry> removeWhileMatching(Iterator<Map.Entry<Integer, EpochEn
     }
 
     public LeaderEpochFileCache cloneWithLeaderEpochCheckpoint(LeaderEpochCheckpoint leaderEpochCheckpoint) {
-        flushTo(leaderEpochCheckpoint);
+        flushTo(leaderEpochCheckpoint, true);

Review Comment:
   `cloneWithLeaderEpochCheckpoint` seems to be no longer used. Could we just 
remove it?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##########
@@ -462,22 +462,32 @@ public Optional<ProducerStateEntry> lastEntry(long producerId) {
     }
 
     /**
-     * Take a snapshot at the current end offset if one does not already exist.
+     * Take a snapshot at the current end offset if one does not already exist with syncing the change to the device
      */
     public void takeSnapshot() throws IOException {
+        takeSnapshot(true);
+    }
+
+    /**
+     * Take a snapshot at the current end offset if one does not already exist, then return the snapshot file if taken.
+     */
+    public Optional<File> takeSnapshot(boolean sync) throws IOException {

Review Comment:
   `ProducerStateManager.truncateFullyAndReloadSnapshots` removes all snapshot 
files and then calls `loadSnapshots()`, which should return empty. I am 
wondering what happens if we have a pending async snapshot flush and the flush 
is called after the underlying file is deleted because of 
`ProducerStateManager.truncateFullyAndReloadSnapshots`. Will that cause the 
file to be recreated or will it get an `IOException`? The former will be bad 
since the content won't be correct. For the latter, it would be useful to 
distinguish that from a real disk IO error to avoid unnecessarily crashing the 
broker.
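
As a point of reference for the question above, here is a minimal Java sketch of a flush that tolerates a missing file. It assumes the flush opens the path afresh at flush time; whether `Utils.flushFileQuietly` does so is not stated in this thread. `SnapshotFlushSketch` and `flushIfExists` are hypothetical names. In Java NIO, `FileChannel.open` without `StandardOpenOption.CREATE` does not recreate a deleted file; it throws `NoSuchFileException`, a subclass of `IOException`, which can be caught separately from other I/O errors.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper for illustration; not part of this PR.
final class SnapshotFlushSketch {
    /**
     * Fsyncs the file at the given path if it still exists.
     * Returns true if the file was flushed, false if it was already deleted.
     * Any other IOException is treated as a real I/O error and propagates.
     */
    static boolean flushIfExists(Path path) throws IOException {
        // WRITE without CREATE: a missing file is never recreated.
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.WRITE)) {
            channel.force(true); // sync both content and metadata
            return true;
        } catch (NoSuchFileException e) {
            // The file was removed before the deferred flush ran.
            return false;
        }
    }
}
```

With this shape, a caller could ignore the "already deleted" case and reserve the log directory failure path for the exceptions that still propagate.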



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
