This is an automated email from the ASF dual-hosted git repository. ycai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new 9b56cdd CASSANDRASC-105 RestoreSliceTask could be stuck due to missing exception handling (#107) 9b56cdd is described below commit 9b56cdd11a25550f0628e99ba35650c44663eb19 Author: Saranya Krishnakumar <sarany...@apple.com> AuthorDate: Mon Mar 18 16:49:57 2024 -0700 CASSANDRASC-105 RestoreSliceTask could be stuck due to missing exception handling (#107) patch by Saranya Krishnakumar; reviewed by Francisco Guerrero, Yifan Cai for CASSANDRASC-105 --- CHANGES.txt | 1 + .../sidecar/restore/RestoreSliceTask.java | 58 ++++++----- .../sidecar/utils/AsyncFileDigestVerifier.java | 4 +- .../sidecar/restore/RestoreSliceTaskTest.java | 112 ++++++++++++++++++++- 4 files changed, 145 insertions(+), 30 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 0fdc914..e625cde 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 1.0.0 ----- + * RestoreSliceTask could be stuck due to missing exception handling (CASSANDRASC-105) * Make hash algorithm implementation pluggable (CASSANDRASC-114) * Fix ClosedChannelException when downloading from S3 (CASSANDRASC-112) * Fix NPE thrown when getting StorageClient from pool (CASSANDRASC-110) diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java index ecd0734..4e942c7 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java @@ -107,7 +107,7 @@ public class RestoreSliceTask implements Handler<Promise<RestoreSlice>> slice.compressedSize() + slice.uncompressedSize(), requiredUsableSpacePercentage, executorPool) - .onSuccess(ignored -> { + .compose(v -> { RestoreJob job = slice.job(); if (job.isManagedBySidecar()) { @@ -120,55 +120,63 @@ public class RestoreSliceTask implements Handler<Promise<RestoreSlice>> slice.completeStagePhase(); // update the flag if missed sliceDatabaseAccessor.updateStatus(slice); event.tryComplete(slice); - return; + return Future.succeededFuture(); } // 1. check object existence and validate eTag / checksum - checkObjectExistence(event) + CompletableFuture<Void> fut = checkObjectExistence(event) // 2. download slice/object when the remote object exists .thenCompose(headObject -> downloadSlice(event)) // 3. persist status - .thenAccept(x -> { + .thenAccept(file -> { slice.completeStagePhase(); sliceDatabaseAccessor.updateStatus(slice); // completed staging. A new task is produced when it comes to import event.tryComplete(slice); + }) + .whenComplete((x, cause) -> { + if (cause != null) + { + // handle unexpected errors thrown during download slice call, that do not close event + event.tryFail(RestoreJobExceptions.ofSlice(cause.getMessage(), slice, cause)); + } }); + + return Future.fromCompletionStage(fut); } else if (job.status == RestoreJobStatus.STAGED) { - unzipAndImport(event, slice.stagedObjectPath().toFile(), - // persist status - () -> sliceDatabaseAccessor.updateStatus(slice)); + return unzipAndImport(event, slice.stagedObjectPath().toFile(), + // persist status + () -> sliceDatabaseAccessor.updateStatus(slice)); } else { String msg = "Unexpected restore job status. Expected only CREATED or STAGED when " + "processing active slices. Found status: " + job.status; Exception unexpectedState = new IllegalStateException(msg); - event.tryFail(RestoreJobExceptions.ofFatalSlice("Unexpected restore job status", - slice, unexpectedState)); + return Future.failedFuture(RestoreJobExceptions.ofFatalSlice("Unexpected restore job status", + slice, unexpectedState)); } } else { - downloadSliceAndImport(event); + return downloadSliceAndImport(event); } }) - .onFailure(cause -> { - String msg = "Unable to ensure enough space for the slice. Retry later"; - event.tryFail(RestoreJobExceptions.ofSlice(msg, slice, cause)); - }); + .onSuccess(v -> event.tryComplete(slice)) + .onFailure(cause -> event.tryFail(RestoreJobExceptions.ofSlice(cause.getMessage(), slice, cause))); } - private void downloadSliceAndImport(Promise<RestoreSlice> event) + private Future<Void> downloadSliceAndImport(Promise<RestoreSlice> event) { // 1. check object existence and validate eTag / checksum - checkObjectExistence(event) + CompletableFuture<File> fut = checkObjectExistence(event) // 2. download slice/object when the remote object exists - .thenCompose(headObject -> downloadSlice(event)) + .thenCompose(headObject -> downloadSlice(event)); // 3. unzip the file and import/commit - .thenAccept(file -> unzipAndImport(event, file)); + return Future.fromCompletionStage(fut) + .compose(file -> unzipAndImport(event, file)); } private CompletableFuture<?> checkObjectExistence(Promise<RestoreSlice> event) @@ -281,21 +289,21 @@ public class RestoreSliceTask implements Handler<Promise<RestoreSlice>> } @VisibleForTesting - void unzipAndImport(Promise<RestoreSlice> event, File file) + Future<Void> unzipAndImport(Promise<RestoreSlice> event, File file) { - unzipAndImport(event, file, null); + return unzipAndImport(event, file, null); } - void unzipAndImport(Promise<RestoreSlice> event, File file, Runnable onSuccessCommit) + Future<Void> unzipAndImport(Promise<RestoreSlice> event, File file, Runnable onSuccessCommit) { if (file == null) // the condition should never happen. Having it here for logic completeness { - event.tryFail(RestoreJobExceptions.ofFatalSlice("Object not found from disk", slice, null)); - return; + return Future.failedFuture(RestoreJobExceptions.ofFatalSlice("Object not found from disk", + slice, null)); } // run the rest in the executor pool, instead of S3 client threadpool - unzip(file) + return unzip(file) .compose(this::validateFiles) .compose(this::commit) .compose(x -> { @@ -304,7 +312,7 @@ public class RestoreSliceTask implements Handler<Promise<RestoreSlice>> return Future.succeededFuture(); } - return executorPool.executeBlocking(promise -> { + return executorPool.<Void>executeBlocking(promise -> { onSuccessCommit.run(); promise.tryComplete(); }); diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java b/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java index 0174e12..f26e859 100644 --- a/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java +++ b/src/main/java/org/apache/cassandra/sidecar/utils/AsyncFileDigestVerifier.java @@ -70,8 +70,8 @@ public abstract class AsyncFileDigestVerifier<D extends Digest> implements Diges .compose(computedDigest -> { if (!computedDigest.equals(digest.value())) { - logger.error("Digest mismatch. computed_digest={}, expected_digest={}, algorithm=MD5", - computedDigest, digest.value()); + logger.error("Digest mismatch. computed_digest={}, expected_digest={}, algorithm={}", + computedDigest, digest.value(), digest.algorithm()); return Future.failedFuture(new HttpException(CHECKSUM_MISMATCH.code(), String.format("Digest mismatch. " + "expected_digest=%s, " diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java index 728f5d2..6e9e1d7 100644 --- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java @@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import com.datastax.driver.core.utils.UUIDs; +import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.Vertx; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; @@ -226,6 +227,81 @@ class RestoreSliceTaskTest assertThat(sliceDatabaseAccessor.updateInvokedTimes.get()).isOne(); } + @Test + void testHandlingUnexpectedExceptionInObjectExistsCheck(@TempDir Path testFolder) + { + RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.CREATED, "QUORUM"); + when(mockStorageClient.objectExists(mockSlice)).thenThrow(new RuntimeException("Random exception")); + Path stagedPath = testFolder.resolve("slice.zip"); + when(mockSlice.stagedObjectPath()).thenReturn(stagedPath); + + Promise<RestoreSlice> promise = Promise.promise(); + + RestoreSliceTask task = createTask(mockSlice, job); + task.handle(promise); + + assertThatThrownBy(() -> getBlocking(promise.future())) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Random exception"); + } + + @Test + void testHandlingUnexpectedExceptionDuringDownloadSliceCheck(@TempDir Path testFolder) + { + RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.CREATED, "QUORUM"); + Path stagedPath = testFolder.resolve("slice.zip"); + when(mockSlice.stagedObjectPath()).thenReturn(stagedPath); + when(mockSlice.isCancelled()).thenReturn(false); + when(mockStorageClient.objectExists(mockSlice)).thenReturn(CompletableFuture.completedFuture(null)); + when(mockStorageClient.downloadObjectIfAbsent(mockSlice)).thenThrow(new RuntimeException("Random exception")); + + Promise<RestoreSlice> promise = Promise.promise(); + + RestoreSliceTask task = createTask(mockSlice, job); + task.handle(promise); + + assertThatThrownBy(() -> getBlocking(promise.future())) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Random exception"); + } + + @Test + void testHandlingUnexpectedExceptionDuringUnzip(@TempDir Path testFolder) throws IOException + { + + RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.STAGED, "QUORUM"); + Path stagedPath = testFolder.resolve("slice.zip"); + Files.createFile(stagedPath); + when(mockSlice.stagedObjectPath()).thenReturn(stagedPath); + Promise<RestoreSlice> promise = Promise.promise(); + RestoreSliceTask task = createTaskWithExceptions(mockSlice, job); + task.handle(promise); + + assertThatThrownBy(() -> getBlocking(promise.future())) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Random exception"); + } + + @Test + void testHandlingUnexpectedExceptionDuringDownloadAndImport(@TempDir Path testFolder) + { + RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.CREATED, null); + Path stagedPath = testFolder.resolve("slice.zip"); + when(mockSlice.stagedObjectPath()).thenReturn(stagedPath); + when(mockSlice.isCancelled()).thenReturn(false); + when(mockStorageClient.objectExists(mockSlice)).thenReturn(CompletableFuture.completedFuture(null)); + when(mockStorageClient.downloadObjectIfAbsent(mockSlice)).thenReturn(CompletableFuture.completedFuture(null)); + + Promise<RestoreSlice> promise = Promise.promise(); + + RestoreSliceTask task = createTaskWithExceptions(mockSlice, job); + task.handle(promise); + + assertThatThrownBy(() -> getBlocking(promise.future())) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Random exception"); + } + private RestoreSliceTask createTask(RestoreSlice slice, RestoreJob job) { when(slice.job()).thenReturn(job); @@ -236,6 +312,16 @@ class RestoreSliceTaskTest 0, sliceDatabaseAccessor, stats); } + private RestoreSliceTask createTaskWithExceptions(RestoreSlice slice, RestoreJob job) + { + when(slice.job()).thenReturn(job); + assertThat(slice.job()).isSameAs(job); + assertThat(slice.job().isManagedBySidecar()).isEqualTo(job.isManagedBySidecar()); + assertThat(slice.job().status).isEqualTo(job.status); + return new TestUnexpectedExceptionInRestoreSliceTask(slice, mockStorageClient, executorPool, + mockSSTableImporter, 0, sliceDatabaseAccessor, stats); + } + static class TestRestoreSliceAccessor extends RestoreSliceDatabaseAccessor { public final AtomicInteger updateInvokedTimes = new AtomicInteger(0); @@ -269,7 +355,7 @@ class RestoreSliceTaskTest } @Override - void unzipAndImport(Promise<RestoreSlice> event, File file, Runnable onSuccessCommit) + Future<Void> unzipAndImport(Promise<RestoreSlice> event, File file, Runnable onSuccessCommit) { stats.captureSliceUnzipTime(1, 123L); stats.captureSliceValidationTime(1, 123L); @@ -280,12 +366,32 @@ class RestoreSliceTaskTest onSuccessCommit.run(); } event.tryComplete(slice); + return Future.succeededFuture(); + } + + @Override + Future<Void> unzipAndImport(Promise<RestoreSlice> event, File file) + { + return unzipAndImport(event, file, null); + } + } + + static class TestUnexpectedExceptionInRestoreSliceTask extends RestoreSliceTask + { + public TestUnexpectedExceptionInRestoreSliceTask(RestoreSlice slice, StorageClient s3Client, + TaskExecutorPool executorPool, SSTableImporter importer, + double requiredUsableSpacePercentage, + RestoreSliceDatabaseAccessor sliceDatabaseAccessor, + RestoreJobStats stats) + { + super(slice, s3Client, executorPool, importer, requiredUsableSpacePercentage, sliceDatabaseAccessor, stats, + null); } @Override - void unzipAndImport(Promise<RestoreSlice> event, File file) + Future<Void> unzipAndImport(Promise<RestoreSlice> event, File file, Runnable onSuccessCommit) { - unzipAndImport(event, file, null); + throw new RuntimeException("Random exception"); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org