This is an automated email from the ASF dual-hosted git repository.
dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 50500ed Change retry counts limit of 3 to unlimited for async blob
store operations (#1592)
50500ed is described below
commit 50500edfbe6aa74eaa445ccc0c26ee6bc54c2e66
Author: shekhars-li <[email protected]>
AuthorDate: Fri Mar 25 15:43:48 2022 -0700
Change retry counts limit of 3 to unlimited for async blob store operations
(#1592)
* Change async ops retry counts limit of 3 to unlimited
* Fix typo
* Simplified unit test
* Fix typo
Co-authored-by: Shekhar Sharma <[email protected]>
---
.../java/org/apache/samza/util/FutureUtil.java | 1 +
.../storage/blobstore/util/TestBlobStoreUtil.java | 28 ++++++++++++++++++++++
2 files changed, 29 insertions(+)
diff --git a/samza-core/src/main/java/org/apache/samza/util/FutureUtil.java b/samza-core/src/main/java/org/apache/samza/util/FutureUtil.java
index 16e38bc..f2da896 100644
--- a/samza-core/src/main/java/org/apache/samza/util/FutureUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/FutureUtil.java
@@ -149,6 +149,7 @@ public class FutureUtil {
Duration maxDuration = Duration.ofMinutes(10);
RetryPolicy<Object> retryPolicy = new RetryPolicy<>()
+ .withMaxRetries(-1) // Sets maximum retry to unlimited from default of 3 attempts. Retries are now limited by max duration and not retry counts.
.withBackoff(100, 312500, ChronoUnit.MILLIS, 5) // 100 ms, 500 ms, 2500 ms, 12.5 s, 1.05 min, 5.20 min, 5.20 min
.withMaxDuration(maxDuration)
.abortOn(abortRetries) // stop retrying if predicate returns true
diff --git a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
index a85adef..40c8d8b 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
@@ -960,6 +960,34 @@ public class TestBlobStoreUtil {
verify(mockBlobStoreUtil,
times(storesToBackupOrRestore.size())).getSnapshotIndex(anyString(),
any(Metadata.class));
}
+ /**
+ * This test verifies that a retriable exception is retried more than 3 times (default retry is limited to 3 attempts)
+ */
+ @Test
+ public void testPutFileRetriedMorethanThreeTimes() throws Exception {
+ SnapshotMetadata snapshotMetadata = new SnapshotMetadata(checkpointId,
jobName, jobId, taskName, storeName);
+ Path path = Files.createTempFile("samza-testPutFileChecksum-", ".tmp");
+ FileUtil fileUtil = new FileUtil();
+ fileUtil.writeToTextFile(path.toFile(), RandomStringUtils.random(1000),
false);
+
+ BlobStoreManager blobStoreManager = mock(BlobStoreManager.class);
+ ArgumentCaptor<Metadata> argumentCaptor =
ArgumentCaptor.forClass(Metadata.class);
+ when(blobStoreManager.put(any(InputStream.class), argumentCaptor.capture()))
+ .thenAnswer((Answer<CompletionStage<String>>) invocationOnMock -> { // first try, retriable error
+ return FutureUtil.failedFuture(new RetriableException()); // retriable error
+ }).thenAnswer((Answer<CompletionStage<String>>) invocationOnMock -> { // second try, retriable error
+ return FutureUtil.failedFuture(new RetriableException()); // retriable error
+ }).thenAnswer((Answer<CompletionStage<String>>) invocationOnMock -> { // third try, retriable error
+ return FutureUtil.failedFuture(new RetriableException()); // retriable error
+ }).thenAnswer((Answer<CompletionStage<String>>) invocation -> CompletableFuture.completedFuture("blobId"));
+
+ BlobStoreUtil blobStoreUtil = new BlobStoreUtil(blobStoreManager,
EXECUTOR, null, null);
+
+ blobStoreUtil.putFile(path.toFile(), snapshotMetadata).join();
+ // Verify put operation is retried 4 times
+ verify(blobStoreManager, times(4)).put(any(InputStream.class),
any(Metadata.class));
+ }
+
private CheckpointV2 createCheckpointV2(String stateBackendFactory,
Map<String, String> storeSnapshotIndexBlobIds) {
CheckpointId checkpointId = CheckpointId.create();
Map<String, Map<String, String>> factoryStoreSCMs = new HashMap<>();