This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 00fba404 CASSANDRASC-136: Add delay between reporting of the same slow 
task (#127)
00fba404 is described below

commit 00fba404a00282ec2117290fdf45658387a2b016
Author: Yifan Cai <[email protected]>
AuthorDate: Mon Jul 1 12:47:22 2024 -0700

    CASSANDRASC-136: Add delay between reporting of the same slow task (#127)
    
    Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSANDRASC-136
---
 CHANGES.txt                                        |  1 +
 src/main/dist/conf/sidecar.yaml                    |  6 +-
 .../sidecar/config/RestoreJobConfiguration.java    |  9 ++-
 .../config/yaml/RestoreJobConfigurationImpl.java   | 66 +++++++++++-------
 .../sidecar/exceptions/ThrowableUtils.java         | 18 +++--
 .../sidecar/restore/RestoreProcessor.java          | 78 +++++++++++++---------
 .../sidecar/restore/RestoreSliceTask.java          |  6 +-
 .../cassandra/sidecar/restore/StorageClient.java   | 53 ++++++++-------
 .../org/apache/cassandra/sidecar/TestModule.java   |  3 +-
 .../sidecar/exceptions/ThrowableUtilsTest.java     |  4 +-
 .../sidecar/restore/RestoreProcessorTest.java      | 14 ++--
 11 files changed, 159 insertions(+), 99 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 2b556c3e..5d19ec6e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * Add delay between reporting of the same slow task (CASSANDRASC-136)
  * Added additional JVM options to increase max open FD limit for MacOS 
(CASSANDRASC-135)
  * Detect out of range data and cleanup using nodetool (CASSANDRASC-134)
  * Allow optional reason to abort restore jobs (CASSANDRASC-133)
diff --git a/src/main/dist/conf/sidecar.yaml b/src/main/dist/conf/sidecar.yaml
index ecd836c9..90cb61a7 100644
--- a/src/main/dist/conf/sidecar.yaml
+++ b/src/main/dist/conf/sidecar.yaml
@@ -172,11 +172,13 @@ cassandra_input_validation:
   allowed_chars_for_restricted_component_name: "[a-zA-Z0-9_-]+(.db|TOC.txt)"
 
 blob_restore:
-  job_discovery_active_loop_delay_millis: 5
-  job_discovery_idle_loop_delay_millis: 10
+  job_discovery_active_loop_delay_millis: 300000
+  job_discovery_idle_loop_delay_millis: 600000
   job_discovery_recency_days: 5
   slice_process_max_concurrency: 20
   restore_job_tables_ttl_seconds: 7776000
+  slow_task_threshold_seconds: 600
+  slow_task_report_delay_seconds: 60
 
 s3_client:
   concurrency: 4
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java
 
b/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java
index ec591653..53f983f9 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java
@@ -50,7 +50,12 @@ public interface RestoreJobConfiguration
     long restoreJobTablesTtlSeconds();
 
     /**
-     * @return the number of seconds above which a restore handler is 
considered "long-running"
+     * @return the number of seconds above which a restore task is considered 
slow
      */
-    long restoreJobLongRunningHandlerThresholdSeconds();
+    long slowTaskThresholdSeconds();
+
+    /**
+     * @return the delay in seconds between each report of the same slow task
+     */
+    long slowTaskReportDelaySeconds();
 }
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
 
b/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
index 336340cb..23cf91c9 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
@@ -31,15 +31,15 @@ public class RestoreJobConfigurationImpl implements 
RestoreJobConfiguration
 {
     private static final long MIN_RESTORE_JOB_TABLES_TTL_SECONDS = 
TimeUnit.DAYS.toSeconds(14);
 
-    public static final long DEFAULT_JOB_DISCOVERY_ACTIVE_LOOP_DELAY_MILLIS = 
TimeUnit.MINUTES.toMillis(5);
-    public static final long DEFAULT_JOB_DISCOVERY_IDLE_LOOP_DELAY_MILLIS = 
TimeUnit.MINUTES.toMillis(10);
-    public static final int DEFAULT_JOB_DISCOVERY_RECENCY_DAYS = 5;
-    public static final int DEFAULT_PROCESS_MAX_CONCURRENCY = 20; // process 
at most 20 slices concurrently
-    public static final long DEFAULT_RESTORE_JOB_TABLES_TTL_SECONDS = 
TimeUnit.DAYS.toSeconds(90);
-    public static final String 
RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS =
-    "restore_job_long_running_threshold_seconds";
-    // A restore job handler is considered long-running if it has been in the 
"active" list for 10 minutes.
-    private static final long 
DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS = 600;
+    private static final long DEFAULT_JOB_DISCOVERY_ACTIVE_LOOP_DELAY_MILLIS = 
TimeUnit.MINUTES.toMillis(5);
+    private static final long DEFAULT_JOB_DISCOVERY_IDLE_LOOP_DELAY_MILLIS = 
TimeUnit.MINUTES.toMillis(10);
+    private static final int DEFAULT_JOB_DISCOVERY_RECENCY_DAYS = 5;
+    private static final int DEFAULT_PROCESS_MAX_CONCURRENCY = 20; // process 
at most 20 slices concurrently
+    private static final long DEFAULT_RESTORE_JOB_TABLES_TTL_SECONDS = 
TimeUnit.DAYS.toSeconds(90);
+    // A restore task is considered slow if it has been in the "active" list 
for 10 minutes.
+    private static final long DEFAULT_RESTORE_JOB_SLOW_TASK_THRESHOLD_SECONDS 
= TimeUnit.MINUTES.toSeconds(10);
+    // report once a minute
+    private static final long 
DEFAULT_RESTORE_JOB_SLOW_TASK_REPORT_DELAY_SECONDS = 
TimeUnit.MINUTES.toSeconds(1);
 
     @JsonProperty(value = "job_discovery_active_loop_delay_millis")
     protected final long jobDiscoveryActiveLoopDelayMillis;
@@ -56,10 +56,11 @@ public class RestoreJobConfigurationImpl implements 
RestoreJobConfiguration
     @JsonProperty(value = "restore_job_tables_ttl_seconds")
     protected final long restoreJobTablesTtlSeconds;
 
+    @JsonProperty(value = "slow_task_threshold_seconds")
+    protected final long slowTaskThresholdSeconds;
 
-    @JsonProperty(value = RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS,
-    defaultValue = DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS 
+ "")
-    private final long restoreJobLongRunningThresholdSeconds;
+    @JsonProperty(value = "slow_task_report_delay_seconds")
+    protected final long slowTaskReportDelaySeconds;
 
     protected RestoreJobConfigurationImpl()
     {
@@ -73,7 +74,8 @@ public class RestoreJobConfigurationImpl implements 
RestoreJobConfiguration
         this.jobDiscoveryRecencyDays = builder.jobDiscoveryRecencyDays;
         this.processMaxConcurrency = builder.processMaxConcurrency;
         this.restoreJobTablesTtlSeconds = builder.restoreJobTablesTtlSeconds;
-        this.restoreJobLongRunningThresholdSeconds = 
builder.restoreJobLongRunningThresholdSeconds;
+        this.slowTaskThresholdSeconds = builder.slowTaskThresholdSeconds;
+        this.slowTaskReportDelaySeconds = builder.slowTaskReportDelaySeconds;
         validate();
     }
 
@@ -143,11 +145,17 @@ public class RestoreJobConfigurationImpl implements 
RestoreJobConfiguration
     }
 
     @Override
-    @JsonProperty(value = RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS,
-                  defaultValue = 
DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS + "")
-    public long restoreJobLongRunningHandlerThresholdSeconds()
+    @JsonProperty(value = "slow_task_threshold_seconds")
+    public long slowTaskThresholdSeconds()
     {
-        return restoreJobLongRunningThresholdSeconds;
+        return slowTaskThresholdSeconds;
+    }
+
+    @Override
+    @JsonProperty(value = "slow_task_report_delay_seconds")
+    public long slowTaskReportDelaySeconds()
+    {
+        return slowTaskReportDelaySeconds;
     }
 
     public static Builder builder()
@@ -160,8 +168,8 @@ public class RestoreJobConfigurationImpl implements 
RestoreJobConfiguration
      */
     public static class Builder implements DataObjectBuilder<Builder, 
RestoreJobConfigurationImpl>
     {
-        protected long restoreJobLongRunningThresholdSeconds =
-        DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS;
+        private long slowTaskThresholdSeconds = 
DEFAULT_RESTORE_JOB_SLOW_TASK_THRESHOLD_SECONDS;
+        private long slowTaskReportDelaySeconds = 
DEFAULT_RESTORE_JOB_SLOW_TASK_REPORT_DELAY_SECONDS;
         private long jobDiscoveryActiveLoopDelayMillis = 
DEFAULT_JOB_DISCOVERY_ACTIVE_LOOP_DELAY_MILLIS;
         private long jobDiscoveryIdleLoopDelayMillis = 
DEFAULT_JOB_DISCOVERY_IDLE_LOOP_DELAY_MILLIS;
         private int jobDiscoveryRecencyDays = 
DEFAULT_JOB_DISCOVERY_RECENCY_DAYS;
@@ -239,15 +247,27 @@ public class RestoreJobConfigurationImpl implements 
RestoreJobConfiguration
         }
 
         /**
-         * Sets the {@code restoreJobLongRunningThresholdSeconds} and returns 
a reference to this Builder enabling
+         * Sets the {@code slowTaskThresholdSeconds} and returns a reference 
to this Builder enabling
+         * method chaining.
+         *
+         * @param slowTaskThresholdSeconds the {@code 
slowTaskThresholdSeconds} to set
+         * @return a reference to this Builder
+         */
+        public Builder slowTaskThresholdSeconds(long slowTaskThresholdSeconds)
+        {
+            return update(b -> b.slowTaskThresholdSeconds = 
slowTaskThresholdSeconds);
+        }
+
+        /**
+         * Sets the {@code slowTaskReportDelaySeconds} and returns a reference 
to this Builder enabling
          * method chaining.
          *
-         * @param restoreJobLongRunningThresholdSeconds the {@code 
restoreJobLongRunningThresholdSeconds} to set
+         * @param slowTaskReportDelaySeconds the {@code 
slowTaskReportDelaySeconds} to set
          * @return a reference to this Builder
          */
-        public Builder restoreJobLongRunningThresholdSeconds(long 
restoreJobLongRunningThresholdSeconds)
+        public Builder slowTaskReportDelaySeconds(long 
slowTaskReportDelaySeconds)
         {
-            return update(b -> b.restoreJobLongRunningThresholdSeconds = 
restoreJobLongRunningThresholdSeconds);
+            return update(b -> b.slowTaskReportDelaySeconds = 
slowTaskReportDelaySeconds);
         }
 
         @Override
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/exceptions/ThrowableUtils.java 
b/src/main/java/org/apache/cassandra/sidecar/exceptions/ThrowableUtils.java
index db00897d..fe2cd283 100644
--- a/src/main/java/org/apache/cassandra/sidecar/exceptions/ThrowableUtils.java
+++ b/src/main/java/org/apache/cassandra/sidecar/exceptions/ThrowableUtils.java
@@ -36,6 +36,7 @@ public class ThrowableUtils
     /**
      * Run the {@code actionMayThrow} and wrap any {@link Exception} thrown in 
{@link RuntimeException}
      * @param actionMayThrow action that may throw exceptions
+     * @return value of type R
      * @param <R> return value type of the action
      */
     public static <R> R propagate(Callable<R> actionMayThrow)
@@ -101,12 +102,19 @@ public class ThrowableUtils
                 return cause;
             }
 
-            fastTracer = getCause(fastTracer, 2);
-            if (cause == fastTracer && stop == null)
+            if (stop == null)
             {
-                // Mark the position to stop, and continue tracing the cause 
up until hitting stop the next time.
-                // This way we are sure that all exceptions/causes are visited 
at least once.
-                stop = cause;
+                // once stop is set, updating the fast tracer is no longer required
+                if (cause == fastTracer)
+                {
+                    // Mark the position to stop, and continue tracing the 
cause up until hitting stop the next time.
+                    // This way we are sure that all exceptions/causes are 
visited at least once.
+                    stop = cause;
+                }
+                else
+                {
+                    fastTracer = getCause(fastTracer, 2);
+                }
             }
             cause = getCause(cause, 1);
         }
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java 
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
index 36a5d770..35ba6a74 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.sidecar.restore;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Queue;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
@@ -66,8 +65,11 @@ public class RestoreProcessor implements PeriodicTask
     private final double requiredUsableSpacePercentage; // value range: [0.0, 
1.0)
     private final RestoreSliceDatabaseAccessor sliceDatabaseAccessor;
     private final RestoreJobUtil restoreJobUtil;
-    private final Set<RestoreSliceHandler> activeTasks = 
ConcurrentHashMap.newKeySet();
-    private final long longRunningHandlerThresholdInSeconds;
+    // mapping of task to the time when it should be reported as 'slow' if it 
is still active
+    // using concurrent data structure because the map is accessed from 
multiple threads
+    private final Map<RestoreSliceHandler, Long> activeTasks = new 
ConcurrentHashMap<>();
+    private final long slowTaskThresholdInSeconds;
+    private final long slowTaskReportDelayInSeconds;
     private final LocalTokenRangesProvider localTokenRangesProvider;
     private final SidecarMetrics metrics;
 
@@ -91,8 +93,8 @@ public class RestoreProcessor implements PeriodicTask
                                                                         
.processMaxConcurrency());
         this.requiredUsableSpacePercentage
         = 
config.serviceConfiguration().sstableUploadConfiguration().minimumSpacePercentageRequired()
 / 100.0;
-        this.longRunningHandlerThresholdInSeconds = 
config.restoreJobConfiguration()
-                                                          
.restoreJobLongRunningHandlerThresholdSeconds();
+        this.slowTaskThresholdInSeconds = 
config.restoreJobConfiguration().slowTaskThresholdSeconds();
+        this.slowTaskReportDelayInSeconds = 
config.restoreJobConfiguration().slowTaskReportDelaySeconds();
         this.importer = importer;
         this.sliceDatabaseAccessor = sliceDatabaseAccessor;
         this.restoreJobUtil = restoreJobUtil;
@@ -151,7 +153,8 @@ public class RestoreProcessor implements PeriodicTask
                                                          restoreJobUtil,
                                                          
localTokenRangesProvider,
                                                          metrics);
-            activeTasks.add(task);
+
+            activeTasks.put(task, slowTaskThresholdInSeconds);
             pool.executeBlocking(task, false) // unordered; run in parallel
             .onSuccess(restoreSlice -> {
                 if (slice.hasImported())
@@ -208,44 +211,53 @@ public class RestoreProcessor implements PeriodicTask
                 activeTasks.remove(task);
             });
         }
-        promise.tryComplete();
         checkForLongRunningTasks();
         sliceQueue.capturePendingSliceCount();
+        promise.tryComplete();
+    }
+
+    @Override
+    public void close()
+    {
+        isClosed = true;
+        s3ClientPool.close();
+        sliceQueue.close();
     }
 
     private void checkForLongRunningTasks()
     {
-        for (RestoreSliceHandler task : activeTasks)
+        for (RestoreSliceHandler t : activeTasks.keySet())
         {
-            long elapsedInNanos = task.elapsedInNanos();
-            if (elapsedInNanos == -1)
+            long elapsedInNanos = t.elapsedInNanos();
+            if (elapsedInNanos == -1) // not started
             {
                 continue;
             }
-            long elapsedInSeconds = TimeUnit.SECONDS.convert(elapsedInNanos, 
TimeUnit.NANOSECONDS);
-            if (elapsedInSeconds > longRunningHandlerThresholdInSeconds)
-            {
-                LOGGER.warn("Long-running restore slice task detected. " +
-                            "elapsedSeconds={} thresholdSeconds={} sliceKey={} 
jobId={} status={}",
-                            elapsedInSeconds,
-                            longRunningHandlerThresholdInSeconds,
-                            task.slice().key(),
-                            task.slice().jobId(),
-                            task.slice().job().status);
-                task.slice()
-                    .owner()
-                    .metrics()
-                    
.restore().slowRestoreTaskTime.metric.update(elapsedInNanos, 
TimeUnit.NANOSECONDS);
-            }
-        }
-    }
 
-    @Override
-    public void close()
-    {
-        isClosed = true;
-        s3ClientPool.close();
-        sliceQueue.close();
+            long elapsedInSeconds = 
TimeUnit.NANOSECONDS.toSeconds(elapsedInNanos);
+
+            // Read the current map and update the existing entries if needed.
+            // We do not want to put new entry to the map in this method.
+            activeTasks.computeIfPresent(t, (task, timeToReport) -> {
+                if (elapsedInSeconds > timeToReport)
+                {
+                    LOGGER.warn("Long-running restore slice task detected. " +
+                                "elapsedSeconds={} thresholdSeconds={} 
sliceKey={} jobId={} status={}",
+                                elapsedInSeconds,
+                                slowTaskThresholdInSeconds,
+                                task.slice().key(),
+                                task.slice().jobId(),
+                                task.slice().job().status);
+                    task.slice()
+                        .owner()
+                        .metrics()
+                        
.restore().slowRestoreTaskTime.metric.update(elapsedInNanos, 
TimeUnit.NANOSECONDS);
+                    return timeToReport + slowTaskReportDelayInSeconds; // 
increment by the delay
+                }
+
+                return timeToReport; // do not update
+            });
+        }
     }
 
     @VisibleForTesting
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java 
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
index 697ad070..50cd2958 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
@@ -149,8 +149,7 @@ public class RestoreSliceTask implements RestoreSliceHandler
                 {
                     if (Files.exists(slice.stagedObjectPath()))
                     {
-                        LOGGER.debug("The slice has been staged already. 
sliceKey={} stagedFilePath={}",
-                                     slice.key(), slice.stagedObjectPath());
+                        LOGGER.info("The slice has been staged already. 
sliceKey={}", slice.key());
                         slice.completeStagePhase(); // update the flag if 
missed
                         sliceDatabaseAccessor.updateStatus(slice);
                         event.tryComplete(slice);
@@ -292,10 +291,11 @@ public class RestoreSliceTask implements 
RestoreSliceHandler
         Future<File> future =
         fromCompletionStage(s3Client.downloadObjectIfAbsent(slice))
         .onFailure(cause -> {
+            LOGGER.warn("Failed to download restore slice. sliceKey={}", 
slice.key(), cause);
+
             slice.incrementDownloadAttempt();
             if (ThrowableUtils.getCause(cause, ApiCallTimeoutException.class) 
!= null)
             {
-                LOGGER.warn("Downloading restore slice times out. 
sliceKey={}", slice.key());
                 
instanceMetrics.restore().sliceDownloadTimeouts.metric.update(1);
             }
             event.tryFail(RestoreJobExceptions.ofFatalSlice("Unrecoverable 
error when downloading object",
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java 
b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
index fd31c7cb..da639cf4 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
@@ -42,6 +42,7 @@ import 
org.apache.cassandra.sidecar.common.data.StorageCredentials;
 import org.apache.cassandra.sidecar.db.RestoreJob;
 import org.apache.cassandra.sidecar.db.RestoreSlice;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
+import org.apache.cassandra.sidecar.exceptions.ThrowableUtils;
 import software.amazon.awssdk.auth.credentials.AwsCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
@@ -97,7 +98,9 @@ public class StorageClient
                 return newCredentials;
             }
             else
+            {
                 return credentials;
+            }
         });
         return this;
     }
@@ -142,7 +145,7 @@ public class StorageClient
         Credentials credentials = credentialsProviders.get(slice.jobId());
         if (credentials == null)
         {
-            LOGGER.debug("Credentials to download object not found. jobId={}", 
slice.jobId());
+            LOGGER.warn("Credentials to download object not found. jobId={}", 
slice.jobId());
             CompletableFuture<File> failedFuture = new CompletableFuture<>();
             failedFuture.completeExceptionally(credentialsNotFound(slice));
             return failedFuture;
@@ -152,8 +155,8 @@ public class StorageClient
         File object = objectPath.toFile();
         if (object.exists())
         {
-            LOGGER.debug("Skipping download, file already exists. jobId={} 
s3_object={}",
-                         slice.jobId(), slice.stagedObjectPath());
+            LOGGER.info("Skipping download, file already exists. jobId={} 
sliceKey={}",
+                         slice.jobId(), slice.key());
             // Skip downloading if the file already exists on disk. It should 
be a rare scenario.
             // Note that the on-disk file could be different from the remote 
object, although the name matches.
             // TODO 1: verify etag does not change after s3 replication and 
batch copy
@@ -165,12 +168,12 @@ public class StorageClient
 
         if (!object.getParentFile().mkdirs())
         {
-            LOGGER.warn("Error occurred while creating directory. jobId={} 
s3Object={}",
-                        slice.jobId(), slice.stagedObjectPath());
+            LOGGER.warn("Error occurred while creating directory. jobId={} 
sliceKey={}",
+                        slice.jobId(), slice.key());
 
         }
 
-        LOGGER.info("Downloading object. jobId={} s3Object={}", slice.jobId(), 
slice.stagedObjectPath());
+        LOGGER.info("Downloading object. jobId={} sliceKey={}", slice.jobId(), 
slice.key());
         // https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
         GetObjectRequest request =
         GetObjectRequest.builder()
@@ -274,31 +277,33 @@ public class StorageClient
         }
         catch (FileAlreadyExistsException fileAlreadyExistsException)
         {
-            LOGGER.debug("Skipping download, file already exists. jobId={} 
s3_object={}",
-                         slice.jobId(), slice.stagedObjectPath());
+            LOGGER.info("Skipping download. File already exists. jobId={} 
sliceKey={}",
+                         slice.jobId(), slice.key());
             return CompletableFuture.completedFuture(publisher.response());
         }
         catch (IOException e)
         {
-            LOGGER.error("Error occurred while creating channel. 
destinationPath={} jobId={} s3_object={}",
-                         destinationPath, slice.jobId(), 
slice.stagedObjectPath(), e);
+            LOGGER.error("Error occurred while creating channel. 
destinationPath={} jobId={} sliceKey={}",
+                         destinationPath, slice.jobId(), slice.key(), e);
             throw new RuntimeException(e);
         }
         // CompletableFuture that will be notified when all events have been 
consumed or if an error occurs.
-        CompletableFuture<Void> subscribeFuture = publisher.subscribe(buffer 
-> {
-            downloadRateLimiter.acquire(buffer.remaining()); // apply 
backpressure on the received bytes
-            try
-            {
-                channel.write(buffer);
-            }
-            catch (IOException e)
-            {
-                LOGGER.error("Error occurred while downloading. jobId={} 
s3_object={}",
-                             slice.jobId(), slice.stagedObjectPath(), e);
-                throw new RuntimeException(e);
-            }
-        }).whenComplete((v, subscribeThrowable) -> closeChannel(channel));
-        return subscribeFuture.thenApply(v -> publisher.response());
+        return publisher
+               .subscribe(buffer -> {
+                   downloadRateLimiter.acquire(buffer.remaining()); // apply 
backpressure on the received bytes
+                   ThrowableUtils.propagate(() -> channel.write(buffer));
+               })
+               .whenComplete((v, subscribeThrowable) -> {
+                   // finally close the channel and log error if failed
+                   closeChannel(channel);
+
+                   if (subscribeThrowable != null)
+                   {
+                       LOGGER.error("Error occurred while downloading. 
jobId={} sliceKey={}",
+                                    slice.jobId(), slice.key(), 
subscribeThrowable);
+                   }
+               })
+               .thenApply(v -> publisher.response());
     }
 
     /**
diff --git a/src/test/java/org/apache/cassandra/sidecar/TestModule.java 
b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
index f1f4621c..143dd139 100644
--- a/src/test/java/org/apache/cassandra/sidecar/TestModule.java
+++ b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
@@ -123,7 +123,8 @@ public class TestModule extends AbstractModule
         RestoreJobConfigurationImpl.builder()
                                    
.restoreJobTablesTtlSeconds(TimeUnit.DAYS.toSeconds(14) + 1)
                                    
.processMaxConcurrency(RESTORE_MAX_CONCURRENCY)
-                                   .restoreJobLongRunningThresholdSeconds(1)
+                                   .slowTaskThresholdSeconds(10)
+                                   .slowTaskReportDelaySeconds(120)
                                    .build();
         HealthCheckConfiguration healthCheckConfiguration = new 
HealthCheckConfigurationImpl(200, 1000);
         return SidecarConfigurationImpl.builder()
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/exceptions/ThrowableUtilsTest.java 
b/src/test/java/org/apache/cassandra/sidecar/exceptions/ThrowableUtilsTest.java
index 9b0cbdb7..fcf6915b 100644
--- 
a/src/test/java/org/apache/cassandra/sidecar/exceptions/ThrowableUtilsTest.java
+++ 
b/src/test/java/org/apache/cassandra/sidecar/exceptions/ThrowableUtilsTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.sidecar.exceptions;
 
+import java.io.IOException;
+
 import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -39,7 +41,7 @@ class ThrowableUtilsTest
     @Test
     void testGetCauseWithCircularRef()
     {
-        Exception root = new RuntimeException();
+        Exception root = new IOException();
         Exception testEx = new IllegalStateException(root);
         Exception ex = new RuntimeException(testEx);
         root.initCause(ex); // create a circular chain
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java 
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
index e878fa22..ac162554 100644
--- 
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
+++ 
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
@@ -163,7 +163,7 @@ class RestoreProcessorTest
     }
 
     @Test
-    public void testLongRunningHandlerDetection()
+    void testLongRunningHandlerDetection()
     {
         when(sidecarSchema.isInitialized()).thenReturn(true);
         periodicTaskExecutor.schedule(processor);
@@ -171,17 +171,21 @@ class RestoreProcessorTest
         CountDownLatch latch = new CountDownLatch(1);
         AtomicLong currentTime = new AtomicLong(0);
         RestoreSlice slice = mockSlowSlice(latch, currentTime::get); // Sets 
the start time
-        long fiveMinutesInNanos = TimeUnit.NANOSECONDS.convert(5, 
TimeUnit.MINUTES);
-        currentTime.set(fiveMinutesInNanos);
+        long oneMinutesInNanos = TimeUnit.NANOSECONDS.convert(1, 
TimeUnit.MINUTES);
+        currentTime.set(oneMinutesInNanos);
         processor.submit(slice);
         loopAssert(3, () -> {
             long[] slowRestoreTaskTimes = instanceMetrics()
                                           .restore()
                                           
.slowRestoreTaskTime.metric.getSnapshot().getValues();
-            assertThat(slowRestoreTaskTimes.length).isGreaterThanOrEqualTo(1);
+            assertThat(slowRestoreTaskTimes)
+            .describedAs("The task takes 1 minute. " +
+                         "The slow task threshold is 10 seconds and report 
delay is 2 minutes (see TestModule). " +
+                         "It should only report once")
+            .hasSize(1);
             Long handlerTimeInNanos = slowRestoreTaskTimes[0];
             assertThat(handlerTimeInNanos).isNotNull();
-            assertThat(handlerTimeInNanos).isEqualTo(fiveMinutesInNanos);
+            assertThat(handlerTimeInNanos).isEqualTo(oneMinutesInNanos);
             assertThat(processor.activeTasks()).isOne();
         });
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to