This is an automated email from the ASF dual-hosted git repository.

kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new da61b19aae8 KAFKA-19981: Handle retriable remote storage exception in RemoteLogManager (#21150)
da61b19aae8 is described below

commit da61b19aae888005f113a943486c5e22383fcc84
Author: Lan Ding <[email protected]>
AuthorDate: Thu Dec 18 12:19:43 2025 +0800

    KAFKA-19981: Handle retriable remote storage exception in RemoteLogManager (#21150)
    
    This PR distinguishes between `RemoteStorageException` and
    `RetriableRemoteStorageException` in `RemoteLogManager` to handle
    temporary storage degradations gracefully:

    1. Copy path: Avoids deleting partially uploaded segments when
    `RetriableRemoteStorageException` is thrown;
    2. Delete path: Skips incrementing the `failedRemoteDeleteRequestRate`
    metric for retriable exceptions;
    3. Documentation: Updates the `RemoteStorageManager` Javadoc to clarify
    exception usage;
    4. Testing: Adds unit tests for retriable scenarios.
    
    Reviewers: Kamal Chandraprakash <[email protected]>, Luke Chen <[email protected]>
---
 .../log/remote/storage/RemoteStorageManager.java   |  12 +-
 .../log/remote/storage/RemoteLogManager.java       |  13 +-
 .../log/remote/storage/RemoteLogManagerTest.java   | 143 ++++++++++++++++++++-
 3 files changed, 160 insertions(+), 8 deletions(-)

diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
index 3fd6a633b7d..e17e3af596d 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
@@ -43,6 +43,11 @@ import java.util.Optional;
  * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the manager to register metrics.
  * The following tags are automatically added to all metrics registered: <code>config</code> set to
  * <code>remote.log.storage.manager.class.name</code>, and <code>class</code> set to the RemoteStorageManager class name.
+ * <p>
+ * Plugin implementors of {@link RemoteStorageManager} should throw {@link RetriableRemoteStorageException}
+ * for transient errors that can be recovered by retrying. For non-recoverable errors,
+ * {@link RemoteStorageException} should be thrown. This distinction allows RemoteLogManager to
+ * handle retries gracefully and report metrics accurately.
  */
 public interface RemoteStorageManager extends Configurable, Closeable {
 
@@ -90,11 +95,11 @@ public interface RemoteStorageManager extends Configurable, Closeable {
      * @param remoteLogSegmentMetadata metadata about the remote log segment.
      * @param logSegmentData           data to be copied to tiered storage.
      * @return custom metadata to be added to the segment metadata after copying.
-     * @throws RemoteStorageException if there are any errors in storing the data of the segment.
+     * @throws RemoteStorageException          if there are any errors in storing the data of the segment.
+     * @throws RetriableRemoteStorageException if the error is transient and the operation can be retried.
      */
     Optional<CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
-                                                LogSegmentData logSegmentData)
-            throws RemoteStorageException;
+                                                LogSegmentData logSegmentData) throws RemoteStorageException;
 
     /**
      * Returns the remote log segment data file/object as InputStream for the given {@link RemoteLogSegmentMetadata}
@@ -150,6 +155,7 @@ public interface RemoteStorageManager extends Configurable, Closeable {
      *
      * @param remoteLogSegmentMetadata metadata about the remote log segment to be deleted.
      * @throws RemoteStorageException          if there are any storage related errors occurred.
+     * @throws RetriableRemoteStorageException if the error is transient and the operation can be retried.
      */
     void deleteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
 }
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index a9848633eff..604a43925bf 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -836,7 +836,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
                 if (!isCancelled()) {
                     logger.warn("Current thread for partition {} is interrupted", topicIdPartition, ex);
                 }
-            } catch (RetriableException ex) {
+            } catch (RetriableException | RetriableRemoteStorageException ex) {
                 logger.debug("Encountered a retryable error while executing current task for partition {}", topicIdPartition, ex);
             } catch (Exception ex) {
                 if (!isCancelled()) {
@@ -869,7 +869,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
         }
 
         @Override
-        protected void execute(UnifiedLog log) throws InterruptedException {
+        protected void execute(UnifiedLog log) throws InterruptedException, RetriableRemoteStorageException {
             // In the first run after completing altering logDir within broker, we should make sure the state is reset. (KAFKA-16711)
             if (!log.parentDir().equals(logDirectory.orElse(null))) {
                 copiedOffsetOption = Optional.empty();
@@ -928,7 +928,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
             return candidateLogSegments;
         }
 
-        public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException {
+        public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException, RetriableRemoteStorageException {
             if (isCancelled())
                 return;
 
@@ -1001,7 +1001,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
                 brokerTopicStats.topicStats(log.topicPartition().topic()).failedRemoteCopyRequestRate().mark();
                 brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark();
                 this.cancel();
-            } catch (InterruptedException | RetriableException ex) {
+            } catch (InterruptedException | RetriableException | RetriableRemoteStorageException ex) {
                 throw ex;
             } catch (Exception ex) {
                 if (!isCancelled()) {
@@ -1044,6 +1044,9 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
 
             try {
                 customMetadata = remoteStorageManagerPlugin.get().copyLogSegmentData(copySegmentStartedRlsm, segmentData);
+            } catch (RetriableRemoteStorageException e) {
+                logger.info("Copy failed with retriable error for segment {}", copySegmentStartedRlsm.remoteLogSegmentId());
+                throw e;
             } catch (RemoteStorageException e) {
                 logger.info("Copy failed, cleaning segment {}", copySegmentStartedRlsm.remoteLogSegmentId());
                 try {
@@ -1513,6 +1516,8 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
             // Delete the segment in remote storage.
             try {
                 remoteStorageManagerPlugin.get().deleteLogSegmentData(segmentMetadata);
+            } catch (RetriableRemoteStorageException e) {
+                throw e;
             } catch (RemoteStorageException e) {
                 brokerTopicStats.topicStats(topic).failedRemoteDeleteRequestRate().mark();
                 brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().mark();
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
index 5b0676088e1..bcb0b91b0f8 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
@@ -774,6 +774,92 @@ public class RemoteLogManagerTest {
         assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyLagSegmentsAggrMetric().value());
     }
 
+    @Test
+    void testFailedCopyWithRetriableExceptionShouldNotDeleteTheDanglingSegment() throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+        long lastStableOffset = 150L;
+        long logEndOffset = 150L;
+
+        when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(12L);
+        when(mockLog.onlyLocalLogSegmentsCount()).thenReturn(2L);
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(cache);
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(-1L));
+
+        File tempFile = TestUtils.tempFile();
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        File tempDir = TestUtils.tempDirectory();
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+        when(activeSegment.size()).thenReturn(2);
+        verify(oldSegment, times(0)).readNextOffset();
+        verify(activeSegment, times(0)).readNextOffset();
+
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(List.of(oldSegment, activeSegment));
+
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+        when(mockLog.lastStableOffset()).thenReturn(lastStableOffset);
+        when(mockLog.logEndOffset()).thenReturn(logEndOffset);
+
+        OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+        TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get();
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
+        when(oldSegment.timeIndex()).thenReturn(timeIdx);
+        when(oldSegment.offsetIndex()).thenReturn(idx);
+        when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);
+        when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+        when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime);
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+
+        // throw retriable exception when copyLogSegmentData
+        when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)))
+            .thenThrow(new RetriableRemoteStorageException("test-retriable"));
+        RemoteLogManager.RLMCopyTask task = remoteLogManager.new RLMCopyTask(leaderTopicIdPartition, 128);
+        assertThrows(RetriableRemoteStorageException.class, () -> task.copyLogSegmentsToRemote(mockLog));
+
+        ArgumentCaptor<RemoteLogSegmentMetadata> remoteLogSegmentMetadataArg = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
+        verify(remoteLogMetadataManager).addRemoteLogSegmentMetadata(remoteLogSegmentMetadataArg.capture());
+        // verify the segment is not deleted for retriable exception
+        verify(remoteStorageManager, never()).deleteLogSegmentData(eq(remoteLogSegmentMetadataArg.getValue()));
+        verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+
+        // Verify the metrics
+        // Retriable exceptions should not count as failures for copy
+        assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
+        assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyBytesRate().count());
+        assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
+        // Verify aggregate metrics
+        assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
+        assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
+        assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
+        assertEquals(10, brokerTopicStats.allTopicsStats().remoteCopyLagBytesAggrMetric().value());
+        assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyLagSegmentsAggrMetric().value());
+    }
+
     @Test
     void testRemoteLogManagerTasksAvgIdlePercentAndMetadataCountMetrics() throws Exception {
         long oldSegmentStartOffset = 0L;
@@ -2401,7 +2487,7 @@ public class RemoteLogManagerTest {
         Thread copyThread  = new Thread(() -> {
             try {
                 copyTask.copyLogSegmentsToRemote(mockLog);
-            } catch (InterruptedException e) {
+            } catch (InterruptedException | RetriableRemoteStorageException e) {
                 throw new RuntimeException(e);
             }
         });
@@ -2840,6 +2926,61 @@ public class RemoteLogManagerTest {
         verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
     }
 
+    @ParameterizedTest(name = "testDeleteSegmentFailureWithRetriableExceptionShouldNotUpdateMetrics retentionSize={0} retentionMs={1}")
+    @CsvSource(value = {"0, -1", "-1, 0"})
+    public void testDeleteSegmentFailureWithRetriableExceptionShouldNotUpdateMetrics(long retentionSize,
+                                                                                     long retentionMs) throws RemoteStorageException, ExecutionException, InterruptedException {
+        Map<String, Long> logProps = new HashMap<>();
+        logProps.put("retention.bytes", retentionSize);
+        logProps.put("retention.ms", retentionMs);
+        LogConfig mockLogConfig = new LogConfig(logProps);
+        when(mockLog.config()).thenReturn(mockLogConfig);
+
+        List<EpochEntry> epochEntries = List.of(epochEntry0);
+        checkpoint.write(epochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(cache);
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+        when(mockLog.logEndOffset()).thenReturn(200L);
+
+        List<RemoteLogSegmentMetadata> metadataList =
+            listRemoteLogSegmentMetadata(leaderTopicIdPartition, 1, 100, 1024, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+            .thenReturn(metadataList.iterator());
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+            .thenAnswer(ans -> metadataList.iterator());
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+            .thenReturn(CompletableFuture.runAsync(() -> { }));
+
+        // Verify the metrics for remote deletes and for failures is zero before attempt to delete segments
+        assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteDeleteRequestRate().count());
+        assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteDeleteRequestRate().count());
+        // Verify aggregate metrics
+        assertEquals(0, brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
+        assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
+
+        RemoteLogManager.RLMExpirationTask task = remoteLogManager.new RLMExpirationTask(leaderTopicIdPartition);
+        doThrow(new RetriableRemoteStorageException("Failed to delete segment with retriable exception")).when(remoteStorageManager).deleteLogSegmentData(any());
+        assertThrows(RetriableRemoteStorageException.class, task::cleanupExpiredRemoteLogSegments);
+
+        assertEquals(100L, currentLogStartOffset.get());
+        verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
+
+        // Verify the metric for remote delete is updated correctly
+        assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteDeleteRequestRate().count());
+        // Verify we did not report failure for remote deletes with retriable exception
+        assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteDeleteRequestRate().count());
+        // Verify aggregate metrics
+        assertEquals(1, brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
+        assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
+
+        // make sure we'll retry the deletion in next run
+        doNothing().when(remoteStorageManager).deleteLogSegmentData(any());
+        task.cleanupExpiredRemoteLogSegments();
+        verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
+    }
+
     @ParameterizedTest(name = "testDeleteLogSegmentDueToRetentionSizeBreach segmentCount={0} deletableSegmentCount={1}")
     @CsvSource(value = {"50, 0", "50, 1", "50, 23", "50, 50"})
     public void testDeleteLogSegmentDueToRetentionSizeBreach(int segmentCount,
