This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 00fba404 CASSANDRASC-136: Add delay between reporting of the same slow
task (#127)
00fba404 is described below
commit 00fba404a00282ec2117290fdf45658387a2b016
Author: Yifan Cai <[email protected]>
AuthorDate: Mon Jul 1 12:47:22 2024 -0700
CASSANDRASC-136: Add delay between reporting of the same slow task (#127)
Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSANDRASC-136
---
CHANGES.txt | 1 +
src/main/dist/conf/sidecar.yaml | 6 +-
.../sidecar/config/RestoreJobConfiguration.java | 9 ++-
.../config/yaml/RestoreJobConfigurationImpl.java | 66 +++++++++++-------
.../sidecar/exceptions/ThrowableUtils.java | 18 +++--
.../sidecar/restore/RestoreProcessor.java | 78 +++++++++++++---------
.../sidecar/restore/RestoreSliceTask.java | 6 +-
.../cassandra/sidecar/restore/StorageClient.java | 53 ++++++++-------
.../org/apache/cassandra/sidecar/TestModule.java | 3 +-
.../sidecar/exceptions/ThrowableUtilsTest.java | 4 +-
.../sidecar/restore/RestoreProcessorTest.java | 14 ++--
11 files changed, 159 insertions(+), 99 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 2b556c3e..5d19ec6e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
1.0.0
-----
+ * Add delay between reporting of the same slow task (CASSANDRASC-136)
* Added additional JVM options to increase max open FD limit for MacOS
(CASSANDRASC-135)
* Detect out of range data and cleanup using nodetool (CASSANDRASC-134)
* Allow optional reason to abort restore jobs (CASSANDRASC-133)
diff --git a/src/main/dist/conf/sidecar.yaml b/src/main/dist/conf/sidecar.yaml
index ecd836c9..90cb61a7 100644
--- a/src/main/dist/conf/sidecar.yaml
+++ b/src/main/dist/conf/sidecar.yaml
@@ -172,11 +172,13 @@ cassandra_input_validation:
allowed_chars_for_restricted_component_name: "[a-zA-Z0-9_-]+(.db|TOC.txt)"
blob_restore:
- job_discovery_active_loop_delay_millis: 5
- job_discovery_idle_loop_delay_millis: 10
+ job_discovery_active_loop_delay_millis: 300000
+ job_discovery_idle_loop_delay_millis: 600000
job_discovery_recency_days: 5
slice_process_max_concurrency: 20
restore_job_tables_ttl_seconds: 7776000
+ slow_task_threshold_seconds: 600
+ slow_task_report_delay_seconds: 60
s3_client:
concurrency: 4
diff --git
a/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java
b/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java
index ec591653..53f983f9 100644
---
a/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java
+++
b/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java
@@ -50,7 +50,12 @@ public interface RestoreJobConfiguration
long restoreJobTablesTtlSeconds();
/**
- * @return the number of seconds above which a restore handler is
considered "long-running"
+ * @return the number of seconds above which a restore task is considered
slow
*/
- long restoreJobLongRunningHandlerThresholdSeconds();
+ long slowTaskThresholdSeconds();
+
+ /**
+ * @return the delay in seconds between each report of the same slow task
+ */
+ long slowTaskReportDelaySeconds();
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
b/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
index 336340cb..23cf91c9 100644
---
a/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
+++
b/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
@@ -31,15 +31,15 @@ public class RestoreJobConfigurationImpl implements
RestoreJobConfiguration
{
private static final long MIN_RESTORE_JOB_TABLES_TTL_SECONDS =
TimeUnit.DAYS.toSeconds(14);
- public static final long DEFAULT_JOB_DISCOVERY_ACTIVE_LOOP_DELAY_MILLIS =
TimeUnit.MINUTES.toMillis(5);
- public static final long DEFAULT_JOB_DISCOVERY_IDLE_LOOP_DELAY_MILLIS =
TimeUnit.MINUTES.toMillis(10);
- public static final int DEFAULT_JOB_DISCOVERY_RECENCY_DAYS = 5;
- public static final int DEFAULT_PROCESS_MAX_CONCURRENCY = 20; // process
at most 20 slices concurrently
- public static final long DEFAULT_RESTORE_JOB_TABLES_TTL_SECONDS =
TimeUnit.DAYS.toSeconds(90);
- public static final String
RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS =
- "restore_job_long_running_threshold_seconds";
- // A restore job handler is considered long-running if it has been in the
"active" list for 10 minutes.
- private static final long
DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS = 600;
+ private static final long DEFAULT_JOB_DISCOVERY_ACTIVE_LOOP_DELAY_MILLIS =
TimeUnit.MINUTES.toMillis(5);
+ private static final long DEFAULT_JOB_DISCOVERY_IDLE_LOOP_DELAY_MILLIS =
TimeUnit.MINUTES.toMillis(10);
+ private static final int DEFAULT_JOB_DISCOVERY_RECENCY_DAYS = 5;
+ private static final int DEFAULT_PROCESS_MAX_CONCURRENCY = 20; // process
at most 20 slices concurrently
+ private static final long DEFAULT_RESTORE_JOB_TABLES_TTL_SECONDS =
TimeUnit.DAYS.toSeconds(90);
+ // A restore task is considered slow if it has been in the "active" list
for 10 minutes.
+ private static final long DEFAULT_RESTORE_JOB_SLOW_TASK_THRESHOLD_SECONDS
= TimeUnit.MINUTES.toSeconds(10);
+ // report once a minute
+ private static final long
DEFAULT_RESTORE_JOB_SLOW_TASK_REPORT_DELAY_SECONDS =
TimeUnit.MINUTES.toSeconds(1);
@JsonProperty(value = "job_discovery_active_loop_delay_millis")
protected final long jobDiscoveryActiveLoopDelayMillis;
@@ -56,10 +56,11 @@ public class RestoreJobConfigurationImpl implements
RestoreJobConfiguration
@JsonProperty(value = "restore_job_tables_ttl_seconds")
protected final long restoreJobTablesTtlSeconds;
+ @JsonProperty(value = "slow_task_threshold_seconds")
+ protected final long slowTaskThresholdSeconds;
- @JsonProperty(value = RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS,
- defaultValue = DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS
+ "")
- private final long restoreJobLongRunningThresholdSeconds;
+ @JsonProperty(value = "slow_task_report_delay_seconds")
+ protected final long slowTaskReportDelaySeconds;
protected RestoreJobConfigurationImpl()
{
@@ -73,7 +74,8 @@ public class RestoreJobConfigurationImpl implements
RestoreJobConfiguration
this.jobDiscoveryRecencyDays = builder.jobDiscoveryRecencyDays;
this.processMaxConcurrency = builder.processMaxConcurrency;
this.restoreJobTablesTtlSeconds = builder.restoreJobTablesTtlSeconds;
- this.restoreJobLongRunningThresholdSeconds =
builder.restoreJobLongRunningThresholdSeconds;
+ this.slowTaskThresholdSeconds = builder.slowTaskThresholdSeconds;
+ this.slowTaskReportDelaySeconds = builder.slowTaskReportDelaySeconds;
validate();
}
@@ -143,11 +145,17 @@ public class RestoreJobConfigurationImpl implements
RestoreJobConfiguration
}
@Override
- @JsonProperty(value = RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS,
- defaultValue =
DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS + "")
- public long restoreJobLongRunningHandlerThresholdSeconds()
+ @JsonProperty(value = "slow_task_threshold_seconds")
+ public long slowTaskThresholdSeconds()
{
- return restoreJobLongRunningThresholdSeconds;
+ return slowTaskThresholdSeconds;
+ }
+
+ @Override
+ @JsonProperty(value = "slow_task_report_delay_seconds")
+ public long slowTaskReportDelaySeconds()
+ {
+ return slowTaskReportDelaySeconds;
}
public static Builder builder()
@@ -160,8 +168,8 @@ public class RestoreJobConfigurationImpl implements
RestoreJobConfiguration
*/
public static class Builder implements DataObjectBuilder<Builder,
RestoreJobConfigurationImpl>
{
- protected long restoreJobLongRunningThresholdSeconds =
- DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS;
+ private long slowTaskThresholdSeconds =
DEFAULT_RESTORE_JOB_SLOW_TASK_THRESHOLD_SECONDS;
+ private long slowTaskReportDelaySeconds =
DEFAULT_RESTORE_JOB_SLOW_TASK_REPORT_DELAY_SECONDS;
private long jobDiscoveryActiveLoopDelayMillis =
DEFAULT_JOB_DISCOVERY_ACTIVE_LOOP_DELAY_MILLIS;
private long jobDiscoveryIdleLoopDelayMillis =
DEFAULT_JOB_DISCOVERY_IDLE_LOOP_DELAY_MILLIS;
private int jobDiscoveryRecencyDays =
DEFAULT_JOB_DISCOVERY_RECENCY_DAYS;
@@ -239,15 +247,27 @@ public class RestoreJobConfigurationImpl implements
RestoreJobConfiguration
}
/**
- * Sets the {@code restoreJobLongRunningThresholdSeconds} and returns
a reference to this Builder enabling
+ * Sets the {@code slowTaskThresholdSeconds} and returns a reference
to this Builder enabling
+ * method chaining.
+ *
+ * @param slowTaskThresholdSeconds the {@code
slowTaskThresholdSeconds} to set
+ * @return a reference to this Builder
+ */
+ public Builder slowTaskThresholdSeconds(long slowTaskThresholdSeconds)
+ {
+ return update(b -> b.slowTaskThresholdSeconds =
slowTaskThresholdSeconds);
+ }
+
+ /**
+ * Sets the {@code slowTaskReportDelaySeconds} and returns a reference
to this Builder enabling
* method chaining.
*
- * @param restoreJobLongRunningThresholdSeconds the {@code
restoreJobLongRunningThresholdSeconds} to set
+ * @param slowTaskReportDelaySeconds the {@code
slowTaskReportDelaySeconds} to set
* @return a reference to this Builder
*/
- public Builder restoreJobLongRunningThresholdSeconds(long
restoreJobLongRunningThresholdSeconds)
+ public Builder slowTaskReportDelaySeconds(long
slowTaskReportDelaySeconds)
{
- return update(b -> b.restoreJobLongRunningThresholdSeconds =
restoreJobLongRunningThresholdSeconds);
+ return update(b -> b.slowTaskReportDelaySeconds =
slowTaskReportDelaySeconds);
}
@Override
diff --git
a/src/main/java/org/apache/cassandra/sidecar/exceptions/ThrowableUtils.java
b/src/main/java/org/apache/cassandra/sidecar/exceptions/ThrowableUtils.java
index db00897d..fe2cd283 100644
--- a/src/main/java/org/apache/cassandra/sidecar/exceptions/ThrowableUtils.java
+++ b/src/main/java/org/apache/cassandra/sidecar/exceptions/ThrowableUtils.java
@@ -36,6 +36,7 @@ public class ThrowableUtils
/**
* Run the {@code actionMayThrow} and wrap any {@link Exception} thrown in
{@link RuntimeException}
* @param actionMayThrow action that may throw exceptions
+ * @return value of type R
* @param <R> return value type of the action
*/
public static <R> R propagate(Callable<R> actionMayThrow)
@@ -101,12 +102,19 @@ public class ThrowableUtils
return cause;
}
- fastTracer = getCause(fastTracer, 2);
- if (cause == fastTracer && stop == null)
+ if (stop == null)
{
- // Mark the position to stop, and continue tracing the cause
up until hitting stop the next time.
- // This way we are sure that all exceptions/causes are visited
at least once.
- stop = cause;
+ // once stop is set, updating the fast tracer is no longer required
+ if (cause == fastTracer)
+ {
+ // Mark the position to stop, and continue tracing the
cause up until hitting stop the next time.
+ // This way we are sure that all exceptions/causes are
visited at least once.
+ stop = cause;
+ }
+ else
+ {
+ fastTracer = getCause(fastTracer, 2);
+ }
}
cause = getCause(cause, 1);
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
index 36a5d770..35ba6a74 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.sidecar.restore;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -66,8 +65,11 @@ public class RestoreProcessor implements PeriodicTask
private final double requiredUsableSpacePercentage; // value range: [0.0,
1.0)
private final RestoreSliceDatabaseAccessor sliceDatabaseAccessor;
private final RestoreJobUtil restoreJobUtil;
- private final Set<RestoreSliceHandler> activeTasks =
ConcurrentHashMap.newKeySet();
- private final long longRunningHandlerThresholdInSeconds;
+ // mapping of task to the time when it should be reported as 'slow' if it
is still active
+ // using concurrent data structure because the map is accessed from
multiple threads
+ private final Map<RestoreSliceHandler, Long> activeTasks = new
ConcurrentHashMap<>();
+ private final long slowTaskThresholdInSeconds;
+ private final long slowTaskReportDelayInSeconds;
private final LocalTokenRangesProvider localTokenRangesProvider;
private final SidecarMetrics metrics;
@@ -91,8 +93,8 @@ public class RestoreProcessor implements PeriodicTask
.processMaxConcurrency());
this.requiredUsableSpacePercentage
=
config.serviceConfiguration().sstableUploadConfiguration().minimumSpacePercentageRequired()
/ 100.0;
- this.longRunningHandlerThresholdInSeconds =
config.restoreJobConfiguration()
-
.restoreJobLongRunningHandlerThresholdSeconds();
+ this.slowTaskThresholdInSeconds =
config.restoreJobConfiguration().slowTaskThresholdSeconds();
+ this.slowTaskReportDelayInSeconds =
config.restoreJobConfiguration().slowTaskReportDelaySeconds();
this.importer = importer;
this.sliceDatabaseAccessor = sliceDatabaseAccessor;
this.restoreJobUtil = restoreJobUtil;
@@ -151,7 +153,8 @@ public class RestoreProcessor implements PeriodicTask
restoreJobUtil,
localTokenRangesProvider,
metrics);
- activeTasks.add(task);
+
+ activeTasks.put(task, slowTaskThresholdInSeconds);
pool.executeBlocking(task, false) // unordered; run in parallel
.onSuccess(restoreSlice -> {
if (slice.hasImported())
@@ -208,44 +211,53 @@ public class RestoreProcessor implements PeriodicTask
activeTasks.remove(task);
});
}
- promise.tryComplete();
checkForLongRunningTasks();
sliceQueue.capturePendingSliceCount();
+ promise.tryComplete();
+ }
+
+ @Override
+ public void close()
+ {
+ isClosed = true;
+ s3ClientPool.close();
+ sliceQueue.close();
}
private void checkForLongRunningTasks()
{
- for (RestoreSliceHandler task : activeTasks)
+ for (RestoreSliceHandler t : activeTasks.keySet())
{
- long elapsedInNanos = task.elapsedInNanos();
- if (elapsedInNanos == -1)
+ long elapsedInNanos = t.elapsedInNanos();
+ if (elapsedInNanos == -1) // not started
{
continue;
}
- long elapsedInSeconds = TimeUnit.SECONDS.convert(elapsedInNanos,
TimeUnit.NANOSECONDS);
- if (elapsedInSeconds > longRunningHandlerThresholdInSeconds)
- {
- LOGGER.warn("Long-running restore slice task detected. " +
- "elapsedSeconds={} thresholdSeconds={} sliceKey={}
jobId={} status={}",
- elapsedInSeconds,
- longRunningHandlerThresholdInSeconds,
- task.slice().key(),
- task.slice().jobId(),
- task.slice().job().status);
- task.slice()
- .owner()
- .metrics()
-
.restore().slowRestoreTaskTime.metric.update(elapsedInNanos,
TimeUnit.NANOSECONDS);
- }
- }
- }
- @Override
- public void close()
- {
- isClosed = true;
- s3ClientPool.close();
- sliceQueue.close();
+ long elapsedInSeconds =
TimeUnit.NANOSECONDS.toSeconds(elapsedInNanos);
+
+ // Read the current map and update the existing entries if needed.
+ // We do not want to put new entry to the map in this method.
+ activeTasks.computeIfPresent(t, (task, timeToReport) -> {
+ if (elapsedInSeconds > timeToReport)
+ {
+ LOGGER.warn("Long-running restore slice task detected. " +
+ "elapsedSeconds={} thresholdSeconds={}
sliceKey={} jobId={} status={}",
+ elapsedInSeconds,
+ slowTaskThresholdInSeconds,
+ task.slice().key(),
+ task.slice().jobId(),
+ task.slice().job().status);
+ task.slice()
+ .owner()
+ .metrics()
+
.restore().slowRestoreTaskTime.metric.update(elapsedInNanos,
TimeUnit.NANOSECONDS);
+ return timeToReport + slowTaskReportDelayInSeconds; //
increment by the delay
+ }
+
+ return timeToReport; // do not update
+ });
+ }
}
@VisibleForTesting
diff --git
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
index 697ad070..50cd2958 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
@@ -149,8 +149,7 @@ public class RestoreSliceTask implements RestoreSliceHandler
{
if (Files.exists(slice.stagedObjectPath()))
{
- LOGGER.debug("The slice has been staged already.
sliceKey={} stagedFilePath={}",
- slice.key(), slice.stagedObjectPath());
+ LOGGER.info("The slice has been staged already.
sliceKey={}", slice.key());
slice.completeStagePhase(); // update the flag if
missed
sliceDatabaseAccessor.updateStatus(slice);
event.tryComplete(slice);
@@ -292,10 +291,11 @@ public class RestoreSliceTask implements
RestoreSliceHandler
Future<File> future =
fromCompletionStage(s3Client.downloadObjectIfAbsent(slice))
.onFailure(cause -> {
+ LOGGER.warn("Failed to download restore slice. sliceKey={}",
slice.key(), cause);
+
slice.incrementDownloadAttempt();
if (ThrowableUtils.getCause(cause, ApiCallTimeoutException.class)
!= null)
{
- LOGGER.warn("Downloading restore slice times out.
sliceKey={}", slice.key());
instanceMetrics.restore().sliceDownloadTimeouts.metric.update(1);
}
event.tryFail(RestoreJobExceptions.ofFatalSlice("Unrecoverable
error when downloading object",
diff --git
a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
index fd31c7cb..da639cf4 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
@@ -42,6 +42,7 @@ import
org.apache.cassandra.sidecar.common.data.StorageCredentials;
import org.apache.cassandra.sidecar.db.RestoreJob;
import org.apache.cassandra.sidecar.db.RestoreSlice;
import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
+import org.apache.cassandra.sidecar.exceptions.ThrowableUtils;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
@@ -97,7 +98,9 @@ public class StorageClient
return newCredentials;
}
else
+ {
return credentials;
+ }
});
return this;
}
@@ -142,7 +145,7 @@ public class StorageClient
Credentials credentials = credentialsProviders.get(slice.jobId());
if (credentials == null)
{
- LOGGER.debug("Credentials to download object not found. jobId={}",
slice.jobId());
+ LOGGER.warn("Credentials to download object not found. jobId={}",
slice.jobId());
CompletableFuture<File> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(credentialsNotFound(slice));
return failedFuture;
@@ -152,8 +155,8 @@ public class StorageClient
File object = objectPath.toFile();
if (object.exists())
{
- LOGGER.debug("Skipping download, file already exists. jobId={}
s3_object={}",
- slice.jobId(), slice.stagedObjectPath());
+ LOGGER.info("Skipping download, file already exists. jobId={}
sliceKey={}",
+ slice.jobId(), slice.key());
// Skip downloading if the file already exists on disk. It should
be a rare scenario.
// Note that the on-disk file could be different from the remote
object, although the name matches.
// TODO 1: verify etag does not change after s3 replication and
batch copy
@@ -165,12 +168,12 @@ public class StorageClient
if (!object.getParentFile().mkdirs())
{
- LOGGER.warn("Error occurred while creating directory. jobId={}
s3Object={}",
- slice.jobId(), slice.stagedObjectPath());
+ LOGGER.warn("Error occurred while creating directory. jobId={}
sliceKey={}",
+ slice.jobId(), slice.key());
}
- LOGGER.info("Downloading object. jobId={} s3Object={}", slice.jobId(),
slice.stagedObjectPath());
+ LOGGER.info("Downloading object. jobId={} sliceKey={}", slice.jobId(),
slice.key());
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
GetObjectRequest request =
GetObjectRequest.builder()
@@ -274,31 +277,33 @@ public class StorageClient
}
catch (FileAlreadyExistsException fileAlreadyExistsException)
{
- LOGGER.debug("Skipping download, file already exists. jobId={}
s3_object={}",
- slice.jobId(), slice.stagedObjectPath());
+ LOGGER.info("Skipping download. File already exists. jobId={}
sliceKey={}",
+ slice.jobId(), slice.key());
return CompletableFuture.completedFuture(publisher.response());
}
catch (IOException e)
{
- LOGGER.error("Error occurred while creating channel.
destinationPath={} jobId={} s3_object={}",
- destinationPath, slice.jobId(),
slice.stagedObjectPath(), e);
+ LOGGER.error("Error occurred while creating channel.
destinationPath={} jobId={} sliceKey={}",
+ destinationPath, slice.jobId(), slice.key(), e);
throw new RuntimeException(e);
}
// CompletableFuture that will be notified when all events have been
consumed or if an error occurs.
- CompletableFuture<Void> subscribeFuture = publisher.subscribe(buffer
-> {
- downloadRateLimiter.acquire(buffer.remaining()); // apply
backpressure on the received bytes
- try
- {
- channel.write(buffer);
- }
- catch (IOException e)
- {
- LOGGER.error("Error occurred while downloading. jobId={}
s3_object={}",
- slice.jobId(), slice.stagedObjectPath(), e);
- throw new RuntimeException(e);
- }
- }).whenComplete((v, subscribeThrowable) -> closeChannel(channel));
- return subscribeFuture.thenApply(v -> publisher.response());
+ return publisher
+ .subscribe(buffer -> {
+ downloadRateLimiter.acquire(buffer.remaining()); // apply
backpressure on the received bytes
+ ThrowableUtils.propagate(() -> channel.write(buffer));
+ })
+ .whenComplete((v, subscribeThrowable) -> {
+ // finally close the channel and log error if failed
+ closeChannel(channel);
+
+ if (subscribeThrowable != null)
+ {
+ LOGGER.error("Error occurred while downloading.
jobId={} sliceKey={}",
+ slice.jobId(), slice.key(),
subscribeThrowable);
+ }
+ })
+ .thenApply(v -> publisher.response());
}
/**
diff --git a/src/test/java/org/apache/cassandra/sidecar/TestModule.java
b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
index f1f4621c..143dd139 100644
--- a/src/test/java/org/apache/cassandra/sidecar/TestModule.java
+++ b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
@@ -123,7 +123,8 @@ public class TestModule extends AbstractModule
RestoreJobConfigurationImpl.builder()
.restoreJobTablesTtlSeconds(TimeUnit.DAYS.toSeconds(14) + 1)
.processMaxConcurrency(RESTORE_MAX_CONCURRENCY)
- .restoreJobLongRunningThresholdSeconds(1)
+ .slowTaskThresholdSeconds(10)
+ .slowTaskReportDelaySeconds(120)
.build();
HealthCheckConfiguration healthCheckConfiguration = new
HealthCheckConfigurationImpl(200, 1000);
return SidecarConfigurationImpl.builder()
diff --git
a/src/test/java/org/apache/cassandra/sidecar/exceptions/ThrowableUtilsTest.java
b/src/test/java/org/apache/cassandra/sidecar/exceptions/ThrowableUtilsTest.java
index 9b0cbdb7..fcf6915b 100644
---
a/src/test/java/org/apache/cassandra/sidecar/exceptions/ThrowableUtilsTest.java
+++
b/src/test/java/org/apache/cassandra/sidecar/exceptions/ThrowableUtilsTest.java
@@ -18,6 +18,8 @@
package org.apache.cassandra.sidecar.exceptions;
+import java.io.IOException;
+
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
@@ -39,7 +41,7 @@ class ThrowableUtilsTest
@Test
void testGetCauseWithCircularRef()
{
- Exception root = new RuntimeException();
+ Exception root = new IOException();
Exception testEx = new IllegalStateException(root);
Exception ex = new RuntimeException(testEx);
root.initCause(ex); // create a circular chain
diff --git
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
index e878fa22..ac162554 100644
---
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
+++
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
@@ -163,7 +163,7 @@ class RestoreProcessorTest
}
@Test
- public void testLongRunningHandlerDetection()
+ void testLongRunningHandlerDetection()
{
when(sidecarSchema.isInitialized()).thenReturn(true);
periodicTaskExecutor.schedule(processor);
@@ -171,17 +171,21 @@ class RestoreProcessorTest
CountDownLatch latch = new CountDownLatch(1);
AtomicLong currentTime = new AtomicLong(0);
RestoreSlice slice = mockSlowSlice(latch, currentTime::get); // Sets
the start time
- long fiveMinutesInNanos = TimeUnit.NANOSECONDS.convert(5,
TimeUnit.MINUTES);
- currentTime.set(fiveMinutesInNanos);
+ long oneMinutesInNanos = TimeUnit.NANOSECONDS.convert(1,
TimeUnit.MINUTES);
+ currentTime.set(oneMinutesInNanos);
processor.submit(slice);
loopAssert(3, () -> {
long[] slowRestoreTaskTimes = instanceMetrics()
.restore()
.slowRestoreTaskTime.metric.getSnapshot().getValues();
- assertThat(slowRestoreTaskTimes.length).isGreaterThanOrEqualTo(1);
+ assertThat(slowRestoreTaskTimes)
+ .describedAs("The task takes 1 minute. " +
+ "The slow task threshodl is 10 seconds and report
delay is 2 minutes (see TestModule). " +
+ "It should only report once")
+ .hasSize(1);
Long handlerTimeInNanos = slowRestoreTaskTimes[0];
assertThat(handlerTimeInNanos).isNotNull();
- assertThat(handlerTimeInNanos).isEqualTo(fiveMinutesInNanos);
+ assertThat(handlerTimeInNanos).isEqualTo(oneMinutesInNanos);
assertThat(processor.activeTasks()).isOne();
});
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]