This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 22b6df4 CASSANDRASC-126: Make RestoreJobDiscoverer less verbose (#116)
22b6df4 is described below
commit 22b6df435936bdf0a616cf811c9a84821d4eae3c
Author: Yifan Cai <[email protected]>
AuthorDate: Thu Apr 25 12:40:44 2024 -0700
CASSANDRASC-126: Make RestoreJobDiscoverer less verbose (#116)
Avoid logging the identical restore job info repeatedly
Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSANDRASC-126
---
CHANGES.txt | 1 +
.../sidecar/restore/RestoreJobDiscoverer.java | 51 +++++++++++++++++++++-
.../sidecar/restore/RestoreJobDiscovererTest.java | 38 ++++++++++++++++
3 files changed, 89 insertions(+), 1 deletion(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index d80f7b5..7968525 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
1.0.0
-----
+ * Make RestoreJobDiscoverer less verbose (CASSANDRASC-126)
* Import Queue pendingImports metrics is reporting an incorrect value
(CASSANDRASC-125)
* Add missing method to retrieve the InetSocketAddress to DriverUtils
(CASSANDRASC-123)
* Reduce filesystem calls while streaming SSTables (CASSANDRASC-94)
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
index 431db43..2f865ff 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
@@ -18,7 +18,12 @@
package org.apache.cassandra.sidecar.restore;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -31,6 +36,7 @@ import com.google.inject.Provider;
import com.google.inject.Singleton;
import io.vertx.core.Promise;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
import org.apache.cassandra.sidecar.config.RestoreJobConfiguration;
import org.apache.cassandra.sidecar.config.SidecarConfiguration;
import org.apache.cassandra.sidecar.db.RestoreJob;
@@ -63,6 +69,7 @@ public class RestoreJobDiscoverer implements PeriodicTask
private final LocalTokenRangesProvider localTokenRangesProvider;
private final InstanceMetadataFetcher instanceMetadataFetcher;
private final RestoreMetrics metrics;
+ private final JobIdsByDay jobIdsByDay;
private volatile boolean refreshSignaled = true;
private int inflightJobsCount = 0;
private int jobDiscoveryRecencyDays;
@@ -107,6 +114,7 @@ public class RestoreJobDiscoverer implements PeriodicTask
this.localTokenRangesProvider = cachedLocalTokenRanges;
this.instanceMetadataFetcher = instanceMetadataFetcher;
this.metrics = metrics.server().restore();
+ this.jobIdsByDay = new JobIdsByDay();
}
@Override
@@ -158,7 +166,11 @@ public class RestoreJobDiscoverer implements PeriodicTask
int expiredJobs = 0;
for (RestoreJob job : restoreJobs)
{
- LOGGER.info("Found job. jobId={} job={}", job.jobId, job);
+ if (jobIdsByDay.shouldLogJob(job))
+ {
+ LOGGER.info("Found job. jobId={} job={}", job.jobId, job);
+ }
+
try
{
switch (job.status)
@@ -197,6 +209,7 @@ public class RestoreJobDiscoverer implements PeriodicTask
LOGGER.warn("Exception on processing job. jobId: {}",
job.jobId, exception);
}
}
+ jobIdsByDay.cleanupMaybe();
// shrink recency to lookup less data next time
jobDiscoveryRecencyDays = Math.min(days, jobDiscoveryRecencyDays);
// reset the external refresh signal if any
@@ -291,6 +304,42 @@ public class RestoreJobDiscoverer implements PeriodicTask
return Math.abs(date1.getDaysSinceEpoch() - date2.getDaysSinceEpoch());
}
+ static class JobIdsByDay
+ {
+ private final Map<Integer, Map<UUID, RestoreJobStatus>> jobsByDay = new HashMap<>();
+ private final Set<Integer> discoveredDays = new HashSet<>(); // contains the days of the jobs seen from the current round of discovery
+
+ /**
+ * Log the jobs when any of the condition is met:
+ * - newly discovered
+ * - in CREATED status
+ * - status changed
+ *
+ * @return true to log the job
+ */
+ boolean shouldLogJob(RestoreJob job)
+ {
+ int day = job.createdAt.getDaysSinceEpoch();
+ discoveredDays.add(day);
+ Map<UUID, RestoreJobStatus> jobs = jobsByDay.computeIfAbsent(day, key -> new HashMap<>());
+ RestoreJobStatus oldStatus = jobs.put(job.jobId, job.status);
+ return oldStatus == null || job.status == RestoreJobStatus.CREATED || oldStatus != job.status;
+ }
+
+ void cleanupMaybe()
+ {
+ // remove all the jobIds of the days that are not discovered
+ jobsByDay.keySet().removeIf(day -> !discoveredDays.contains(day));
+ discoveredDays.clear();
+ }
+
+ @VisibleForTesting
+ Map<Integer, Map<UUID, RestoreJobStatus>> jobsByDay()
+ {
+ return jobsByDay;
+ }
+ }
+
@VisibleForTesting
boolean hasInflightJobs()
{
diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java
index 7a4b246..671cfc3 100644
--- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import com.datastax.driver.core.LocalDate;
import com.datastax.driver.core.utils.UUIDs;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.vertx.core.Promise;
@@ -255,6 +256,43 @@ class RestoreJobDiscovererTest
assertThat(abortedJobs.getAllValues()).isEqualTo(expectedAbortedJobs);
}
+ @Test
+ void testWhenJobShouldBeLogged()
+ {
+ RestoreJobDiscoverer.JobIdsByDay jobIdsByDay = new RestoreJobDiscoverer.JobIdsByDay();
+ RestoreJob job = createNewTestingJob(UUIDs.timeBased());
+ assertThat(jobIdsByDay.shouldLogJob(job))
+ .describedAs("should return true for the new job")
+ .isTrue();
+ assertThat(jobIdsByDay.shouldLogJob(job))
+ .describedAs("should return true for the same job in CREATED status")
+ .isTrue();
+ RestoreJob statusUpdated = job.unbuild().jobStatus(RestoreJobStatus.SUCCEEDED).build();
+ assertThat(jobIdsByDay.shouldLogJob(statusUpdated))
+ .describedAs("should return true for the status-updated job")
+ .isTrue();
+ assertThat(jobIdsByDay.shouldLogJob(statusUpdated))
+ .describedAs("should return false for the same SUCCEEDED job")
+ .isFalse();
+ }
+
+ @Test
+ void testCleanupJobIdsByDay()
+ {
+ RestoreJobDiscoverer.JobIdsByDay jobIdsByDay = new RestoreJobDiscoverer.JobIdsByDay();
+ RestoreJob job = createNewTestingJob(UUIDs.timeBased());
+ jobIdsByDay.shouldLogJob(job); // insert the job
+ jobIdsByDay.cleanupMaybe(); // issue a cleanup. but it should not remove anything
+ assertThat(jobIdsByDay.jobsByDay()).hasSize(1)
+ .containsKey(job.createdAt.getDaysSinceEpoch());
+ RestoreJob jobOfNextDay = job.unbuild().createdAt(LocalDate.fromDaysSinceEpoch(job.createdAt.getDaysSinceEpoch() + 1)).build();
+ jobIdsByDay.shouldLogJob(jobOfNextDay);
+ jobIdsByDay.cleanupMaybe(); // issue a new cleanup. it should remove the job that is not reported in the new round
+ assertThat(jobIdsByDay.jobsByDay()).hasSize(1)
+ .containsKey(jobOfNextDay.createdAt.getDaysSinceEpoch())
+ .doesNotContainKey(job.createdAt.getDaysSinceEpoch());
+ }
+
private RestoreJobConfiguration testConfig()
{
RestoreJobConfiguration restoreJobConfiguration =
mock(RestoreJobConfiguration.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]