This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9b56cdd  CASSANDRASC-105 RestoreSliceTask could be stuck due to missing exception handling (#107)
9b56cdd is described below

commit 9b56cdd11a25550f0628e99ba35650c44663eb19
Author: Saranya Krishnakumar <sarany...@apple.com>
AuthorDate: Mon Mar 18 16:49:57 2024 -0700

    CASSANDRASC-105 RestoreSliceTask could be stuck due to missing exception handling (#107)
    
    patch by Saranya Krishnakumar; reviewed by Francisco Guerrero, Yifan Cai for CASSANDRASC-105
---
 CHANGES.txt                                        |   1 +
 .../sidecar/restore/RestoreSliceTask.java          |  58 ++++++-----
 .../sidecar/utils/AsyncFileDigestVerifier.java     |   4 +-
 .../sidecar/restore/RestoreSliceTaskTest.java      | 112 ++++++++++++++++++++-
 4 files changed, 145 insertions(+), 30 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 0fdc914..e625cde 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * RestoreSliceTask could be stuck due to missing exception handling (CASSANDRASC-105)
  * Make hash algorithm implementation pluggable (CASSANDRASC-114)
  * Fix ClosedChannelException when downloading from S3 (CASSANDRASC-112)
  * Fix NPE thrown when getting StorageClient from pool (CASSANDRASC-110)
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java 
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
index ecd0734..4e942c7 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
@@ -107,7 +107,7 @@ public class RestoreSliceTask implements 
Handler<Promise<RestoreSlice>>
                                 slice.compressedSize() + 
slice.uncompressedSize(),
                                 requiredUsableSpacePercentage,
                                 executorPool)
-        .onSuccess(ignored -> {
+        .compose(v -> {
             RestoreJob job = slice.job();
             if (job.isManagedBySidecar())
             {
@@ -120,55 +120,63 @@ public class RestoreSliceTask implements 
Handler<Promise<RestoreSlice>>
                         slice.completeStagePhase(); // update the flag if 
missed
                         sliceDatabaseAccessor.updateStatus(slice);
                         event.tryComplete(slice);
-                        return;
+                        return Future.succeededFuture();
                     }
 
                     // 1. check object existence and validate eTag / checksum
-                    checkObjectExistence(event)
+                    CompletableFuture<Void> fut = checkObjectExistence(event)
                     // 2. download slice/object when the remote object exists
                     .thenCompose(headObject -> downloadSlice(event))
                     // 3. persist status
-                    .thenAccept(x -> {
+                    .thenAccept(file -> {
                         slice.completeStagePhase();
                         sliceDatabaseAccessor.updateStatus(slice);
                         // completed staging. A new task is produced when it 
comes to import
                         event.tryComplete(slice);
+                    })
+                    .whenComplete((x, cause) -> {
+                        if (cause != null)
+                        {
+                            // handle unexpected errors thrown during download 
slice call, that do not close event
+                            
event.tryFail(RestoreJobExceptions.ofSlice(cause.getMessage(), slice, cause));
+                        }
                     });
+
+                    return Future.fromCompletionStage(fut);
                 }
                 else if (job.status == RestoreJobStatus.STAGED)
                 {
-                    unzipAndImport(event, slice.stagedObjectPath().toFile(),
-                                   // persist status
-                                   () -> 
sliceDatabaseAccessor.updateStatus(slice));
+                    return unzipAndImport(event, 
slice.stagedObjectPath().toFile(),
+                                          // persist status
+                                          () -> 
sliceDatabaseAccessor.updateStatus(slice));
                 }
                 else
                 {
                     String msg = "Unexpected restore job status. Expected only 
CREATED or STAGED when " +
                                  "processing active slices. Found status: " + 
job.status;
                     Exception unexpectedState = new IllegalStateException(msg);
-                    
event.tryFail(RestoreJobExceptions.ofFatalSlice("Unexpected restore job status",
-                                                                    slice, 
unexpectedState));
+                    return 
Future.failedFuture(RestoreJobExceptions.ofFatalSlice("Unexpected restore job 
status",
+                                                                               
  slice, unexpectedState));
                 }
             }
             else
             {
-                downloadSliceAndImport(event);
+                return downloadSliceAndImport(event);
             }
         })
-        .onFailure(cause -> {
-            String msg = "Unable to ensure enough space for the slice. Retry 
later";
-            event.tryFail(RestoreJobExceptions.ofSlice(msg, slice, cause));
-        });
+        .onSuccess(v -> event.tryComplete(slice))
+        .onFailure(cause -> 
event.tryFail(RestoreJobExceptions.ofSlice(cause.getMessage(), slice, cause)));
     }
 
-    private void downloadSliceAndImport(Promise<RestoreSlice> event)
+    private Future<Void> downloadSliceAndImport(Promise<RestoreSlice> event)
     {
         // 1. check object existence and validate eTag / checksum
-        checkObjectExistence(event)
+        CompletableFuture<File> fut = checkObjectExistence(event)
         // 2. download slice/object when the remote object exists
-        .thenCompose(headObject -> downloadSlice(event))
+        .thenCompose(headObject -> downloadSlice(event));
         // 3. unzip the file and import/commit
-        .thenAccept(file -> unzipAndImport(event, file));
+        return Future.fromCompletionStage(fut)
+                     .compose(file -> unzipAndImport(event, file));
     }
 
     private CompletableFuture<?> checkObjectExistence(Promise<RestoreSlice> 
event)
@@ -281,21 +289,21 @@ public class RestoreSliceTask implements 
Handler<Promise<RestoreSlice>>
     }
 
     @VisibleForTesting
-    void unzipAndImport(Promise<RestoreSlice> event, File file)
+    Future<Void> unzipAndImport(Promise<RestoreSlice> event, File file)
     {
-        unzipAndImport(event, file, null);
+        return unzipAndImport(event, file, null);
     }
 
-    void unzipAndImport(Promise<RestoreSlice> event, File file, Runnable 
onSuccessCommit)
+    Future<Void> unzipAndImport(Promise<RestoreSlice> event, File file, 
Runnable onSuccessCommit)
     {
         if (file == null) // the condition should never happen. Having it here 
for logic completeness
         {
-            event.tryFail(RestoreJobExceptions.ofFatalSlice("Object not found 
from disk", slice, null));
-            return;
+            return 
Future.failedFuture(RestoreJobExceptions.ofFatalSlice("Object not found from 
disk",
+                                                                         
slice, null));
         }
 
         // run the rest in the executor pool, instead of S3 client threadpool
-        unzip(file)
+        return unzip(file)
         .compose(this::validateFiles)
         .compose(this::commit)
         .compose(x -> {
@@ -304,7 +312,7 @@ public class RestoreSliceTask implements 
Handler<Promise<RestoreSlice>>
                 return Future.succeededFuture();
             }
 
-            return executorPool.executeBlocking(promise -> {
+            return executorPool.<Void>executeBlocking(promise -> {
                 onSuccessCommit.run();
                 promise.tryComplete();
             });
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java 
b/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java
index 0174e12..f26e859 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java
@@ -70,8 +70,8 @@ public abstract class AsyncFileDigestVerifier<D extends 
Digest> implements Diges
                  .compose(computedDigest -> {
                      if (!computedDigest.equals(digest.value()))
                      {
-                         logger.error("Digest mismatch. computed_digest={}, 
expected_digest={}, algorithm=MD5",
-                                      computedDigest, digest.value());
+                         logger.error("Digest mismatch. computed_digest={}, 
expected_digest={}, algorithm={}",
+                                      computedDigest, digest.value(), 
digest.algorithm());
                          return Future.failedFuture(new 
HttpException(CHECKSUM_MISMATCH.code(),
                                                                       
String.format("Digest mismatch. "
                                                                                
     + "expected_digest=%s, "
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java 
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
index 728f5d2..6e9e1d7 100644
--- 
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
+++ 
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import com.datastax.driver.core.utils.UUIDs;
+import io.vertx.core.Future;
 import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
@@ -226,6 +227,81 @@ class RestoreSliceTaskTest
         assertThat(sliceDatabaseAccessor.updateInvokedTimes.get()).isOne();
     }
 
+    @Test
+    void testHandlingUnexpectedExceptionInObjectExistsCheck(@TempDir Path 
testFolder)
+    {
+        RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), 
RestoreJobStatus.CREATED, "QUORUM");
+        when(mockStorageClient.objectExists(mockSlice)).thenThrow(new 
RuntimeException("Random exception"));
+        Path stagedPath = testFolder.resolve("slice.zip");
+        when(mockSlice.stagedObjectPath()).thenReturn(stagedPath);
+
+        Promise<RestoreSlice> promise = Promise.promise();
+
+        RestoreSliceTask task = createTask(mockSlice, job);
+        task.handle(promise);
+
+        assertThatThrownBy(() -> getBlocking(promise.future()))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Random exception");
+    }
+
+    @Test
+    void testHandlingUnexpectedExceptionDuringDownloadSliceCheck(@TempDir Path 
testFolder)
+    {
+        RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), 
RestoreJobStatus.CREATED, "QUORUM");
+        Path stagedPath = testFolder.resolve("slice.zip");
+        when(mockSlice.stagedObjectPath()).thenReturn(stagedPath);
+        when(mockSlice.isCancelled()).thenReturn(false);
+        
when(mockStorageClient.objectExists(mockSlice)).thenReturn(CompletableFuture.completedFuture(null));
+        
when(mockStorageClient.downloadObjectIfAbsent(mockSlice)).thenThrow(new 
RuntimeException("Random exception"));
+
+        Promise<RestoreSlice> promise = Promise.promise();
+
+        RestoreSliceTask task = createTask(mockSlice, job);
+        task.handle(promise);
+
+        assertThatThrownBy(() -> getBlocking(promise.future()))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Random exception");
+    }
+
+    @Test
+    void testHandlingUnexpectedExceptionDuringUnzip(@TempDir Path testFolder) 
throws IOException
+    {
+
+        RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), 
RestoreJobStatus.STAGED, "QUORUM");
+        Path stagedPath = testFolder.resolve("slice.zip");
+        Files.createFile(stagedPath);
+        when(mockSlice.stagedObjectPath()).thenReturn(stagedPath);
+        Promise<RestoreSlice> promise = Promise.promise();
+        RestoreSliceTask task = createTaskWithExceptions(mockSlice, job);
+        task.handle(promise);
+
+        assertThatThrownBy(() -> getBlocking(promise.future()))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Random exception");
+    }
+
+    @Test
+    void testHandlingUnexpectedExceptionDuringDownloadAndImport(@TempDir Path 
testFolder)
+    {
+        RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), 
RestoreJobStatus.CREATED, null);
+        Path stagedPath = testFolder.resolve("slice.zip");
+        when(mockSlice.stagedObjectPath()).thenReturn(stagedPath);
+        when(mockSlice.isCancelled()).thenReturn(false);
+        
when(mockStorageClient.objectExists(mockSlice)).thenReturn(CompletableFuture.completedFuture(null));
+        
when(mockStorageClient.downloadObjectIfAbsent(mockSlice)).thenReturn(CompletableFuture.completedFuture(null));
+
+        Promise<RestoreSlice> promise = Promise.promise();
+
+        RestoreSliceTask task = createTaskWithExceptions(mockSlice, job);
+        task.handle(promise);
+
+        assertThatThrownBy(() -> getBlocking(promise.future()))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Random exception");
+    }
+
     private RestoreSliceTask createTask(RestoreSlice slice, RestoreJob job)
     {
         when(slice.job()).thenReturn(job);
@@ -236,6 +312,16 @@ class RestoreSliceTaskTest
                                         0, sliceDatabaseAccessor, stats);
     }
 
+    private RestoreSliceTask createTaskWithExceptions(RestoreSlice slice, 
RestoreJob job)
+    {
+        when(slice.job()).thenReturn(job);
+        assertThat(slice.job()).isSameAs(job);
+        
assertThat(slice.job().isManagedBySidecar()).isEqualTo(job.isManagedBySidecar());
+        assertThat(slice.job().status).isEqualTo(job.status);
+        return new TestUnexpectedExceptionInRestoreSliceTask(slice, 
mockStorageClient, executorPool,
+                                                             
mockSSTableImporter, 0, sliceDatabaseAccessor, stats);
+    }
+
     static class TestRestoreSliceAccessor extends RestoreSliceDatabaseAccessor
     {
         public final AtomicInteger updateInvokedTimes = new AtomicInteger(0);
@@ -269,7 +355,7 @@ class RestoreSliceTaskTest
         }
 
         @Override
-        void unzipAndImport(Promise<RestoreSlice> event, File file, Runnable 
onSuccessCommit)
+        Future<Void> unzipAndImport(Promise<RestoreSlice> event, File file, 
Runnable onSuccessCommit)
         {
             stats.captureSliceUnzipTime(1, 123L);
             stats.captureSliceValidationTime(1, 123L);
@@ -280,12 +366,32 @@ class RestoreSliceTaskTest
                 onSuccessCommit.run();
             }
             event.tryComplete(slice);
+            return Future.succeededFuture();
+        }
+
+        @Override
+        Future<Void> unzipAndImport(Promise<RestoreSlice> event, File file)
+        {
+            return unzipAndImport(event, file, null);
+        }
+    }
+
+    static class TestUnexpectedExceptionInRestoreSliceTask extends 
RestoreSliceTask
+    {
+        public TestUnexpectedExceptionInRestoreSliceTask(RestoreSlice slice, 
StorageClient s3Client,
+                                                         TaskExecutorPool 
executorPool, SSTableImporter importer,
+                                                         double 
requiredUsableSpacePercentage,
+                                                         
RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
+                                                         RestoreJobStats stats)
+        {
+            super(slice, s3Client, executorPool, importer, 
requiredUsableSpacePercentage, sliceDatabaseAccessor, stats,
+                  null);
         }
 
         @Override
-        void unzipAndImport(Promise<RestoreSlice> event, File file)
+        Future<Void> unzipAndImport(Promise<RestoreSlice> event, File file, 
Runnable onSuccessCommit)
         {
-            unzipAndImport(event, file, null);
+            throw new RuntimeException("Random exception");
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to