This is an automated email from the ASF dual-hosted git repository. ycai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new 22b6df4 CASSANDRASC-126: Make RestoreJobDiscoverer less verbose (#116) 22b6df4 is described below commit 22b6df435936bdf0a616cf811c9a84821d4eae3c Author: Yifan Cai <52585731+yifa...@users.noreply.github.com> AuthorDate: Thu Apr 25 12:40:44 2024 -0700 CASSANDRASC-126: Make RestoreJobDiscoverer less verbose (#116) Avoid logging the identical restore job info repeatedly Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSANDRASC-126 --- CHANGES.txt | 1 + .../sidecar/restore/RestoreJobDiscoverer.java | 51 +++++++++++++++++++++- .../sidecar/restore/RestoreJobDiscovererTest.java | 38 ++++++++++++++++ 3 files changed, 89 insertions(+), 1 deletion(-) diff --git a/CHANGES.txt b/CHANGES.txt index d80f7b5..7968525 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 1.0.0 ----- + * Make RestoreJobDiscoverer less verbose (CASSANDRASC-126) * Import Queue pendingImports metrics is reporting an incorrect value (CASSANDRASC-125) * Add missing method to retrieve the InetSocketAddress to DriverUtils (CASSANDRASC-123) * Reduce filesystem calls while streaming SSTables (CASSANDRASC-94) diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java index 431db43..2f865ff 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java @@ -18,7 +18,12 @@ package org.apache.cassandra.sidecar.restore; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -31,6 +36,7 @@ import com.google.inject.Provider; import com.google.inject.Singleton; import io.vertx.core.Promise; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.data.RestoreJobStatus; import org.apache.cassandra.sidecar.config.RestoreJobConfiguration; import org.apache.cassandra.sidecar.config.SidecarConfiguration; import org.apache.cassandra.sidecar.db.RestoreJob; @@ -63,6 +69,7 @@ public class RestoreJobDiscoverer implements PeriodicTask private final LocalTokenRangesProvider localTokenRangesProvider; private final InstanceMetadataFetcher instanceMetadataFetcher; private final RestoreMetrics metrics; + private final JobIdsByDay jobIdsByDay; private volatile boolean refreshSignaled = true; private int inflightJobsCount = 0; private int jobDiscoveryRecencyDays; @@ -107,6 +114,7 @@ public class RestoreJobDiscoverer implements PeriodicTask this.localTokenRangesProvider = cachedLocalTokenRanges; this.instanceMetadataFetcher = instanceMetadataFetcher; this.metrics = metrics.server().restore(); + this.jobIdsByDay = new JobIdsByDay(); } @Override @@ -158,7 +166,11 @@ public class RestoreJobDiscoverer implements PeriodicTask int expiredJobs = 0; for (RestoreJob job : restoreJobs) { - LOGGER.info("Found job. jobId={} job={}", job.jobId, job); + if (jobIdsByDay.shouldLogJob(job)) + { + LOGGER.info("Found job. jobId={} job={}", job.jobId, job); + } + try { switch (job.status) @@ -197,6 +209,7 @@ public class RestoreJobDiscoverer implements PeriodicTask LOGGER.warn("Exception on processing job. jobId: {}", job.jobId, exception); } } + jobIdsByDay.cleanupMaybe(); // shrink recency to lookup less data next time jobDiscoveryRecencyDays = Math.min(days, jobDiscoveryRecencyDays); // reset the external refresh signal if any @@ -291,6 +304,42 @@ public class RestoreJobDiscoverer implements PeriodicTask return Math.abs(date1.getDaysSinceEpoch() - date2.getDaysSinceEpoch()); } + static class JobIdsByDay + { + private final Map<Integer, Map<UUID, RestoreJobStatus>> jobsByDay = new HashMap<>(); + private final Set<Integer> discoveredDays = new HashSet<>(); // contains the days of the jobs seen from the current round of discovery + + /** + * Log the jobs when any of the condition is met: + * - newly discovered + * - in CREATED status + * - status changed + * + * @return true to log the job + */ + boolean shouldLogJob(RestoreJob job) + { + int day = job.createdAt.getDaysSinceEpoch(); + discoveredDays.add(day); + Map<UUID, RestoreJobStatus> jobs = jobsByDay.computeIfAbsent(day, key -> new HashMap<>()); + RestoreJobStatus oldStatus = jobs.put(job.jobId, job.status); + return oldStatus == null || job.status == RestoreJobStatus.CREATED || oldStatus != job.status; + } + + void cleanupMaybe() + { + // remove all the jobIds of the days that are not discovered + jobsByDay.keySet().removeIf(day -> !discoveredDays.contains(day)); + discoveredDays.clear(); + } + + @VisibleForTesting + Map<Integer, Map<UUID, RestoreJobStatus>> jobsByDay() + { + return jobsByDay; + } + } + @VisibleForTesting boolean hasInflightJobs() { diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java index 7a4b246..671cfc3 100644 --- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java @@ -32,6 +32,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import com.datastax.driver.core.LocalDate; import com.datastax.driver.core.utils.UUIDs; import com.fasterxml.jackson.core.JsonProcessingException; import io.vertx.core.Promise; @@ -255,6 +256,43 @@ class RestoreJobDiscovererTest assertThat(abortedJobs.getAllValues()).isEqualTo(expectedAbortedJobs); } + @Test + void testWhenJobShouldBeLogged() + { + RestoreJobDiscoverer.JobIdsByDay jobIdsByDay = new RestoreJobDiscoverer.JobIdsByDay(); + RestoreJob job = createNewTestingJob(UUIDs.timeBased()); + assertThat(jobIdsByDay.shouldLogJob(job)) + .describedAs("should return true for the new job") + .isTrue(); + assertThat(jobIdsByDay.shouldLogJob(job)) + .describedAs("should return true for the same job in CREATED status") + .isTrue(); + RestoreJob statusUpdated = job.unbuild().jobStatus(RestoreJobStatus.SUCCEEDED).build(); + assertThat(jobIdsByDay.shouldLogJob(statusUpdated)) + .describedAs("should return true for the status-updated job") + .isTrue(); + assertThat(jobIdsByDay.shouldLogJob(statusUpdated)) + .describedAs("should return false for the same SUCCEEDED job") + .isFalse(); + } + + @Test + void testCleanupJobIdsByDay() + { + RestoreJobDiscoverer.JobIdsByDay jobIdsByDay = new RestoreJobDiscoverer.JobIdsByDay(); + RestoreJob job = createNewTestingJob(UUIDs.timeBased()); + jobIdsByDay.shouldLogJob(job); // insert the job + jobIdsByDay.cleanupMaybe(); // issue a cleanup. but it should not remove anything + assertThat(jobIdsByDay.jobsByDay()).hasSize(1) + .containsKey(job.createdAt.getDaysSinceEpoch()); + RestoreJob jobOfNextDay = job.unbuild().createdAt(LocalDate.fromDaysSinceEpoch(job.createdAt.getDaysSinceEpoch() + 1)).build(); + jobIdsByDay.shouldLogJob(jobOfNextDay); + jobIdsByDay.cleanupMaybe(); // issue a new cleanup. it should remove the job that is not reported in the new round + assertThat(jobIdsByDay.jobsByDay()).hasSize(1) + .containsKey(jobOfNextDay.createdAt.getDaysSinceEpoch()) + .doesNotContainKey(job.createdAt.getDaysSinceEpoch()); + } + private RestoreJobConfiguration testConfig() { RestoreJobConfiguration restoreJobConfiguration = mock(RestoreJobConfiguration.class); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org