This is an automated email from the ASF dual-hosted git repository.
frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new c824d11 CASSANDRASC-106: Add restore task watcher to report long
running tasks (#104)
c824d11 is described below
commit c824d112de2c92d180a90a1830108d225f45dc23
Author: Doug Rohrer <[email protected]>
AuthorDate: Thu Mar 21 13:27:31 2024 -0400
CASSANDRASC-106: Add restore task watcher to report long running tasks
(#104)
Patch by Doug Rohrer; Reviewed by Yifan Cai, Francisco Guerrero for
CASSANDRASC-106
---
CHANGES.txt | 3 +-
checkstyle.xml | 4 +-
.../cassandra/sidecar/client/RequestExecutor.java | 1 +
.../selection/OrderedInstanceSelectionPolicy.java | 1 +
.../sidecar/cluster/CQLSessionProviderImpl.java | 1 +
.../sidecar/cluster/CassandraAdapterDelegate.java | 2 +
.../sidecar/config/RestoreJobConfiguration.java | 5 ++
.../config/yaml/DriverConfigurationImpl.java | 3 +
.../sidecar/config/yaml/JmxConfigurationImpl.java | 2 +
.../config/yaml/RestoreJobConfigurationImpl.java | 32 ++++++++++
.../apache/cassandra/sidecar/db/RestoreSlice.java | 29 +++++----
.../sidecar/db/schema/RestoreSlicesSchema.java | 1 +
.../cassandra/sidecar/restore/RestoreJobUtil.java | 10 ++-
.../sidecar/restore/RestoreProcessor.java | 56 ++++++++++++++---
.../sidecar/restore/RestoreSliceHandler.java} | 28 +++++----
.../sidecar/restore/RestoreSliceTask.java | 66 +++++++++++++++++++-
.../cassandra/sidecar/stats/RestoreJobStats.java | 23 ++++++-
.../cluster/SidecarLoadBalancingPolicyTest.java | 1 +
.../sidecar/testing/IntegrationTestModule.java | 2 +
.../testing/SharedExecutorNettyOptions.java | 3 +
.../cassandra/testing/SimpleCassandraVersion.java | 1 +
.../cassandra/sidecar/HealthServiceSslTest.java | 1 +
.../cassandra/sidecar/HealthServiceTest.java | 1 +
.../org/apache/cassandra/sidecar/TestModule.java | 1 +
.../sidecar/restore/RestoreProcessorTest.java | 72 +++++++++++++++++++++-
.../sidecar/restore/RestoreSliceTaskTest.java | 38 +++++++++---
.../sstableuploads/BaseUploadsHandlerTest.java | 1 +
.../sidecar/stats/TestRestoreJobStats.java | 9 +++
28 files changed, 343 insertions(+), 54 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index e625cde..ce39b66 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
1.0.0
-----
+ * Add restore task watcher to report long running tasks (CASSANDRASC-106)
* RestoreSliceTask could be stuck due to missing exception handling
(CASSANDRASC-105)
* Make hash algorithm implementation pluggable (CASSANDRASC-114)
* Fix ClosedChannelException when downloading from S3 (CASSANDRASC-112)
@@ -81,4 +82,4 @@
* Add integration tests task (CASSANDRA-15031)
* Add support for SSL and bindable address (CASSANDRA-15030)
* Autogenerate API docs for sidecar (CASSANDRA-15028)
- * C* Management process (CASSANDRA-14395)
+ * C* Management process (CASSANDRA-14395)
\ No newline at end of file
diff --git a/checkstyle.xml b/checkstyle.xml
index fd7c1b0..cea9a21 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -244,8 +244,8 @@ page at http://checkstyle.sourceforge.net/config.html -->
<module name="LineLength">
<!-- Checks if a line is too long. -->
- <property name="max" value="120" default="120" />
- <property name="severity" value="error" />
+ <property name="max" value="160" />
+ <property name="severity" value="warning" />
<!--
The default ignore pattern exempts the following elements:
diff --git
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestExecutor.java
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestExecutor.java
index 0dd9e52..3480dca 100644
---
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestExecutor.java
+++
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestExecutor.java
@@ -155,6 +155,7 @@ public class RequestExecutor implements AutoCloseable
/**
* Closes the underlying HTTP client
*/
+ @Override
public void close() throws Exception
{
httpClient.close();
diff --git
a/client/src/main/java/org/apache/cassandra/sidecar/client/selection/OrderedInstanceSelectionPolicy.java
b/client/src/main/java/org/apache/cassandra/sidecar/client/selection/OrderedInstanceSelectionPolicy.java
index 62df635..ecdca71 100644
---
a/client/src/main/java/org/apache/cassandra/sidecar/client/selection/OrderedInstanceSelectionPolicy.java
+++
b/client/src/main/java/org/apache/cassandra/sidecar/client/selection/OrderedInstanceSelectionPolicy.java
@@ -47,6 +47,7 @@ public class OrderedInstanceSelectionPolicy implements
InstanceSelectionPolicy
*
* @return an iterator of {@link SidecarInstance instances}
*/
+ @Override
@NotNull
public Iterator<SidecarInstance> iterator()
{
diff --git
a/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java
b/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java
index f394ca2..6bc30a0 100644
---
a/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java
+++
b/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java
@@ -119,6 +119,7 @@ public class CQLSessionProviderImpl implements
CQLSessionProvider
*
* @return Session
*/
+ @Override
@Nullable
public synchronized Session get()
{
diff --git
a/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
b/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
index 7bc3a64..8ce81e2 100644
---
a/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
+++
b/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
@@ -373,11 +373,13 @@ public class CassandraAdapterDelegate implements
ICassandraAdapter, Host.StateLi
return nodeSettingsFromJmx;
}
+ @Override
public ResultSet executeLocal(Statement statement)
{
return fromAdapter(adapter -> adapter.executeLocal(statement));
}
+ @Override
public InetSocketAddress localNativeTransportPort()
{
return fromAdapter(ICassandraAdapter::localNativeTransportPort);
diff --git
a/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java
b/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java
index 3231f17..ec59165 100644
---
a/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java
+++
b/src/main/java/org/apache/cassandra/sidecar/config/RestoreJobConfiguration.java
@@ -48,4 +48,9 @@ public interface RestoreJobConfiguration
* @return time to live for restore job tables: restore_job and
restore_slice
*/
long restoreJobTablesTtlSeconds();
+
+ /**
+ * @return the number of seconds above which a restore handler is
considered "long-running"
+ */
+ long restoreJobLongRunningHandlerThresholdSeconds();
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java
b/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java
index 49334ed..c424728 100644
---
a/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java
+++
b/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java
@@ -34,18 +34,21 @@ public class DriverConfigurationImpl implements
DriverConfiguration
private String localDc;
private int numConnections;
+ @Override
@JsonProperty("contact_points")
public List<InetSocketAddress> contactPoints()
{
return contactPoints;
}
+ @Override
@JsonProperty("num_connections")
public int numConnections()
{
return numConnections;
}
+ @Override
@JsonProperty("local_dc")
public String localDc()
{
diff --git
a/src/main/java/org/apache/cassandra/sidecar/config/yaml/JmxConfigurationImpl.java
b/src/main/java/org/apache/cassandra/sidecar/config/yaml/JmxConfigurationImpl.java
index 022d4ec..8349011 100644
---
a/src/main/java/org/apache/cassandra/sidecar/config/yaml/JmxConfigurationImpl.java
+++
b/src/main/java/org/apache/cassandra/sidecar/config/yaml/JmxConfigurationImpl.java
@@ -47,6 +47,7 @@ public class JmxConfigurationImpl implements JmxConfiguration
/**
* @return the maximum number of connection retry attempts to make before
failing
*/
+ @Override
@JsonProperty("max_retries")
public int maxRetries()
{
@@ -56,6 +57,7 @@ public class JmxConfigurationImpl implements JmxConfiguration
/**
* @return the delay, in milliseconds, between retry attempts
*/
+ @Override
@JsonProperty("retry_delay_millis")
public long retryDelayMillis()
{
diff --git
a/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
b/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
index 3475f91..336340c 100644
---
a/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
+++
b/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
@@ -36,6 +36,10 @@ public class RestoreJobConfigurationImpl implements
RestoreJobConfiguration
public static final int DEFAULT_JOB_DISCOVERY_RECENCY_DAYS = 5;
public static final int DEFAULT_PROCESS_MAX_CONCURRENCY = 20; // process
at most 20 slices concurrently
public static final long DEFAULT_RESTORE_JOB_TABLES_TTL_SECONDS =
TimeUnit.DAYS.toSeconds(90);
+ public static final String
RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS =
+ "restore_job_long_running_threshold_seconds";
+ // A restore job handler is considered long-running once it has been
processing for more than 10 minutes.
+ private static final long
DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS = 600;
@JsonProperty(value = "job_discovery_active_loop_delay_millis")
protected final long jobDiscoveryActiveLoopDelayMillis;
@@ -52,6 +56,11 @@ public class RestoreJobConfigurationImpl implements
RestoreJobConfiguration
@JsonProperty(value = "restore_job_tables_ttl_seconds")
protected final long restoreJobTablesTtlSeconds;
+
+ @JsonProperty(value = RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS,
+ defaultValue = DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS
+ "")
+ private final long restoreJobLongRunningThresholdSeconds;
+
protected RestoreJobConfigurationImpl()
{
this(builder());
@@ -64,6 +73,7 @@ public class RestoreJobConfigurationImpl implements
RestoreJobConfiguration
this.jobDiscoveryRecencyDays = builder.jobDiscoveryRecencyDays;
this.processMaxConcurrency = builder.processMaxConcurrency;
this.restoreJobTablesTtlSeconds = builder.restoreJobTablesTtlSeconds;
+ this.restoreJobLongRunningThresholdSeconds =
builder.restoreJobLongRunningThresholdSeconds;
validate();
}
@@ -132,6 +142,14 @@ public class RestoreJobConfigurationImpl implements
RestoreJobConfiguration
return restoreJobTablesTtlSeconds;
}
+ @Override
+ @JsonProperty(value = RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS,
+ defaultValue =
DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS + "")
+ public long restoreJobLongRunningHandlerThresholdSeconds()
+ {
+ return restoreJobLongRunningThresholdSeconds;
+ }
+
public static Builder builder()
{
return new Builder();
@@ -142,6 +160,8 @@ public class RestoreJobConfigurationImpl implements
RestoreJobConfiguration
*/
public static class Builder implements DataObjectBuilder<Builder,
RestoreJobConfigurationImpl>
{
+ protected long restoreJobLongRunningThresholdSeconds =
+ DEFAULT_RESTORE_JOB_LONG_RUNNING_HANDLER_THRESHOLD_SECONDS;
private long jobDiscoveryActiveLoopDelayMillis =
DEFAULT_JOB_DISCOVERY_ACTIVE_LOOP_DELAY_MILLIS;
private long jobDiscoveryIdleLoopDelayMillis =
DEFAULT_JOB_DISCOVERY_IDLE_LOOP_DELAY_MILLIS;
private int jobDiscoveryRecencyDays =
DEFAULT_JOB_DISCOVERY_RECENCY_DAYS;
@@ -218,6 +238,18 @@ public class RestoreJobConfigurationImpl implements
RestoreJobConfiguration
return update(b -> b.restoreJobTablesTtlSeconds =
restoreJobTablesTtlSeconds);
}
+ /**
+ * Sets the {@code restoreJobLongRunningThresholdSeconds} and returns
a reference to this Builder enabling
+ * method chaining.
+ *
+ * @param restoreJobLongRunningThresholdSeconds the {@code
restoreJobLongRunningThresholdSeconds} to set
+ * @return a reference to this Builder
+ */
+ public Builder restoreJobLongRunningThresholdSeconds(long
restoreJobLongRunningThresholdSeconds)
+ {
+ return update(b -> b.restoreJobLongRunningThresholdSeconds =
restoreJobLongRunningThresholdSeconds);
+ }
+
@Override
public RestoreJobConfigurationImpl build()
{
diff --git a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java
b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java
index 7563e93..c4c7100 100644
--- a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java
+++ b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java
@@ -29,8 +29,6 @@ import java.util.Set;
import java.util.UUID;
import com.datastax.driver.core.Row;
-import io.vertx.core.Handler;
-import io.vertx.core.Promise;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.common.DataObjectBuilder;
import org.apache.cassandra.sidecar.common.data.CreateSliceRequestPayload;
@@ -40,6 +38,7 @@ import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions;
import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
import org.apache.cassandra.sidecar.restore.RestoreJobUtil;
+import org.apache.cassandra.sidecar.restore.RestoreSliceHandler;
import org.apache.cassandra.sidecar.restore.RestoreSliceTask;
import org.apache.cassandra.sidecar.restore.RestoreSliceTracker;
import org.apache.cassandra.sidecar.restore.StorageClient;
@@ -229,17 +228,17 @@ public class RestoreSlice
/**
* @return {@link RestoreSliceTask} of the restore slice. See {@link
RestoreSliceTask} for the steps.
*/
- public Handler<Promise<RestoreSlice>> toAsyncTask(StorageClientPool
s3ClientPool,
-
ExecutorPools.TaskExecutorPool executorPool,
- SSTableImporter importer,
- double
requiredUsableSpacePercentage,
-
RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
- RestoreJobStats stats,
- RestoreJobUtil
restoreJobUtil)
+ public RestoreSliceHandler toAsyncTask(StorageClientPool s3ClientPool,
+ ExecutorPools.TaskExecutorPool
executorPool,
+ SSTableImporter importer,
+ double
requiredUsableSpacePercentage,
+ RestoreSliceDatabaseAccessor
sliceDatabaseAccessor,
+ RestoreJobStats stats,
+ RestoreJobUtil restoreJobUtil)
{
if (isCancelled)
- return promise ->
promise.tryFail(RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled",
-
this, null));
+ return
RestoreSliceTask.failed(RestoreJobExceptions.ofFatalSlice("Restore slice is
cancelled",
+
this, null), this);
try
{
@@ -254,13 +253,13 @@ public class RestoreSlice
catch (IllegalStateException illegalState)
{
// The slice is not registered with a tracker, retry later.
- return promise ->
promise.tryFail(RestoreJobExceptions.ofSlice("Restore slice is not started",
-
this, illegalState));
+ return
RestoreSliceTask.failed(RestoreJobExceptions.ofSlice("Restore slice is not
started",
+
this, illegalState), this);
}
catch (Exception cause)
{
- return promise ->
promise.tryFail(RestoreJobExceptions.ofFatalSlice("Restore slice is failed",
-
this, cause));
+ return
RestoreSliceTask.failed(RestoreJobExceptions.ofFatalSlice("Restore slice is
failed",
+
this, cause), this);
}
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java
b/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java
index 0602a9e..3c9e2f1 100644
---
a/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java
+++
b/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java
@@ -48,6 +48,7 @@ public class RestoreSlicesSchema extends
AbstractSchema.TableSchema
this.tableTtlSeconds = tableTtlSeconds;
}
+ @Override
protected void prepareStatements(@NotNull Session session)
{
insertSlice = prepare(insertSlice, session,
CqlLiterals.insertSlice(keyspaceConfig));
diff --git
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
index ef36316..8651d53 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
@@ -59,7 +59,7 @@ public class RestoreJobUtil
private static final int RESTORE_JOB_PREFIX_LEN =
RESTORE_JOB_PREFIX.length();
private static final int RESTORE_JOB_DEFAULT_HASH_SEED = 0;
- private DigestAlgorithmProvider digestAlgorithmProvider;
+ private final DigestAlgorithmProvider digestAlgorithmProvider;
@Inject
public RestoreJobUtil(@Named("xxhash32") DigestAlgorithmProvider
digestAlgorithmProvider)
@@ -223,4 +223,12 @@ public class RestoreJobUtil
}
}
}
+
+ /**
+ * @return the current time in nanoseconds
+ */
+ public long currentTimeNanos()
+ {
+ return System.nanoTime();
+ }
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
index 1a438dc..edded10 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
@@ -21,8 +21,10 @@ package org.apache.cassandra.sidecar.restore;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
@@ -62,6 +64,8 @@ public class RestoreProcessor implements PeriodicTask
private final RestoreSliceDatabaseAccessor sliceDatabaseAccessor;
private final RestoreJobStats stats;
private final RestoreJobUtil restoreJobUtil;
+ private final Set<RestoreSliceHandler> activeTasks =
ConcurrentHashMap.newKeySet();
+ private final long longRunningHandlerThresholdInSeconds;
private volatile boolean isClosed = false; // OK to run close twice, so
relax the control to volatile
@@ -82,6 +86,8 @@ public class RestoreProcessor implements PeriodicTask
.processMaxConcurrency());
this.requiredUsableSpacePercentage
=
config.serviceConfiguration().ssTableUploadConfiguration().minimumSpacePercentageRequired()
/ 100.0;
+ this.longRunningHandlerThresholdInSeconds =
config.restoreJobConfiguration()
+
.restoreJobLongRunningHandlerThresholdSeconds();
this.importer = importer;
this.sliceDatabaseAccessor = sliceDatabaseAccessor;
this.stats = stats;
@@ -128,26 +134,28 @@ public class RestoreProcessor implements PeriodicTask
if (slice == null) // it should never happen, and is only to make
ide happy
{
processMaxConcurrency.releasePermit();
- return;
+ break;
}
// capture the new queue length after polling
sliceQueue.captureImportQueueLength();
- pool.executeBlocking(slice.toAsyncTask(s3ClientPool, pool,
importer,
-
requiredUsableSpacePercentage,
- sliceDatabaseAccessor,
stats,
- restoreJobUtil),
- false) // unordered
+ RestoreSliceHandler task = slice.toAsyncTask(s3ClientPool, pool,
importer,
+
requiredUsableSpacePercentage,
+
sliceDatabaseAccessor, stats,
+ restoreJobUtil);
+ activeTasks.add(task);
+ pool.executeBlocking(task, false) // unordered; run in parallel
.onSuccess(restoreSlice -> {
+ int instanceId = slice.owner().id();
if (slice.hasImported())
{
- stats.captureSliceCompletionTime(slice.owner().id(),
System.nanoTime() - slice.creationTimeNanos());
+ stats.captureSliceCompletionTime(instanceId,
System.nanoTime() - slice.creationTimeNanos());
LOGGER.info("Slice completes successfully. sliceKey={}",
slice.key());
slice.complete();
}
else if (slice.hasStaged())
{
- // todo: report stat of time taken to stage
+ stats.captureSliceStageTime(instanceId,
task.elapsedInNanos());
LOGGER.info("Slice has been staged successfully.
sliceKey={}", slice.key());
// the slice is not fully complete yet. Re-enqueue the
slice.
sliceQueue.offer(slice);
@@ -186,12 +194,38 @@ public class RestoreProcessor implements PeriodicTask
// decrement the active slices and capture the new queue length
sliceQueue.decrementActiveSliceCount(slice);
sliceQueue.captureImportQueueLength();
+ activeTasks.remove(task);
});
}
promise.tryComplete();
+ checkForLongRunningTasks();
sliceQueue.capturePendingSliceCount();
}
+ private void checkForLongRunningTasks()
+ {
+ for (RestoreSliceHandler task : activeTasks)
+ {
+ long elapsedInNanos = task.elapsedInNanos();
+ if (elapsedInNanos == -1)
+ {
+ continue;
+ }
+ long elapsedInSeconds = TimeUnit.SECONDS.convert(elapsedInNanos,
TimeUnit.NANOSECONDS);
+ if (elapsedInSeconds > longRunningHandlerThresholdInSeconds)
+ {
+ LOGGER.warn("Long-running restore slice task detected. " +
+ "elapsedSeconds={} thresholdSeconds={} sliceKey={}
jobId={} status={}",
+ elapsedInSeconds,
+ longRunningHandlerThresholdInSeconds,
+ task.slice().key(),
+ task.slice().jobId(),
+ task.slice().job().status);
+
stats.captureLongRunningRestoreHandler(task.slice().owner().id(),
elapsedInNanos);
+ }
+ }
+ }
+
@Override
public void close()
{
@@ -206,6 +240,12 @@ public class RestoreProcessor implements PeriodicTask
return sliceQueue.activeSliceCount();
}
+ @VisibleForTesting
+ int activeTasks()
+ {
+ return activeTasks.size();
+ }
+
@VisibleForTesting
int pendingStartSlices()
{
diff --git a/src/test/java/org/apache/cassandra/sidecar/HealthServiceTest.java
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceHandler.java
similarity index 62%
copy from src/test/java/org/apache/cassandra/sidecar/HealthServiceTest.java
copy to
src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceHandler.java
index ef088db..fdcf390 100644
--- a/src/test/java/org/apache/cassandra/sidecar/HealthServiceTest.java
+++
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceHandler.java
@@ -16,22 +16,24 @@
* limitations under the License.
*/
-package org.apache.cassandra.sidecar;
+package org.apache.cassandra.sidecar.restore;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import io.vertx.junit5.VertxExtension;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.db.RestoreSlice;
/**
- * Health Service Tests
+ * A handler that processes a restore slice
*/
-@DisplayName("Health Service Test")
-@ExtendWith(VertxExtension.class)
-public class HealthServiceTest extends AbstractHealthServiceTest
+public interface RestoreSliceHandler extends Handler<Promise<RestoreSlice>>
{
- public boolean isSslEnabled()
- {
- return false;
- }
+ /**
+ * @return slice the handler processes
+ */
+ RestoreSlice slice();
+
+ /**
+ * @return the elapsed time in nanoseconds if the task has started
processing, -1 otherwise
+ */
+ long elapsedInNanos();
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
index 4e942c7..ed85d3f 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.vertx.core.Future;
-import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.ext.web.handler.HttpException;
import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
@@ -59,7 +58,7 @@ import static
org.apache.cassandra.sidecar.utils.AsyncFileSystemUtils.ensureSuff
*
* Note that the class is package private, and it is not intended to be
referenced by other packages.
*/
-public class RestoreSliceTask implements Handler<Promise<RestoreSlice>>
+public class RestoreSliceTask implements RestoreSliceHandler
{
private static final Logger LOGGER =
LoggerFactory.getLogger(RestoreSliceTask.class);
@@ -71,6 +70,7 @@ public class RestoreSliceTask implements
Handler<Promise<RestoreSlice>>
private final RestoreSliceDatabaseAccessor sliceDatabaseAccessor;
private final RestoreJobStats stats;
private final RestoreJobUtil restoreJobUtil;
+ private long taskStartTimeNanos = -1;
public RestoreSliceTask(RestoreSlice slice,
StorageClient s3Client,
@@ -94,9 +94,15 @@ public class RestoreSliceTask implements
Handler<Promise<RestoreSlice>>
this.restoreJobUtil = restoreJobUtil;
}
+ public static RestoreSliceHandler failed(RestoreJobException cause,
RestoreSlice slice)
+ {
+ return new Failed(cause, slice);
+ }
+
@Override
public void handle(Promise<RestoreSlice> event)
{
+ this.taskStartTimeNanos = restoreJobUtil.currentTimeNanos();
if (failOnCancelled(event))
return;
@@ -192,7 +198,7 @@ public class RestoreSliceTask implements
Handler<Promise<RestoreSlice>>
.whenComplete((resp, cause) -> {
if (cause == null)
{
- stats.captureSliceReplicationTime(System.nanoTime() -
slice.creationTimeNanos());
+ stats.captureSliceReplicationTime(currentTimeInNanos() -
slice.creationTimeNanos());
slice.setExistsOnS3();
return;
}
@@ -244,6 +250,11 @@ public class RestoreSliceTask implements
Handler<Promise<RestoreSlice>>
});
}
+ private long currentTimeInNanos()
+ {
+ return restoreJobUtil.currentTimeNanos();
+ }
+
private CompletableFuture<File> downloadSlice(Promise<RestoreSlice> event)
{
if (slice.isCancelled())
@@ -521,4 +532,53 @@ public class RestoreSliceTask implements
Handler<Promise<RestoreSlice>>
LOGGER.warn("Committing slice failed with HttpException. slice={}
statusCode={} exceptionPayload={}",
slice.sliceId(), httpException.getStatusCode(),
httpException.getPayload(), httpException);
}
+
+ @Override
+ public long elapsedInNanos()
+ {
+ return taskStartTimeNanos == -1 ? -1 :
+ currentTimeInNanos() - taskStartTimeNanos;
+ }
+
+ @Override
+ public RestoreSlice slice()
+ {
+ return slice;
+ }
+
+ /**
+ * A RestoreSliceHandler that immediately fails the slice/promise.
+ * Used when the processor already knows that a slice should not be
processed for some reason
+ * as indicated in cause field.
+ */
+ public static class Failed implements RestoreSliceHandler
+ {
+ private final RestoreJobException cause;
+ private final RestoreSlice slice;
+
+ public Failed(RestoreJobException cause, RestoreSlice slice)
+ {
+ this.cause = cause;
+ this.slice = slice;
+ }
+
+ @Override
+ public void handle(Promise<RestoreSlice> promise)
+ {
+ promise.tryFail(cause);
+ }
+
+ @Override
+ public long elapsedInNanos()
+ {
+ // it fails immediately
+ return 0;
+ }
+
+ @Override
+ public RestoreSlice slice()
+ {
+ return slice;
+ }
+ }
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/stats/RestoreJobStats.java
b/src/main/java/org/apache/cassandra/sidecar/stats/RestoreJobStats.java
index 1a44bd8..27ca5fe 100644
--- a/src/main/java/org/apache/cassandra/sidecar/stats/RestoreJobStats.java
+++ b/src/main/java/org/apache/cassandra/sidecar/stats/RestoreJobStats.java
@@ -26,7 +26,7 @@ public interface RestoreJobStats
/**
* Captures the total time taken to complete a slice successfully
*
- * @param instanceId instance that contains the slice
+ * @param instanceId instance that is processing the slice
* @param durationNanos duration in nanoseconds
*/
default void captureSliceCompletionTime(int instanceId, long durationNanos)
@@ -34,6 +34,17 @@ public interface RestoreJobStats
}
+ /**
+ * Captures the total time taken to stage the slice
+ *
+ * @param instanceId instance that is processing the slice
+ * @param durationNanos duration in nanoseconds
+ */
+ default void captureSliceStageTime(int instanceId, long durationNanos)
+ {
+
+ }
+
/**
* Captures the time taken to import SSTable(s) in a slice
*
@@ -217,4 +228,14 @@ public interface RestoreJobStats
{
}
+
+ /**
+ * Captures a long-running restore job handler
+ * @param instanceId instance that is processing the slice
+ * @param handlerDuration restore job current duration in nanoseconds
+ */
+ default void captureLongRunningRestoreHandler(int instanceId, long
handlerDuration)
+ {
+
+ }
}
diff --git
a/src/test/integration/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicyTest.java
b/src/test/integration/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicyTest.java
index 4628a14..5b23031 100644
---
a/src/test/integration/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicyTest.java
+++
b/src/test/integration/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicyTest.java
@@ -55,6 +55,7 @@ public class SidecarLoadBalancingPolicyTest extends
IntegrationTestBase
.collect(Collectors.toList());
}
+ @Override
protected int getNumInstancesToManage(int clusterSize)
{
return SIDECAR_MANAGED_INSTANCES; // we only want to manage the first
2 instances in the "cluster"
diff --git
a/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
b/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
index ef76ffa..d0c069d 100644
---
a/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
+++
b/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
@@ -118,6 +118,7 @@ public class IntegrationTestModule extends AbstractModule
* @return instance meta information
* @throws NoSuchElementException when the instance with {@code id}
does not exist
*/
+ @Override
public InstanceMetadata instanceFromId(int id) throws
NoSuchElementException
{
return cassandraTestContext.instancesConfig().instanceFromId(id);
@@ -130,6 +131,7 @@ public class IntegrationTestModule extends AbstractModule
* @return instance meta information
* @throws NoSuchElementException when the instance for {@code host}
does not exist
*/
+ @Override
public InstanceMetadata instanceFromHost(String host) throws
NoSuchElementException
{
return
cassandraTestContext.instancesConfig().instanceFromHost(host);
diff --git
a/src/test/integration/org/apache/cassandra/sidecar/testing/SharedExecutorNettyOptions.java
b/src/test/integration/org/apache/cassandra/sidecar/testing/SharedExecutorNettyOptions.java
index 3bdeead..2dbf616 100644
---
a/src/test/integration/org/apache/cassandra/sidecar/testing/SharedExecutorNettyOptions.java
+++
b/src/test/integration/org/apache/cassandra/sidecar/testing/SharedExecutorNettyOptions.java
@@ -48,11 +48,13 @@ class SharedExecutorNettyOptions extends NettyOptions
private final HashedWheelTimer sharedHWT = new
HashedWheelTimer(threadFactory);
private final EventLoopGroup sharedEventLoopGroup = new
NioEventLoopGroup(0, threadFactory);
+ @Override
public EventLoopGroup eventLoopGroup(ThreadFactory threadFactory)
{
return sharedEventLoopGroup;
}
+ @Override
public void onClusterClose(EventLoopGroup eventLoopGroup)
{
}
@@ -63,6 +65,7 @@ class SharedExecutorNettyOptions extends NettyOptions
return sharedHWT;
}
+ @Override
public void onClusterClose(Timer timer)
{
}
diff --git
a/src/test/integration/org/apache/cassandra/testing/SimpleCassandraVersion.java
b/src/test/integration/org/apache/cassandra/testing/SimpleCassandraVersion.java
index d7cd14b..0e90ae8 100644
---
a/src/test/integration/org/apache/cassandra/testing/SimpleCassandraVersion.java
+++
b/src/test/integration/org/apache/cassandra/testing/SimpleCassandraVersion.java
@@ -100,6 +100,7 @@ public class SimpleCassandraVersion implements
Comparable<SimpleCassandraVersion
}
+ @Override
public int compareTo(SimpleCassandraVersion other)
{
if (major < other.major)
diff --git
a/src/test/java/org/apache/cassandra/sidecar/HealthServiceSslTest.java
b/src/test/java/org/apache/cassandra/sidecar/HealthServiceSslTest.java
index 641ba55..2939d45 100644
--- a/src/test/java/org/apache/cassandra/sidecar/HealthServiceSslTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/HealthServiceSslTest.java
@@ -30,6 +30,7 @@ import io.vertx.junit5.VertxExtension;
@ExtendWith(VertxExtension.class)
public class HealthServiceSslTest extends AbstractHealthServiceTest
{
+ @Override
public boolean isSslEnabled()
{
return true;
diff --git a/src/test/java/org/apache/cassandra/sidecar/HealthServiceTest.java
b/src/test/java/org/apache/cassandra/sidecar/HealthServiceTest.java
index ef088db..ac4a942 100644
--- a/src/test/java/org/apache/cassandra/sidecar/HealthServiceTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/HealthServiceTest.java
@@ -30,6 +30,7 @@ import io.vertx.junit5.VertxExtension;
@ExtendWith(VertxExtension.class)
public class HealthServiceTest extends AbstractHealthServiceTest
{
+ @Override
public boolean isSslEnabled()
{
return false;
diff --git a/src/test/java/org/apache/cassandra/sidecar/TestModule.java
b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
index 518fc70..68bb4d8 100644
--- a/src/test/java/org/apache/cassandra/sidecar/TestModule.java
+++ b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
@@ -112,6 +112,7 @@ public class TestModule extends AbstractModule
RestoreJobConfigurationImpl.builder()
.restoreJobTablesTtlSeconds(TimeUnit.DAYS.toSeconds(14) + 1)
.processMaxConcurrency(RESTORE_MAX_CONCURRENCY)
+ .restoreJobLongRunningThresholdSeconds(1)
.build();
HealthCheckConfiguration healthCheckConfiguration = new
HealthCheckConfigurationImpl(200, 1000);
return SidecarConfigurationImpl.builder()
diff --git
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
index c3fafc8..beea547 100644
---
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
+++
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
@@ -19,6 +19,9 @@
package org.apache.cassandra.sidecar.restore;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.jupiter.api.BeforeEach;
@@ -28,7 +31,10 @@ import com.datastax.driver.core.utils.UUIDs;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.util.Modules;
+import io.vertx.core.Promise;
import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
+import org.apache.cassandra.sidecar.db.RestoreJob;
import org.apache.cassandra.sidecar.db.RestoreSlice;
import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
import org.apache.cassandra.sidecar.server.MainModule;
@@ -170,14 +176,74 @@ class RestoreProcessorTest
});
}
+ @Test
+ public void testLongRunningHandlerDetection()
+ {
+
+ when(sidecarSchema.isInitialized()).thenReturn(true);
+ periodicTaskExecutor.schedule(processor);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicLong currentTime = new AtomicLong(0);
+ RestoreSlice slice = mockSlowSlice(latch, currentTime::get); // Sets
the start time
+ long fiveMinutesInNanos = TimeUnit.NANOSECONDS.convert(5,
TimeUnit.MINUTES);
+ currentTime.set(fiveMinutesInNanos);
+ processor.submit(slice);
+ loopAssert(3, () -> {
+ assertThat(stats.longRunningRestoreHandlers.size()).isEqualTo(1);
+ Long handlerTimeInNanos =
stats.longRunningRestoreHandlers.get(slice.owner().id());
+ assertThat(handlerTimeInNanos).isNotNull();
+ assertThat(handlerTimeInNanos).isEqualTo(fiveMinutesInNanos);
+ assertThat(processor.activeTasks()).isOne();
+ });
+
+ // Make slice completable.
+ latch.countDown();
+
+ // Make sure that, once the slice completes, the active handler is removed
+ loopAssert(3, () -> {
+ assertThat(processor.activeTasks()).isZero();
+ });
+ }
+
private RestoreSlice mockSlowSlice(CountDownLatch latch)
+ {
+ return mockSlowSlice(latch, System::nanoTime);
+ }
+
+ private RestoreSlice mockSlowSlice(CountDownLatch latch, Supplier<Long>
timeInNanosSupplier)
{
RestoreSlice slice = mock(RestoreSlice.class,
Mockito.RETURNS_DEEP_STUBS);
when(slice.jobId()).thenReturn(UUIDs.timeBased());
when(slice.owner().id()).thenReturn(1);
- when(slice.toAsyncTask(any(), any(), any(), anyDouble(), any(), any(),
any())).thenReturn(promise -> {
- Uninterruptibles.awaitUninterruptibly(latch);
- promise.complete(slice);
+ when(slice.key()).thenReturn("SliceKey");
+ RestoreJob job = RestoreJob.builder()
+ .jobStatus(RestoreJobStatus.CREATED)
+ .build();
+ when(slice.job()).thenReturn(job);
+ when(slice.toAsyncTask(any(), any(), any(), anyDouble(), any(), any(),
any())).thenReturn(
+ new RestoreSliceHandler()
+ {
+ private Long startTime = timeInNanosSupplier.get();
+
+ @Override
+ public void handle(Promise<RestoreSlice> promise)
+ {
+ Uninterruptibles.awaitUninterruptibly(latch);
+ promise.complete(slice);
+ }
+
+ @Override
+ public long elapsedInNanos()
+ {
+ return timeInNanosSupplier.get() - startTime;
+ }
+
+ @Override
+ public RestoreSlice slice()
+ {
+ return slice;
+ }
});
when(slice.hasImported()).thenReturn(true);
return slice;
diff --git
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
index 6e9e1d7..dcfd6a1 100644
---
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
+++
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
@@ -25,6 +25,8 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -67,6 +69,7 @@ class RestoreSliceTaskTest
private TaskExecutorPool executorPool;
private TestRestoreJobStats stats;
private TestRestoreSliceAccessor sliceDatabaseAccessor;
+ private RestoreJobUtil util;
@BeforeEach
void setup()
@@ -82,6 +85,7 @@ class RestoreSliceTaskTest
mockSSTableImporter = mock(SSTableImporter.class);
executorPool = new ExecutorPools(Vertx.vertx(), new
ServiceConfigurationImpl()).internal();
stats = new TestRestoreJobStats();
+ util = mock(RestoreJobUtil.class);
sliceDatabaseAccessor = new TestRestoreSliceAccessor();
}
@@ -302,14 +306,33 @@ class RestoreSliceTaskTest
.hasMessageContaining("Random exception");
}
+ @Test
+ void testSliceDuration()
+ {
+ RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(),
RestoreJobStatus.STAGED, "QUORUM");
+ AtomicLong currentNanos = new AtomicLong(0);
+ RestoreSliceTask task = createTask(mockSlice, job, currentNanos::get);
+ Promise<RestoreSlice> promise = Promise.promise();
+ task.handle(promise); // Task isn't considered started until
`handle` is called
+ currentNanos.set(123L);
+ assertThat(task.elapsedInNanos()).isEqualTo(123L);
+ }
+
private RestoreSliceTask createTask(RestoreSlice slice, RestoreJob job)
+ {
+ return createTask(slice, job, System::nanoTime);
+ }
+
+ private RestoreSliceTask createTask(RestoreSlice slice, RestoreJob job,
Supplier<Long> currentNanoTimeSupplier)
{
when(slice.job()).thenReturn(job);
assertThat(slice.job()).isSameAs(job);
assertThat(slice.job().isManagedBySidecar()).isEqualTo(job.isManagedBySidecar());
assertThat(slice.job().status).isEqualTo(job.status);
+ RestoreJobUtil util = mock(RestoreJobUtil.class);
+ when(util.currentTimeNanos()).thenAnswer(invok ->
currentNanoTimeSupplier.get());
return new TestRestoreSliceTask(slice, mockStorageClient,
executorPool, mockSSTableImporter,
- 0, sliceDatabaseAccessor, stats);
+ 0, sliceDatabaseAccessor, stats, util);
}
private RestoreSliceTask createTaskWithExceptions(RestoreSlice slice,
RestoreJob job)
@@ -319,7 +342,7 @@ class RestoreSliceTaskTest
assertThat(slice.job().isManagedBySidecar()).isEqualTo(job.isManagedBySidecar());
assertThat(slice.job().status).isEqualTo(job.status);
return new TestUnexpectedExceptionInRestoreSliceTask(slice,
mockStorageClient, executorPool,
-
mockSSTableImporter, 0, sliceDatabaseAccessor, stats);
+
mockSSTableImporter, 0, sliceDatabaseAccessor, stats, util);
}
static class TestRestoreSliceAccessor extends RestoreSliceDatabaseAccessor
@@ -346,10 +369,11 @@ class RestoreSliceTaskTest
public TestRestoreSliceTask(RestoreSlice slice, StorageClient
s3Client, TaskExecutorPool executorPool,
SSTableImporter importer, double
requiredUsableSpacePercentage,
- RestoreSliceDatabaseAccessor
sliceDatabaseAccessor, RestoreJobStats stats)
+ RestoreSliceDatabaseAccessor
sliceDatabaseAccessor, RestoreJobStats stats,
+ RestoreJobUtil restoreJobUtil)
{
- super(slice, s3Client, executorPool, importer,
requiredUsableSpacePercentage, sliceDatabaseAccessor, stats,
- null);
+ super(slice, s3Client, executorPool, importer,
requiredUsableSpacePercentage,
+ sliceDatabaseAccessor, stats, restoreJobUtil);
this.slice = slice;
this.stats = stats;
}
@@ -382,10 +406,10 @@ class RestoreSliceTaskTest
TaskExecutorPool
executorPool, SSTableImporter importer,
double
requiredUsableSpacePercentage,
RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
- RestoreJobStats stats)
+ RestoreJobStats
stats, RestoreJobUtil util)
{
super(slice, s3Client, executorPool, importer,
requiredUsableSpacePercentage, sliceDatabaseAccessor, stats,
- null);
+ util);
}
@Override
diff --git
a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java
b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java
index 30bf04b..6cf72c1 100644
---
a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java
+++
b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java
@@ -240,6 +240,7 @@ class BaseUploadsHandlerTest
super(Vertx.vertx(), 1, null, null, null, null, null, "localhost",
9043);
}
+ @Override
protected JmxNotificationListener initializeJmxListener()
{
return null;
diff --git
a/src/test/java/org/apache/cassandra/sidecar/stats/TestRestoreJobStats.java
b/src/test/java/org/apache/cassandra/sidecar/stats/TestRestoreJobStats.java
index 713c33c..0a2cd9e 100644
--- a/src/test/java/org/apache/cassandra/sidecar/stats/TestRestoreJobStats.java
+++ b/src/test/java/org/apache/cassandra/sidecar/stats/TestRestoreJobStats.java
@@ -19,7 +19,9 @@
package org.apache.cassandra.sidecar.stats;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Test implementation for testing restore job related stats captured
@@ -39,6 +41,7 @@ public class TestRestoreJobStats implements RestoreJobStats
public long failedJobCount;
public long activeJobCount;
public long tokenRefreshCount;
+ public Map<Integer, Long> longRunningRestoreHandlers = new HashMap<>();
@Override
public void captureSliceCompletionTime(int instanceId, long durationNanos)
@@ -118,4 +121,10 @@ public class TestRestoreJobStats implements RestoreJobStats
{
tokenRefreshCount += 1;
}
+
+ @Override
+ public void captureLongRunningRestoreHandler(int instanceId, long
handlerDuration)
+ {
+ longRunningRestoreHandlers.put(instanceId, handlerDuration);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]