This is an automated email from the ASF dual-hosted git repository.

dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 50500ed  Change retry counts limit of 3 to unlimited for async blob store operations (#1592)
50500ed is described below

commit 50500edfbe6aa74eaa445ccc0c26ee6bc54c2e66
Author: shekhars-li <[email protected]>
AuthorDate: Fri Mar 25 15:43:48 2022 -0700

    Change retry counts limit of 3 to unlimited for async blob store operations (#1592)
    
    * Change async ops retry counts limit of 3 to unlimited
    
    * Fix typo
    
    * Simplified unit test
    
    * Fix typo
    
    Co-authored-by: Shekhar Sharma <[email protected]>
---
 .../java/org/apache/samza/util/FutureUtil.java     |  1 +
 .../storage/blobstore/util/TestBlobStoreUtil.java  | 28 ++++++++++++++++++++++
 2 files changed, 29 insertions(+)

diff --git a/samza-core/src/main/java/org/apache/samza/util/FutureUtil.java b/samza-core/src/main/java/org/apache/samza/util/FutureUtil.java
index 16e38bc..f2da896 100644
--- a/samza-core/src/main/java/org/apache/samza/util/FutureUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/FutureUtil.java
@@ -149,6 +149,7 @@ public class FutureUtil {
     Duration maxDuration = Duration.ofMinutes(10);
 
     RetryPolicy<Object> retryPolicy = new RetryPolicy<>()
+        .withMaxRetries(-1) // Sets maximum retry to unlimited from default of 3 attempts. Retries are now limited by max duration and not retry counts.
         .withBackoff(100, 312500, ChronoUnit.MILLIS, 5) // 100 ms, 500 ms, 2500 ms, 12.5 s, 1.05 min, 5.20 min, 5.20 min
         .withMaxDuration(maxDuration)
         .abortOn(abortRetries) // stop retrying if predicate returns true
diff --git a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
index a85adef..40c8d8b 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
@@ -960,6 +960,34 @@ public class TestBlobStoreUtil {
     verify(mockBlobStoreUtil, times(storesToBackupOrRestore.size())).getSnapshotIndex(anyString(), any(Metadata.class));
   }
 
+  /**
+   * This test verifies that a retriable exception is retried more than 3 times (default retry is limited to 3 attempts)
+   */
+  @Test
+  public void testPutFileRetriedMorethanThreeTimes() throws Exception {
+    SnapshotMetadata snapshotMetadata = new SnapshotMetadata(checkpointId, jobName, jobId, taskName, storeName);
+    Path path = Files.createTempFile("samza-testPutFileChecksum-", ".tmp");
+    FileUtil fileUtil = new FileUtil();
+    fileUtil.writeToTextFile(path.toFile(), RandomStringUtils.random(1000), false);
+
+    BlobStoreManager blobStoreManager = mock(BlobStoreManager.class);
+    ArgumentCaptor<Metadata> argumentCaptor = ArgumentCaptor.forClass(Metadata.class);
+    when(blobStoreManager.put(any(InputStream.class), argumentCaptor.capture()))
+        .thenAnswer((Answer<CompletionStage<String>>) invocationOnMock -> { // first try, retriable error
+          return FutureUtil.failedFuture(new RetriableException()); // retriable error
+        }).thenAnswer((Answer<CompletionStage<String>>) invocationOnMock -> { // second try, retriable error
+          return FutureUtil.failedFuture(new RetriableException()); // retriable error
+        }).thenAnswer((Answer<CompletionStage<String>>) invocationOnMock -> { // third try, retriable error
+          return FutureUtil.failedFuture(new RetriableException()); // retriable error
+        }).thenAnswer((Answer<CompletionStage<String>>) invocation -> CompletableFuture.completedFuture("blobId"));
+
+    BlobStoreUtil blobStoreUtil = new BlobStoreUtil(blobStoreManager, EXECUTOR, null, null);
+
+    blobStoreUtil.putFile(path.toFile(), snapshotMetadata).join();
+    // Verify put operation is retried 4 times
+    verify(blobStoreManager, times(4)).put(any(InputStream.class), any(Metadata.class));
+  }
+
   private CheckpointV2 createCheckpointV2(String stateBackendFactory, Map<String, String> storeSnapshotIndexBlobIds) {
     CheckpointId checkpointId = CheckpointId.create();
     Map<String, Map<String, String>> factoryStoreSCMs = new HashMap<>();

Reply via email to