This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new ee45474 CASSANDRASC-99 Break restore job into stage and import phases
and persist restore slice status on phase completion
ee45474 is described below
commit ee454741363f3f693726af242c5ec37ad1480d60
Author: Yifan Cai <[email protected]>
AuthorDate: Mon Jan 29 16:09:25 2024 -0800
CASSANDRASC-99 Break restore job into stage and import phases and persist
restore slice status on phase completion
patch by Yifan Cai; reviewed by Doug Rohrer, Francisco Guerrero for
CASSANDRASC-99
---
.../data/CreateRestoreJobRequestPayload.java | 28 ++-
.../sidecar/common/data/RestoreJobConstants.java | 1 +
.../sidecar/common/data/RestoreJobStatus.java | 1 +
.../sidecar/common/data/RestoreSliceStatus.java | 37 ++-
.../data/CreateRestoreJobRequestPayloadTest.java | 6 +-
.../common/data/RestoreSliceStatusTest.java | 83 +++++++
spotbugs-exclude.xml | 1 +
.../config/yaml/RestoreJobConfigurationImpl.java | 16 +-
.../apache/cassandra/sidecar/db/RestoreJob.java | 85 ++++---
.../sidecar/db/RestoreJobDatabaseAccessor.java | 21 +-
.../apache/cassandra/sidecar/db/RestoreSlice.java | 97 ++++++--
.../sidecar/db/RestoreSliceDatabaseAccessor.java | 47 ++--
.../sidecar/db/schema/RestoreJobsSchema.java | 5 +-
.../sidecar/db/schema/RestoreSlicesSchema.java | 2 +-
.../sidecar/locator/CachedLocalTokenRanges.java | 276 +++++++++++++++++++++
.../sidecar/locator/LocalTokenRangesProvider.java | 41 +++
.../sidecar/restore/RestoreJobDiscoverer.java | 55 +++-
.../cassandra/sidecar/restore/RestoreJobUtil.java | 2 +-
.../sidecar/restore/RestoreProcessor.java | 36 ++-
.../sidecar/restore/RestoreSliceTask.java | 118 +++++++--
.../cassandra/sidecar/restore/StorageClient.java | 2 +-
.../routes/restore/AbortRestoreJobHandler.java | 6 +-
.../routes/restore/CreateRestoreJobHandler.java | 2 +-
.../routes/restore/CreateRestoreSliceHandler.java | 2 +-
.../routes/restore/UpdateRestoreJobHandler.java | 17 +-
.../db/RestoreJobsDatabaseAccessorIntTest.java | 12 +-
.../testing/ConfigurableCassandraTestContext.java | 43 +++-
.../cassandra/sidecar/db/RestoreJobTest.java | 16 ++
.../cassandra/sidecar/db/SidecarSchemaTest.java | 53 +++-
.../sidecar/restore/RestoreJobDiscovererTest.java | 84 ++++---
.../sidecar/restore/RestoreJobManagerTest.java | 7 +-
.../sidecar/restore/RestoreProcessorTest.java | 3 +-
.../sidecar/restore/RestoreSliceTaskTest.java | 113 +++++++--
.../sidecar/restore/RestoreSliceTest.java | 2 +-
.../routes/restore/BaseRestoreJobTests.java | 1 -
.../restore/RestoreJobSummaryHandlerTest.java | 29 ++-
.../restore/UpdateRestoreJobHandlerTest.java | 10 +-
.../sidecar/utils/AsyncFileSystemUtilsTest.java | 111 +++++++++
38 files changed, 1255 insertions(+), 216 deletions(-)
diff --git
a/common/src/main/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayload.java
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayload.java
index 12858d8..0e5a9a0 100644
---
a/common/src/main/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayload.java
+++
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayload.java
@@ -26,8 +26,10 @@ import java.util.function.Consumer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.cassandra.sidecar.common.utils.Preconditions;
+import org.jetbrains.annotations.Nullable;
import static
org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_AGENT;
+import static
org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_CONSISTENCY_LEVEL;
import static
org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_EXPIRE_AT;
import static
org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_ID;
import static
org.apache.cassandra.sidecar.common.data.RestoreJobConstants.JOB_IMPORT_OPTIONS;
@@ -43,6 +45,8 @@ public class CreateRestoreJobRequestPayload
private final RestoreJobSecrets secrets;
private final SSTableImportOptions importOptions;
private final long expireAtInMillis;
+ @Nullable
+ private final String consistencyLevel; // optional field
/**
* Builder to build a CreateRestoreJobRequest
@@ -65,13 +69,15 @@ public class CreateRestoreJobRequestPayload
* @param secrets secrets to be used by restore job to download
data
* @param importOptions the configured options for SSTable import
* @param expireAtInMillis a timestamp in the future when the job is
considered expired
+ * @param consistencyLevel consistency level a job should satisfy
*/
@JsonCreator
public CreateRestoreJobRequestPayload(@JsonProperty(JOB_ID) UUID jobId,
@JsonProperty(JOB_AGENT) String
jobAgent,
@JsonProperty(JOB_SECRETS)
RestoreJobSecrets secrets,
@JsonProperty(JOB_IMPORT_OPTIONS)
SSTableImportOptions importOptions,
- @JsonProperty(JOB_EXPIRE_AT) long
expireAtInMillis)
+ @JsonProperty(JOB_EXPIRE_AT) long
expireAtInMillis,
+ @JsonProperty(JOB_CONSISTENCY_LEVEL)
String consistencyLevel)
{
Preconditions.checkArgument(jobId == null || jobId.version() == 1,
"Only time based UUIDs allowed for jobId");
@@ -85,6 +91,7 @@ public class CreateRestoreJobRequestPayload
? SSTableImportOptions.defaults()
: importOptions;
this.expireAtInMillis = expireAtInMillis;
+ this.consistencyLevel = consistencyLevel;
}
private CreateRestoreJobRequestPayload(Builder builder)
@@ -94,6 +101,7 @@ public class CreateRestoreJobRequestPayload
this.secrets = builder.secrets;
this.importOptions = builder.importOptions;
this.expireAtInMillis = builder.expireAtInMillis;
+ this.consistencyLevel = builder.consistencyLevel;
}
/**
@@ -151,6 +159,16 @@ public class CreateRestoreJobRequestPayload
return new Date(expireAtInMillis);
}
+ /**
+ * @return the consistency level a job should satisfy
+ */
+ @JsonProperty(JOB_CONSISTENCY_LEVEL)
+ @Nullable
+ public String consistencyLevel()
+ {
+ return consistencyLevel;
+ }
+
@Override
public String toString()
{
@@ -159,6 +177,7 @@ public class CreateRestoreJobRequestPayload
JOB_AGENT + "='" + jobAgent + "', " +
JOB_SECRETS + "='" + secrets + "', " +
JOB_EXPIRE_AT + "='" + expireAtInMillis + "', " +
+ JOB_CONSISTENCY_LEVEL + "='" + consistencyLevel + "', " +
JOB_IMPORT_OPTIONS + "='" + importOptions + "'}";
}
@@ -173,6 +192,7 @@ public class CreateRestoreJobRequestPayload
private UUID jobId = null;
private String jobAgent = null;
+ private String consistencyLevel = null;
Builder(RestoreJobSecrets secrets, long expireAtInMillis)
{
@@ -198,6 +218,12 @@ public class CreateRestoreJobRequestPayload
return this;
}
+ public Builder consistencyLevel(String consistencyLevel)
+ {
+ this.consistencyLevel = consistencyLevel;
+ return this;
+ }
+
public CreateRestoreJobRequestPayload build()
{
return new CreateRestoreJobRequestPayload(this);
diff --git
a/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java
index 78707ae..d18b0b3 100644
---
a/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java
+++
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobConstants.java
@@ -32,6 +32,7 @@ public class RestoreJobConstants
public static final String JOB_CREATED_AT = "createdAt";
public static final String JOB_KEYSPACE = "keyspace";
public static final String JOB_TABLE = "table";
+ public static final String JOB_CONSISTENCY_LEVEL = "consistencyLevel";
public static final String SLICE_ID = "sliceId";
public static final String BUCKET_ID = "bucketId";
public static final String SLICE_START_TOKEN = "startToken";
diff --git
a/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobStatus.java
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobStatus.java
index 1d691da..4694510 100644
---
a/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobStatus.java
+++
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobStatus.java
@@ -24,6 +24,7 @@ package org.apache.cassandra.sidecar.common.data;
public enum RestoreJobStatus
{
CREATED,
+ STAGED,
@Deprecated // replaced by ABORTED
FAILED,
ABORTED,
diff --git
a/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreSliceStatus.java
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreSliceStatus.java
index fa874bb..379be71 100644
---
a/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreSliceStatus.java
+++
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreSliceStatus.java
@@ -18,15 +18,44 @@
package org.apache.cassandra.sidecar.common.data;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.cassandra.sidecar.common.utils.Preconditions;
+
/**
* Holds all possible restore slice statues
*/
public enum RestoreSliceStatus
{
- EMPTY,
- PROCESSING,
- COMMITTING,
SUCCEEDED,
FAILED,
- ABORTED
+ ABORTED,
+ COMMITTING(SUCCEEDED, FAILED, ABORTED),
+ STAGED(COMMITTING, FAILED, ABORTED),
+ PROCESSING(STAGED, FAILED, ABORTED),
+ EMPTY(PROCESSING, FAILED, ABORTED);
+
+ // Do not use EnumSet, since validTargetStatusSet is assigned during
construction, when the enum constants are not fully available yet.
+ private final Set<RestoreSliceStatus> validTargetStatusSet;
+
+ RestoreSliceStatus(RestoreSliceStatus... targetStatuses)
+ {
+ this.validTargetStatusSet = new HashSet<>();
+ Collections.addAll(validTargetStatusSet, targetStatuses);
+ }
+
+ /**
+ * Advance the status with validation
+ * @param targetStatus target status to advance to
+ * @return new status
+ */
+ public RestoreSliceStatus advanceTo(RestoreSliceStatus targetStatus)
+ {
+
Preconditions.checkArgument(validTargetStatusSet.contains(targetStatus),
+ name() + " status can only advance to one
of the follow statuses: " +
+ validTargetStatusSet);
+ return targetStatus;
+ }
}
diff --git
a/common/src/test/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayloadTest.java
b/common/src/test/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayloadTest.java
index e7f47b9..08ca103 100644
---
a/common/src/test/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayloadTest.java
+++
b/common/src/test/java/org/apache/cassandra/sidecar/common/data/CreateRestoreJobRequestPayloadTest.java
@@ -46,6 +46,7 @@ class CreateRestoreJobRequestPayloadTest
Date date = Date.from(Instant.ofEpochMilli(time));
CreateRestoreJobRequestPayload req =
CreateRestoreJobRequestPayload.builder(secrets, time)
.jobId(UUID.fromString(id))
+
.consistencyLevel("QUORUM")
.jobAgent("agent")
.build();
String json = MAPPER.writeValueAsString(req);
@@ -56,6 +57,7 @@ class CreateRestoreJobRequestPayloadTest
assertThat(test.expireAtInMillis()).isEqualTo(time);
assertThat(test.expireAtAsDate()).isEqualTo(date);
assertThat(test.importOptions()).isEqualTo(SSTableImportOptions.defaults());
+ assertThat(test.consistencyLevel()).isEqualTo("QUORUM");
}
@Test
@@ -157,9 +159,9 @@ class CreateRestoreJobRequestPayloadTest
assertThat(test.expireAtInMillis()).isEqualTo(time);
assertThat(test.expireAtAsDate()).isEqualTo(date);
assertThat(test.importOptions()).isEqualTo(SSTableImportOptions.defaults());
+ assertThat(test.consistencyLevel()).isNull();
}
-
@Test
void testBuilder()
{
@@ -172,11 +174,13 @@ class CreateRestoreJobRequestPayloadTest
.resetLevel(false)
.clearRepaired(false);
})
+ .consistencyLevel("QUORUM")
.build();
assertThat(req.secrets()).isEqualTo(secrets);
assertThat(req.jobAgent()).isEqualTo("agent");
assertThat(req.importOptions()).isEqualTo(SSTableImportOptions.defaults()
.resetLevel(false)
.clearRepaired(false));
+ assertThat(req.consistencyLevel()).isEqualTo("QUORUM");
}
}
diff --git
a/common/src/test/java/org/apache/cassandra/sidecar/common/data/RestoreSliceStatusTest.java
b/common/src/test/java/org/apache/cassandra/sidecar/common/data/RestoreSliceStatusTest.java
new file mode 100644
index 0000000..27607cf
--- /dev/null
+++
b/common/src/test/java/org/apache/cassandra/sidecar/common/data/RestoreSliceStatusTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.data;
+
+import java.util.stream.Stream;
+
+import org.junit.jupiter.api.Test;
+
+import static
org.apache.cassandra.sidecar.common.data.RestoreSliceStatus.ABORTED;
+import static
org.apache.cassandra.sidecar.common.data.RestoreSliceStatus.COMMITTING;
+import static
org.apache.cassandra.sidecar.common.data.RestoreSliceStatus.EMPTY;
+import static
org.apache.cassandra.sidecar.common.data.RestoreSliceStatus.FAILED;
+import static
org.apache.cassandra.sidecar.common.data.RestoreSliceStatus.PROCESSING;
+import static
org.apache.cassandra.sidecar.common.data.RestoreSliceStatus.STAGED;
+import static
org.apache.cassandra.sidecar.common.data.RestoreSliceStatus.SUCCEEDED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class RestoreSliceStatusTest
+{
+ @Test
+ void testStatusAdvancing()
+ {
+ assertAdvanceTo(EMPTY, PROCESSING);
+ assertAdvanceTo(EMPTY, FAILED);
+ assertAdvanceTo(EMPTY, ABORTED);
+ assertAdvanceTo(PROCESSING, STAGED);
+ assertAdvanceTo(PROCESSING, FAILED);
+ assertAdvanceTo(PROCESSING, ABORTED);
+ assertAdvanceTo(STAGED, COMMITTING);
+ assertAdvanceTo(STAGED, FAILED);
+ assertAdvanceTo(STAGED, ABORTED);
+ assertAdvanceTo(COMMITTING, SUCCEEDED);
+ assertAdvanceTo(COMMITTING, FAILED);
+ assertAdvanceTo(COMMITTING, ABORTED);
+ }
+
+ @Test
+ void testInvalidStatusAdvancing()
+ {
+ String commonErrorMsg = "status can only advance to one of the follow
statuses";
+
+ Stream
+ .of(new RestoreSliceStatus[][]
+ { // define test cases of invalid status advancing, e.g. it is
invalid to advance from EMPTY to STAGED
+ { EMPTY, STAGED },
+ { STAGED, EMPTY },
+ { EMPTY, COMMITTING },
+ { STAGED, SUCCEEDED },
+ { COMMITTING, STAGED },
+ { STAGED, STAGED },
+ { SUCCEEDED, FAILED },
+ { FAILED, SUCCEEDED }
+ })
+ .forEach(testCase -> {
+ assertThatThrownBy(() -> testCase[0].advanceTo(testCase[1]))
+ .isExactlyInstanceOf(IllegalArgumentException.class)
+ .hasNoCause()
+ .hasMessageContaining(commonErrorMsg);
+ });
+ }
+
+ private void assertAdvanceTo(RestoreSliceStatus from, RestoreSliceStatus
to)
+ {
+ assertThat(from.advanceTo(to)).isEqualTo(to);
+ }
+}
diff --git a/spotbugs-exclude.xml b/spotbugs-exclude.xml
index 2439930..03be65c 100644
--- a/spotbugs-exclude.xml
+++ b/spotbugs-exclude.xml
@@ -44,6 +44,7 @@
<Class name="org.apache.cassandra.sidecar.CassandraSidecarDaemon"
/>
<Class name="org.apache.cassandra.sidecar.utils.SSTableImporter" />
<Class
name="org.apache.cassandra.sidecar.tasks.HealthCheckPeriodicTask" />
+ <Class
name="org.apache.cassandra.sidecar.restore.RestoreSliceTaskTest" />
</Or>
</Match>
diff --git
a/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
b/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
index 6048d26..9ce5efb 100644
---
a/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
+++
b/src/main/java/org/apache/cassandra/sidecar/config/yaml/RestoreJobConfigurationImpl.java
@@ -167,8 +167,7 @@ public class RestoreJobConfigurationImpl implements
RestoreJobConfiguration
*/
public Builder jobDiscoveryActiveLoopDelayMillis(long
jobDiscoveryActiveLoopDelayMillis)
{
- this.jobDiscoveryActiveLoopDelayMillis =
jobDiscoveryActiveLoopDelayMillis;
- return this;
+ return update(b -> b.jobDiscoveryActiveLoopDelayMillis =
jobDiscoveryActiveLoopDelayMillis);
}
/**
@@ -180,8 +179,7 @@ public class RestoreJobConfigurationImpl implements
RestoreJobConfiguration
*/
public Builder jobDiscoveryIdleLoopDelayMillis(long
jobDiscoveryIdleLoopDelayMillis)
{
- this.jobDiscoveryIdleLoopDelayMillis =
jobDiscoveryIdleLoopDelayMillis;
- return this;
+ return update(b -> b.jobDiscoveryIdleLoopDelayMillis =
jobDiscoveryIdleLoopDelayMillis);
}
/**
@@ -193,8 +191,7 @@ public class RestoreJobConfigurationImpl implements
RestoreJobConfiguration
*/
public Builder jobDiscoveryRecencyDays(int jobDiscoveryRecencyDays)
{
- this.jobDiscoveryRecencyDays = jobDiscoveryRecencyDays;
- return this;
+ return update(b -> b.jobDiscoveryRecencyDays =
jobDiscoveryRecencyDays);
}
/**
@@ -206,8 +203,7 @@ public class RestoreJobConfigurationImpl implements
RestoreJobConfiguration
*/
public Builder processMaxConcurrency(int processMaxConcurrency)
{
- this.processMaxConcurrency = processMaxConcurrency;
- return this;
+ return update(b -> b.processMaxConcurrency =
processMaxConcurrency);
}
/**
@@ -219,11 +215,9 @@ public class RestoreJobConfigurationImpl implements
RestoreJobConfiguration
*/
public Builder restoreJobTablesTtlSeconds(long
restoreJobTablesTtlSeconds)
{
- this.restoreJobTablesTtlSeconds = restoreJobTablesTtlSeconds;
- return this;
+ return update(b -> b.restoreJobTablesTtlSeconds =
restoreJobTablesTtlSeconds);
}
-
@Override
public RestoreJobConfigurationImpl build()
{
diff --git a/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java
b/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java
index a7cee35..475f957 100644
--- a/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java
+++ b/src/main/java/org/apache/cassandra/sidecar/db/RestoreJob.java
@@ -23,8 +23,6 @@ import java.nio.ByteBuffer;
import java.util.Date;
import java.util.UUID;
-import com.google.common.annotations.VisibleForTesting;
-
import com.datastax.driver.core.LocalDate;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.utils.Bytes;
@@ -54,6 +52,7 @@ public class RestoreJob
public final Date expireAt;
public final short bucketCount;
public final String consistencyLevel;
+ public final Manager restoreJobManager;
public static Builder builder()
{
@@ -75,46 +74,12 @@ public class RestoreJob
.jobStatus(decodeJobStatus(row.getString("status")))
.jobSecrets(decodeJobSecrets(row.getBytes("blob_secrets")))
.expireAt(row.getTimestamp("expire_at"))
-
.sstableImportOptions(decodeSSTableImportOptions(row.getBytes("import_options")));
+
.sstableImportOptions(decodeSSTableImportOptions(row.getBytes("import_options")))
+ .consistencyLevel(row.getString("consistency_level"));
+
// todo: Yifan, add them back when the cql statement is updated to
reflect the new columns.
// Add new fields to CreateRestoreJobRequestPayload too
// .bucketCount(row.getShort("bucket_count"))
-// .consistencyLevel(row.getString("consistency_level"));
- return builder.build();
- }
-
- // todo: candidate to be removed
- public static RestoreJob forUpdates(UUID jobId, String jobAgent,
- RestoreJobStatus status,
- RestoreJobSecrets secrets,
- Date expireAt)
- throws DataObjectMappingException
- {
- Builder builder = new Builder();
- builder.createdAt(toLocalDate(jobId))
- .jobId(jobId).jobAgent(jobAgent)
- .jobStatus(status)
- .jobSecrets(secrets)
- .expireAt(expireAt);
- return builder.build();
- }
-
- // todo: candidate to be removed
- @VisibleForTesting
- public static RestoreJob create(LocalDate createdAt,
- UUID jobId,
- String keyspaceName,
- String tableName,
- String jobAgent,
- RestoreJobStatus status,
- RestoreJobSecrets secrets,
- SSTableImportOptions importOptions)
- {
- Builder builder = new Builder();
- builder.createdAt(createdAt)
- .jobId(jobId).jobAgent(jobAgent)
- .keyspace(keyspaceName).table(tableName)
-
.jobStatus(status).jobSecrets(secrets).sstableImportOptions(importOptions);
return builder.build();
}
@@ -156,6 +121,7 @@ public class RestoreJob
this.expireAt = builder.expireAt;
this.bucketCount = builder.bucketCount;
this.consistencyLevel = builder.consistencyLevel;
+ this.restoreJobManager = builder.manager;
}
public Builder unbuild()
@@ -163,6 +129,11 @@ public class RestoreJob
return new Builder(this);
}
+ public boolean isManagedBySidecar()
+ {
+ return restoreJobManager == Manager.SIDECAR;
+ }
+
/**
* {@inheritDoc}
*/
@@ -211,6 +182,7 @@ public class RestoreJob
private Date expireAt;
private short bucketCount;
private String consistencyLevel;
+ private Manager manager;
private Builder()
{
@@ -284,7 +256,10 @@ public class RestoreJob
public Builder consistencyLevel(String consistencyLevel)
{
- return update(b -> b.consistencyLevel = consistencyLevel);
+ return update(b -> {
+ b.consistencyLevel = consistencyLevel;
+ b.manager = resolveManager(consistencyLevel);
+ });
}
@Override
@@ -298,5 +273,35 @@ public class RestoreJob
{
return new RestoreJob(this);
}
+
+ /**
+ * Resolve the manager of the restore job based on the existence of
consistencyLevel
+ * @return the resolved Manager
+ */
+ private Manager resolveManager(String consistencyLevel)
+ {
+ // If spark is the manager, the restore job is created w/o
specifying consistency level
+ // If the manager of the restore job is sidecar, consistency level
must be present
+ return consistencyLevel == null ? Manager.SPARK : Manager.SIDECAR;
+ }
+ }
+
+ /**
+ * The manager of the restore job. The variant could change the code path
a restore job runs.
+ * It is a feature switch essentially.
+ */
+ public enum Manager
+ {
+ /**
+ * The restore job is managed by Spark. Sidecar instances are just
simple workers. They rely on client/Spark
+ * for decision-making.
+ */
+ SPARK,
+
+ /**
+ * The restore job is managed by Sidecar. Sidecar instances should
assign slices to sidecar instances
+ * and check whether the job has met the consistency level to complete
the job.
+ */
+ SIDECAR,
}
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
b/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
index 3a06e08..99ba533 100644
---
a/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
+++
b/src/main/java/org/apache/cassandra/sidecar/db/RestoreJobDatabaseAccessor.java
@@ -82,6 +82,7 @@ public class RestoreJobDatabaseAccessor extends
DatabaseAccessor
.jobSecrets(payload.secrets())
.sstableImportOptions(payload.importOptions())
.expireAt(payload.expireAtAsDate())
+
.consistencyLevel(payload.consistencyLevel())
.build();
ByteBuffer secrets = serializeValue(job.secrets, "secrets");
ByteBuffer importOptions = serializeValue(job.importOptions, "sstable
import options");
@@ -94,17 +95,29 @@ public class RestoreJobDatabaseAccessor extends
DatabaseAccessor
job.status.toString(),
secrets,
importOptions,
+ job.consistencyLevel,
job.expireAt);
execute(statement);
return job;
}
- public RestoreJob update(UpdateRestoreJobRequestPayload payload,
QualifiedTableName qualifiedTableName, UUID jobId)
+ /**
+ * Update fields in the restore job and persist
+ *
+ * @param payload fields to be updated
+ * @param jobId job ID
+ * @return the restore job object with only the updated fields
+ * @throws DataObjectMappingException when secrets json cannot be
serialized
+ */
+ public RestoreJob update(UpdateRestoreJobRequestPayload payload, UUID
jobId)
throws DataObjectMappingException
{
sidecarSchema.ensureInitialized();
+ RestoreJob.Builder updateBuilder = RestoreJob.builder();
LocalDate createdAt = RestoreJob.toLocalDate(jobId);
+ updateBuilder.createdAt(createdAt)
+ .jobId(jobId);
RestoreJobSecrets secrets = payload.secrets();
RestoreJobStatus status = payload.status();
@@ -127,22 +140,26 @@ public class RestoreJobDatabaseAccessor extends
DatabaseAccessor
{
throw new DataObjectMappingException("Failed to serialize
secrets", e);
}
+ updateBuilder.jobSecrets(secrets);
}
if (status != null)
{
batchStatement.add(restoreJobsSchema.updateStatus().bind(createdAt, jobId,
status.name()));
+ updateBuilder.jobStatus(status);
}
if (jobAgent != null)
{
batchStatement.add(restoreJobsSchema.updateJobAgent().bind(createdAt, jobId,
jobAgent));
+ updateBuilder.jobAgent(jobAgent);
}
if (expireAt != null)
{
batchStatement.add(restoreJobsSchema.updateExpireAt().bind(createdAt, jobId,
expireAt));
+ updateBuilder.expireAt(expireAt);
}
execute(batchStatement);
- return RestoreJob.forUpdates(jobId, jobAgent, status, secrets,
expireAt);
+ return updateBuilder.build();
}
public void abort(UUID jobId)
diff --git a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java
b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java
index fdf45c5..6353894 100644
--- a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java
+++ b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSlice.java
@@ -21,6 +21,8 @@ package org.apache.cassandra.sidecar.db;
import java.math.BigInteger;
import java.nio.file.Path;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -46,7 +48,14 @@ import org.apache.cassandra.sidecar.utils.SSTableImporter;
import org.jetbrains.annotations.NotNull;
/**
- * Data object that contains all values that matter to the restore job slice
+ * <p>Data object that contains all values that matter to the restore job
slice.</p>
+ *
+ * <p>How are the staged files organized on disk? For each slice,</p>
+ * <ol>
+ * <li>the S3 object is downloaded to the path at "stageDirectory/key". It is
a zip file.</li>
+ * <li>the zip is then extracted to the directory at
"stageDirectory/keyspace/table/".
+ * The extracted sstables are imported into Cassandra.</li>
+ * </ol>
*/
public class RestoreSlice
{
@@ -58,7 +67,12 @@ public class RestoreSlice
private final String bucket;
private final String key;
private final String checksum; // etag
- private final Path targetPathInStaging; // the path to store the s3 object
of the slice
+ // The path to the directory that stores the s3 object of the slice and
the sstables after unzipping.
+ // Its value is "baseStageDirectory/uploadId"
+ private final Path stageDirectory;
+ // The path to the staged s3 object (file). The path is inside
stageDirectory.
+ // Its value is "stageDirectory/key"
+ private final Path stagedObjectPath;
private final String uploadId;
private final InstanceMetadata owner;
private final BigInteger startToken;
@@ -69,7 +83,11 @@ public class RestoreSlice
private final long compressedSize;
private final long uncompressedSize;
private RestoreSliceTracker tracker;
+
+ // mutable states
private boolean existsOnS3 = false;
+ private boolean hasStaged = false;
+ private boolean hasImported = false;
private int downloadAttempt = 0;
private volatile boolean isCancelled = false;
@@ -88,7 +106,8 @@ public class RestoreSlice
this.bucket = builder.bucket;
this.key = builder.key;
this.checksum = builder.checksum;
- this.targetPathInStaging = builder.targetPathInStaging;
+ this.stageDirectory = builder.stageDirectory;
+ this.stagedObjectPath = builder.stagedObjectPath;
this.uploadId = builder.uploadId;
this.owner = builder.owner;
this.startToken = builder.startToken;
@@ -151,13 +170,29 @@ public class RestoreSlice
}
/**
- * Make the slice as completed
+ * Mark the slice as completed
*/
public void complete()
{
tracker.completeSlice(this);
}
+ /**
+ * Mark the slice as having completed the stage phase
+ */
+ public void completeStagePhase()
+ {
+ this.hasStaged = true;
+ }
+
+ /**
+ * Mark the slice as having completed the import phase
+ */
+ public void completeImportPhase()
+ {
+ this.hasImported = true;
+ }
+
public void failAtInstance(int instanceId)
{
statusByReplica.put(String.valueOf(instanceId),
RestoreSliceStatus.FAILED);
@@ -169,6 +204,7 @@ public class RestoreSlice
public void fail(RestoreJobFatalException exception)
{
tracker.fail(exception);
+ failAtInstance(owner().id());
}
public void setExistsOnS3()
@@ -196,6 +232,7 @@ public class RestoreSlice
ExecutorPools.TaskExecutorPool executorPool,
SSTableImporter importer,
double
requiredUsableSpacePercentage,
+
RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
RestoreJobStats stats)
{
if (isCancelled)
@@ -204,10 +241,12 @@ public class RestoreSlice
try
{
- RestoreJob restoreJob = job();
- StorageClient s3Client = s3ClientPool.storageClient(restoreJob);
- return new RestoreSliceTask(restoreJob, this, s3Client,
- executorPool, importer,
requiredUsableSpacePercentage, stats);
+ StorageClient s3Client = s3ClientPool.storageClient(job());
+ return new RestoreSliceTask(this, s3Client,
+ executorPool, importer,
+ requiredUsableSpacePercentage,
+ sliceDatabaseAccessor,
+ stats);
}
catch (IllegalStateException illegalState)
{
@@ -300,9 +339,21 @@ public class RestoreSlice
return this.replicas;
}
- public Path targetPathInStaging()
+ /**
+ * @return the path to the directory that stores the s3 object of the slice
+ * and the sstables after unzipping
+ */
+ public Path stageDirectory()
+ {
+ return stageDirectory;
+ }
+
+ /**
+ * @return the path to the staged s3 object
+ */
+ public Path stagedObjectPath()
{
- return targetPathInStaging;
+ return stagedObjectPath;
}
public long compressedSize()
@@ -330,6 +381,16 @@ public class RestoreSlice
return existsOnS3;
}
+ public boolean hasStaged()
+ {
+ return hasStaged;
+ }
+
+ public boolean hasImported()
+ {
+ return hasImported;
+ }
+
public int downloadAttempt()
{
return downloadAttempt;
@@ -350,6 +411,7 @@ public class RestoreSlice
public static RestoreSlice from(Row row)
{
Builder builder = new Builder();
+ builder.jobId(row.getUUID("job_id"));
builder.sliceId(row.getString("slice_id"));
builder.bucketId(row.getShort("bucket_id"));
builder.storageBucket(row.getString("bucket"));
@@ -377,7 +439,8 @@ public class RestoreSlice
private String bucket;
private String key;
private String checksum; // etag
- private Path targetPathInStaging; // the path to store the s3 object
of the slice
+ private Path stageDirectory;
+ private Path stagedObjectPath;
private String uploadId;
private InstanceMetadata owner;
private BigInteger startToken;
@@ -401,7 +464,7 @@ public class RestoreSlice
this.bucket = slice.bucket;
this.key = slice.key;
this.checksum = slice.checksum;
- this.targetPathInStaging = slice.targetPathInStaging;
+ this.stageDirectory = slice.stageDirectory;
this.uploadId = slice.uploadId;
this.owner = slice.owner;
this.startToken = slice.startToken;
@@ -450,10 +513,10 @@ public class RestoreSlice
return update(b -> b.checksum = checksum);
}
- public Builder targetPathInStaging(Path basePath, String uploadId)
+ public Builder stageDirectory(Path basePath, String uploadId)
{
return update(b -> {
- b.targetPathInStaging = basePath.resolve(uploadId);
+ b.stageDirectory = basePath.resolve(uploadId);
b.uploadId = uploadId;
});
}
@@ -485,12 +548,12 @@ public class RestoreSlice
public Builder replicaStatus(Map<String, RestoreSliceStatus>
statusByReplica)
{
- return update(b -> b.statusByReplica =
Collections.unmodifiableMap(statusByReplica));
+ return update(b -> b.statusByReplica = new
HashMap<>(statusByReplica));
}
public Builder replicas(Set<String> replicas)
{
- return update(b -> b.replicas =
Collections.unmodifiableSet(replicas));
+ return update(b -> b.replicas = new HashSet<>(replicas));
}
/**
@@ -525,6 +588,8 @@ public class RestoreSlice
@Override
public RestoreSlice build()
{
+ // precompute the path to the to-be-staged object on disk
+ stagedObjectPath = stageDirectory.resolve(key);
return new RestoreSlice(this);
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java
b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java
index c3624a9..6416ce0 100644
---
a/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java
+++
b/src/main/java/org/apache/cassandra/sidecar/db/RestoreSliceDatabaseAccessor.java
@@ -51,18 +51,19 @@ public class RestoreSliceDatabaseAccessor extends
DatabaseAccessor
public RestoreSlice create(RestoreSlice slice)
{
- BoundStatement statement =
restoreSlicesSchema.insertSlice().bind(slice.jobId(),
-
slice.bucketId(),
-
slice.sliceId(),
-
slice.bucket(),
-
slice.key(),
-
slice.checksum(),
-
slice.startToken(),
-
slice.endToken(),
-
slice.compressedSize(),
-
slice.uncompressedSize(),
-
slice.statusByReplica(),
-
slice.replicas());
+ BoundStatement statement = restoreSlicesSchema.insertSlice()
+ .bind(slice.jobId(),
+ slice.bucketId(),
+ slice.sliceId(),
+ slice.bucket(),
+ slice.key(),
+ slice.checksum(),
+ slice.startToken(),
+ slice.endToken(),
+
slice.compressedSize(),
+
slice.uncompressedSize(),
+
slice.statusByReplica(),
+ slice.replicas());
execute(statement);
return slice;
}
@@ -71,12 +72,13 @@ public class RestoreSliceDatabaseAccessor extends
DatabaseAccessor
{
sidecarSchema.ensureInitialized();
- BoundStatement statement =
restoreSlicesSchema.updateStatus().bind(slice.statusByReplica(),
-
slice.replicas(),
-
slice.jobId(),
-
slice.bucketId(),
-
slice.startToken(),
-
slice.sliceId());
+ BoundStatement statement = restoreSlicesSchema.updateStatus()
+
.bind(slice.statusByReplica(),
+ slice.replicas(),
+ slice.jobId(),
+ slice.bucketId(),
+ slice.startToken(),
+ slice.sliceId());
Row row = execute(statement).one();
if (row == null)
{
@@ -91,10 +93,11 @@ public class RestoreSliceDatabaseAccessor extends
DatabaseAccessor
{
sidecarSchema.ensureInitialized();
- BoundStatement statement =
restoreSlicesSchema.findAllByTokenRange().bind(jobId,
-
bucketId,
-
startToken,
-
endToken);
+ BoundStatement statement = restoreSlicesSchema.findAllByTokenRange()
+ .bind(jobId,
+ bucketId,
+ startToken,
+ endToken);
ResultSet result = execute(statement);
List<RestoreSlice> slices = new ArrayList<>();
for (Row row : result)
diff --git
a/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreJobsSchema.java
b/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreJobsSchema.java
index 4476a61..8df27bf 100644
---
a/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreJobsSchema.java
+++
b/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreJobsSchema.java
@@ -141,8 +141,9 @@ public class RestoreJobsSchema extends
AbstractSchema.TableSchema
" status," +
" blob_secrets," +
" import_options," +
+ " consistency_level," +
" expire_at" +
- ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", config);
+ ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
config);
}
static String updateBlobSecrets(SchemaKeyspaceConfiguration config)
@@ -192,6 +193,7 @@ public class RestoreJobsSchema extends
AbstractSchema.TableSchema
"status, " +
"blob_secrets, " +
"import_options, " +
+ "consistency_level, " +
"expire_at " +
"FROM %s.%s " +
"WHERE created_at = ? AND job_id = ?", config);
@@ -207,6 +209,7 @@ public class RestoreJobsSchema extends
AbstractSchema.TableSchema
"status, " +
"blob_secrets, " +
"import_options, " +
+ "consistency_level, " +
"expire_at " +
"FROM %s.%s " +
"WHERE created_at = ?", config);
diff --git
a/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java
b/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java
index c016d2b..0602a9e 100644
---
a/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java
+++
b/src/main/java/org/apache/cassandra/sidecar/db/schema/RestoreSlicesSchema.java
@@ -78,7 +78,7 @@ public class RestoreSlicesSchema extends
AbstractSchema.TableSchema
" end_token varint," +
" compressed_size bigint," +
" uncompressed_size bigint," +
- " status_by_replica map<text, text>," +
+ " status_by_replica map<text, text>," + // key
is instance ID; value is RestoreSliceStatus
" all_replicas set<text>," +
" PRIMARY KEY ((job_id, bucket_id), start_token,
slice_id)" +
") WITH default_time_to_live = %s",
diff --git
a/src/main/java/org/apache/cassandra/sidecar/locator/CachedLocalTokenRanges.java
b/src/main/java/org/apache/cassandra/sidecar/locator/CachedLocalTokenRanges.java
new file mode 100644
index 0000000..a171594
--- /dev/null
+++
b/src/main/java/org/apache/cassandra/sidecar/locator/CachedLocalTokenRanges.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.locator;
+
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.Metadata;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.dns.DnsResolver;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Provides the token ranges owned by and replicated to the local Cassandra instance(s), grouped by keyspace.
+ * The results are cached; the cache is rebuilt when the ids of the local instances or the hosts reported by
+ * the driver metadata change, or when the requested keyspace has not been cached yet.
+ */
+@Singleton
+public class CachedLocalTokenRanges implements LocalTokenRangesProvider
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(CachedLocalTokenRanges.class);
+    private final InstancesConfig instancesConfig;
+    private final DnsResolver dnsResolver;
+
+    // ids of the local instances observed by the last reload
+    @GuardedBy("this")
+    private Set<Integer> localInstanceIdsCache;
+    // all hosts of the cluster observed by the last reload
+    @GuardedBy("this")
+    private Set<Host> allInstancesCache;
+    // hosts that were successfully mapped from the local instances
+    @GuardedBy("this")
+    private Set<Host> localInstancesCache;
+    // keyspace name -> (instance id -> token ranges of the instance)
+    @GuardedBy("this")
+    private ImmutableMap<String, Map<Integer, Set<TokenRange>>> localTokenRangesCache;
+
+    @Inject
+    public CachedLocalTokenRanges(InstancesConfig instancesConfig, DnsResolver dnsResolver)
+    {
+        this.instancesConfig = instancesConfig;
+        this.dnsResolver = dnsResolver;
+        this.localTokenRangesCache = null;
+        this.localInstanceIdsCache = null;
+        this.allInstancesCache = null;
+        this.localInstancesCache = null;
+    }
+
+    /**
+     * Look up the token ranges of the local instances for the keyspace, served from the cache when possible.
+     *
+     * @param keyspace keyspace to determine replication
+     * @return map of instance id to token ranges; an empty map when no local instance is configured or
+     *         the driver metadata is not available yet
+     * @throws NoSuchElementException when the keyspace is not present in the driver metadata
+     */
+    @Override
+    @Nullable
+    public Map<Integer, Set<TokenRange>> localTokenRanges(String keyspace)
+    {
+        List<InstanceMetadata> localInstances = instancesConfig.instances();
+
+        if (localInstances.isEmpty())
+        {
+            LOGGER.warn("No local instances found");
+            return Collections.emptyMap();
+        }
+
+        // the metadata is read from the delegate of the first local instance
+        CassandraAdapterDelegate delegate = localInstances.get(0).delegate();
+        Metadata metadata = delegate == null ? null : delegate.metadata();
+        if (metadata == null)
+        {
+            LOGGER.debug("Not yet connect to Cassandra cluster");
+            return Collections.emptyMap();
+        }
+
+        if (metadata.getKeyspace(keyspace) == null)
+        {
+            throw new NoSuchElementException("Keyspace does not exist. keyspace: " + keyspace);
+        }
+
+        Set<Integer> localInstanceIds = localInstances.stream()
+                                                      .map(InstanceMetadata::id)
+                                                      .collect(Collectors.toSet());
+        Set<Host> allInstances = metadata.getAllHosts();
+        return getCacheOrReload(metadata, keyspace, localInstanceIds, localInstances, allInstances);
+    }
+
+    /**
+     * Return the host that the instance maps to, paired with the token ranges owned and replicated to
+     * that host according to the replication strategy of the keyspace.
+     * The returned set is not wrapped here; the caller makes it unmodifiable before caching it.
+     *
+     * @return the pair of host and token ranges, or null when the instance cannot be mapped to a host
+     * @throws RuntimeException when the hostname of the instance cannot be resolved
+     */
+    @Nullable
+    private Pair<Host, Set<TokenRange>> tokenRangesOfHost(Metadata metadata,
+                                                          String keyspace,
+                                                          InstanceMetadata instance,
+                                                          Map<IpAddressAndPort, Host> allHosts)
+    {
+        Host host;
+        try
+        {
+            final IpAddressAndPort ip = IpAddressAndPort.of(dnsResolver.resolve(instance.host()), instance.port());
+            host = allHosts.get(ip);
+            if (host == null)
+            {
+                LOGGER.warn("Could not map InstanceMetadata to Host host={} port={} ip={}",
+                            instance.host(), instance.port(), ip.ipAddress);
+                return null;
+            }
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException("Failed to resolve hostname to ip. hostname: " + instance.host(), e);
+        }
+        return Pair.of(host, tokenRangesOfHost(metadata, keyspace, host));
+    }
+
+    /**
+     * Collect the token ranges that the driver metadata reports for the host in the keyspace.
+     * Each driver range is converted with {@code TokenRange.from}, and the converted ranges are flattened
+     * into a single set.
+     */
+    public Set<TokenRange> tokenRangesOfHost(Metadata metadata, String keyspace, Host host)
+    {
+        return metadata.getTokenRanges(keyspace, host)
+                       .stream()
+                       .flatMap(range -> TokenRange.from(range).stream())
+                       .collect(Collectors.toSet());
+    }
+
+    /**
+     * Return the cached token ranges of the keyspace, reloading the cache when needed.
+     * The cache is served as-is only when the keyspace is cached and neither the hosts of the cluster
+     * nor the ids of the local instances have changed since the last reload.
+     *
+     * @return map of instance id to token ranges for the keyspace; null when the keyspace is absent
+     *         from the rebuilt cache
+     */
+    @Nullable
+    private synchronized Map<Integer, Set<TokenRange>> getCacheOrReload(Metadata metadata,
+                                                                        String keyspace,
+                                                                        Set<Integer> localInstanceIds,
+                                                                        List<InstanceMetadata> localInstances,
+                                                                        Set<Host> allInstances)
+    {
+        // exit early if no change is found
+        boolean isClusterTheSame = allInstances.equals(allInstancesCache)
+                                   && localInstanceIds.equals(localInstanceIdsCache);
+        if (localTokenRangesCache != null
+            && localTokenRangesCache.containsKey(keyspace)
+            && isClusterTheSame)
+        {
+            return localTokenRangesCache.get(keyspace);
+        }
+
+        // otherwise, reload the token ranges
+        localInstanceIdsCache = localInstanceIds;
+        allInstancesCache = allInstances;
+        if (allInstances.isEmpty())
+        {
+            LOGGER.warn("No instances found in client session");
+        }
+        // index every host by each of its known endpoints, so that a local instance can be matched
+        // regardless of which address it is configured with
+        Map<IpAddressAndPort, Host> allHosts = new HashMap<>(allInstancesCache.size());
+        BiConsumer<InetSocketAddress, Host> putNullSafe = (endpoint, host) -> {
+            if (endpoint != null)
+            {
+                allHosts.put(IpAddressAndPort.of(endpoint), host);
+            }
+        };
+        for (Host host : allInstancesCache)
+        {
+            putNullSafe.accept(host.getSocketAddress(), host);
+            putNullSafe.accept(host.getListenSocketAddress(), host);
+            putNullSafe.accept(host.getBroadcastSocketAddress(), host);
+        }
+
+        ImmutableMap.Builder<String, Map<Integer, Set<TokenRange>>> perKeyspaceBuilder = ImmutableMap.builder();
+        ImmutableSet.Builder<Host> hostBuilder = ImmutableSet.builder();
+        if (isClusterTheSame && localInstancesCache != null)
+        {
+            hostBuilder.addAll(localInstancesCache);
+        }
+
+        for (KeyspaceMetadata ks : metadata.getKeyspaces())
+        {
+            if (isClusterTheSame && localTokenRangesCache != null && localTokenRangesCache.containsKey(ks.getName()))
+            {
+                // we don't need to rebuild if already cached
+                perKeyspaceBuilder.put(ks.getName(), localTokenRangesCache.get(ks.getName()));
+            }
+            else
+            {
+                ImmutableMap.Builder<Integer, Set<TokenRange>> resultBuilder = ImmutableMap.builder();
+                for (InstanceMetadata instance : localInstances)
+                {
+                    // Resolve the ranges for the keyspace being iterated, not the requested one.
+                    // Otherwise, every keyspace is cached with the ranges of the requested keyspace.
+                    Pair<Host, Set<TokenRange>> pair = tokenRangesOfHost(metadata, ks.getName(), instance, allHosts);
+                    if (pair != null)
+                    {
+                        hostBuilder.add(pair.getKey());
+                        resultBuilder.put(instance.id(), Collections.unmodifiableSet(pair.getValue()));
+                    }
+                }
+                perKeyspaceBuilder.put(ks.getName(), resultBuilder.build());
+            }
+        }
+        localTokenRangesCache = perKeyspaceBuilder.build();
+        localInstancesCache = hostBuilder.build();
+        if (localInstancesCache.isEmpty())
+        {
+            LOGGER.warn("Unable to determine local instances from client meta-data!");
+        }
+        return localTokenRangesCache.get(keyspace);
+    }
+
+    /**
+     * Lookup key that combines an IP address and a port. Equality and hash code consider both fields.
+     */
+    private static class IpAddressAndPort
+    {
+        final String ipAddress;
+        final int port;
+
+        static IpAddressAndPort of(@NotNull InetSocketAddress endpoint)
+        {
+            return IpAddressAndPort.of(endpoint.getAddress().getHostAddress(),
+                                       endpoint.getPort());
+        }
+
+        static IpAddressAndPort of(String ipAddress, int port)
+        {
+            return new IpAddressAndPort(ipAddress, port);
+        }
+
+        IpAddressAndPort(String ipAddress, int port)
+        {
+            this.ipAddress = ipAddress;
+            this.port = port;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o)
+            {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass())
+            {
+                return false;
+            }
+
+            IpAddressAndPort that = (IpAddressAndPort) o;
+            return port == that.port && Objects.equals(ipAddress, that.ipAddress);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(ipAddress, port);
+        }
+    }
+}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/locator/LocalTokenRangesProvider.java
b/src/main/java/org/apache/cassandra/sidecar/locator/LocalTokenRangesProvider.java
new file mode 100644
index 0000000..cef04f6
--- /dev/null
+++
b/src/main/java/org/apache/cassandra/sidecar/locator/LocalTokenRangesProvider.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.locator;
+
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+/**
+ * Provides the token ranges of the local Cassandra instance(s)
+ */
+public interface LocalTokenRangesProvider
+{
+ /**
+ * Calculate the token ranges owned by and replicated to the local Cassandra instance(s).
+ * When Sidecar is paired with multiple Cassandra instances, the ranges of each Cassandra instance are
+ * captured in the form of a map, where the key is the instance id and the value is the ranges of the
+ * instance. When Sidecar is paired with a single Cassandra instance, the result map has a single entry.
+ *
+ * @param keyspace keyspace to determine replication
+ * @return token ranges of the local Cassandra instances, keyed by instance id. The method is annotated
+ * as nullable, so callers should check the result for null before using it.
+ */
+ @Nullable
+ Map<Integer, Set<TokenRange>> localTokenRanges(String keyspace);
+}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
index 98f496b..e805df3 100644
---
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
+++
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
@@ -30,12 +30,17 @@ import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.config.RestoreJobConfiguration;
import org.apache.cassandra.sidecar.config.SidecarConfiguration;
import org.apache.cassandra.sidecar.db.RestoreJob;
import org.apache.cassandra.sidecar.db.RestoreJobDatabaseAccessor;
import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor;
import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
+import org.apache.cassandra.sidecar.locator.CachedLocalTokenRanges;
+import org.apache.cassandra.sidecar.locator.LocalTokenRangesProvider;
+import org.apache.cassandra.sidecar.locator.TokenRange;
import org.apache.cassandra.sidecar.stats.RestoreJobStats;
import org.apache.cassandra.sidecar.tasks.PeriodicTask;
import org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor;
@@ -54,6 +59,7 @@ public class RestoreJobDiscoverer implements PeriodicTask
private final RestoreJobDatabaseAccessor restoreJobDatabaseAccessor;
private final RestoreSliceDatabaseAccessor restoreSliceDatabaseAccessor;
private final Provider<RestoreJobManagerGroup>
restoreJobManagerGroupSingleton;
+ private final LocalTokenRangesProvider localTokenRangesProvider;
private final InstanceMetadataFetcher instanceMetadataFetcher;
private final RestoreJobStats stats;
private volatile boolean refreshSignaled = true;
@@ -67,6 +73,7 @@ public class RestoreJobDiscoverer implements PeriodicTask
RestoreJobDatabaseAccessor
restoreJobDatabaseAccessor,
RestoreSliceDatabaseAccessor
restoreSliceDatabaseAccessor,
Provider<RestoreJobManagerGroup>
restoreJobManagerGroupProvider,
+ CachedLocalTokenRanges cachedLocalTokenRanges,
InstanceMetadataFetcher
instanceMetadataFetcher,
RestoreJobStats stats)
{
@@ -75,6 +82,7 @@ public class RestoreJobDiscoverer implements PeriodicTask
restoreJobDatabaseAccessor,
restoreSliceDatabaseAccessor,
restoreJobManagerGroupProvider,
+ cachedLocalTokenRanges,
instanceMetadataFetcher,
stats);
}
@@ -85,6 +93,7 @@ public class RestoreJobDiscoverer implements PeriodicTask
RestoreJobDatabaseAccessor restoreJobDatabaseAccessor,
RestoreSliceDatabaseAccessor
restoreSliceDatabaseAccessor,
Provider<RestoreJobManagerGroup>
restoreJobManagerGroupProvider,
+ LocalTokenRangesProvider cachedLocalTokenRanges,
InstanceMetadataFetcher instanceMetadataFetcher,
RestoreJobStats stats)
{
@@ -94,6 +103,7 @@ public class RestoreJobDiscoverer implements PeriodicTask
this.restoreSliceDatabaseAccessor = restoreSliceDatabaseAccessor;
this.jobDiscoveryRecencyDays =
restoreJobConfig.jobDiscoveryRecencyDays();
this.restoreJobManagerGroupSingleton = restoreJobManagerGroupProvider;
+ this.localTokenRangesProvider = cachedLocalTokenRanges;
this.instanceMetadataFetcher = instanceMetadataFetcher;
this.stats = stats;
}
@@ -153,6 +163,7 @@ public class RestoreJobDiscoverer implements PeriodicTask
switch (job.status)
{
case CREATED:
+ case STAGED:
if (job.expireAt == null // abort all old jobs that
has no expireAt value
|| job.expireAt.getTime() < nowMillis)
{
@@ -164,6 +175,11 @@ public class RestoreJobDiscoverer implements PeriodicTask
// find the oldest non-completed job
days = Math.max(days, delta(today, job.createdAt));
restoreJobManagers.updateRestoreJob(job);
+ if (job.isManagedBySidecar())
+ {
+ // todo: potential exceedingly number of queries
+ findSlicesAndSubmit(job);
+ }
inflightJobsCount += 1;
break;
case FAILED:
@@ -208,7 +224,6 @@ public class RestoreJobDiscoverer implements PeriodicTask
}
/**
- * TODO: remove the method on phase 2 completion
* Signal the job discovery loop to refresh in the next execution
*/
public void signalRefresh()
@@ -216,6 +231,44 @@ public class RestoreJobDiscoverer implements PeriodicTask
refreshSignaled = true;
}
+    // Find all slices of the job that should be downloaded to the local instances,
+    // according to the cluster token ownership, and submit them per instance and token range.
+    // The token ranges provider is declared as nullable; nothing is submitted when it returns null.
+    private void findSlicesAndSubmit(RestoreJob restoreJob)
+    {
+        // fully-qualified types are used to avoid depending on imports that are not visible here
+        java.util.Map<Integer, java.util.Set<TokenRange>> rangesByInstance =
+        localTokenRangesProvider.localTokenRanges(restoreJob.keyspaceName);
+        if (rangesByInstance == null)
+        {
+            LOGGER.debug("No local token ranges available. jobId={} keyspace={}",
+                         restoreJob.jobId, restoreJob.keyspaceName);
+            return;
+        }
+
+        rangesByInstance.forEach((key, ranges) -> {
+            int instanceId = key;
+            InstanceMetadata instance = instanceMetadataFetcher.instance(instanceId);
+            ranges.forEach(range -> findSlicesOfRangeAndSubmit(instance, restoreJob, range));
+        });
+    }
+
+ // Query the slices of the restore job that fall in the token range, and try to submit each of them
+ // for the given local instance.
+ // If the slice already exists, the submission is expected to be a no-op (behavior of trySubmit; not shown here).
+ // If the submission fails with RestoreJobFatalException, the slice is marked as failed at the instance
+ // and the slice status is written back through the slice database accessor.
+ private void findSlicesOfRangeAndSubmit(InstanceMetadata instance,
RestoreJob restoreJob, TokenRange range)
+ {
+ short bucketId = 0; // TODO: update the implementation to pick proper bucketId
+ restoreSliceDatabaseAccessor
+ .selectByJobByBucketByTokenRange(restoreJob.jobId, bucketId,
range.start, range.end)
+ .forEach(slice -> {
+ // set the owner instance, which is not read from database
+ slice = slice.unbuild().ownerInstance(instance).build();
+ try
+ {
+ // todo: do not re-submit for download if the slice is staged (when job status is before staged)
+ // or imported (when job status is staged) on the instance already
+ restoreJobManagerGroupSingleton.get().trySubmit(instance,
slice, restoreJob);
+ }
+ catch (RestoreJobFatalException e)
+ {
+ slice.fail(e); // TODO: is it still needed? no, remove it later.
+ slice.failAtInstance(instance.id());
+ restoreSliceDatabaseAccessor.updateStatus(slice);
+ }
+ });
+ }
+
private boolean abortJob(RestoreJob job)
{
LOGGER.info("Abort expired job. jobId={} job={}", job.jobId, job);
diff --git
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
index 8828c8f..d33ff37 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
@@ -61,7 +61,7 @@ public class RestoreJobUtil
*/
public static void unzip(File zipFile, File targetDir) throws IOException,
RestoreJobException
{
- try (ZipInputStream zis = new ZipInputStream(new
FileInputStream(zipFile)))
+ try (ZipInputStream zis = new
ZipInputStream(Files.newInputStream(zipFile.toPath())))
{
ZipEntry zipEntry = zis.getNextEntry();
diff --git
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
index d48b705..af6332f 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
@@ -36,6 +36,7 @@ import
org.apache.cassandra.sidecar.concurrent.ConcurrencyLimiter;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.config.SidecarConfiguration;
import org.apache.cassandra.sidecar.db.RestoreSlice;
+import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor;
import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
import org.apache.cassandra.sidecar.exceptions.RestoreJobException;
import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions;
@@ -58,6 +59,7 @@ public class RestoreProcessor implements PeriodicTask
private final ConcurrencyLimiter processMaxConcurrency;
private final SliceQueue sliceQueue = new SliceQueue();
private final double requiredUsableSpacePercentage; // value range: [0.0,
1.0)
+ private final RestoreSliceDatabaseAccessor sliceDatabaseAccessor;
private final RestoreJobStats stats;
private volatile boolean isClosed = false; // OK to run close twice, so
relax the control to volatile
@@ -67,6 +69,7 @@ public class RestoreProcessor implements PeriodicTask
SidecarSchema sidecarSchema,
StorageClientPool s3ClientPool,
SSTableImporter importer,
+ RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
RestoreJobStats stats)
{
this.pool = executorPools.internal();
@@ -77,6 +80,7 @@ public class RestoreProcessor implements PeriodicTask
this.requiredUsableSpacePercentage
=
config.serviceConfiguration().ssTableUploadConfiguration().minimumSpacePercentageRequired()
/ 100.0;
this.importer = importer;
+ this.sliceDatabaseAccessor = sliceDatabaseAccessor;
this.stats = stats;
}
@@ -126,12 +130,29 @@ public class RestoreProcessor implements PeriodicTask
// capture the new queue length after polling
sliceQueue.captureImportQueueLength();
pool.executeBlocking(slice.toAsyncTask(s3ClientPool, pool,
importer,
-
requiredUsableSpacePercentage, stats),
+
requiredUsableSpacePercentage,
+ sliceDatabaseAccessor,
stats),
false) // unordered
.onSuccess(restoreSlice -> {
- stats.captureSliceCompletionTime(slice.owner().id(),
System.nanoTime() - slice.creationTimeNanos());
- LOGGER.info("Slice completes successfully. sliceKey={}",
restoreSlice.key());
- restoreSlice.complete();
+ if (slice.hasImported())
+ {
+ stats.captureSliceCompletionTime(slice.owner().id(),
System.nanoTime() - slice.creationTimeNanos());
+ LOGGER.info("Slice completes successfully. sliceKey={}",
slice.key());
+ slice.complete();
+ }
+ else if (slice.hasStaged())
+ {
+ // todo: report stat of time taken to stage
+ LOGGER.info("Slice has been staged successfully.
sliceKey={}", slice.key());
+ // the slice is not fully complete yet. Re-enqueue the
slice.
+ sliceQueue.offer(slice);
+ }
+ else // log a warning and retry. It should not reach here.
+ {
+ LOGGER.warn("Unexpected state of slice. It is neither
staged nor imported. sliceKey={}",
+ slice.key());
+ sliceQueue.offer(slice);
+ }
})
.onFailure(cause -> {
if (cause instanceof RestoreJobException &&
((RestoreJobException) cause).retryable())
@@ -143,8 +164,13 @@ public class RestoreProcessor implements PeriodicTask
else
{
LOGGER.error("Slice failed with unrecoverable failure.
sliceKey={}", slice.key(), cause);
- // fail the slice. In the current implementation, all
slices of the job get aborted
+ // fail the slice and mark the slice has failed on its
owning instance.
+ // In the phase 1 implementation, all slices of the job
get aborted
slice.fail(RestoreJobExceptions.toFatal(cause));
+ if (slice.job().isManagedBySidecar())
+ {
+ sliceDatabaseAccessor.updateStatus(slice);
+ }
// revoke the s3 credentials of the job too
s3ClientPool.revokeCredentials(slice.jobId());
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
index b0214d9..0cec6f2 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.sidecar.restore;
import java.io.File;
+import java.nio.file.Files;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -30,10 +31,13 @@ import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
import org.apache.cassandra.sidecar.common.data.SSTableImportOptions;
+import org.apache.cassandra.sidecar.common.utils.Preconditions;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.db.RestoreJob;
import org.apache.cassandra.sidecar.db.RestoreSlice;
+import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor;
import org.apache.cassandra.sidecar.exceptions.RestoreJobException;
import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions;
import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
@@ -59,27 +63,31 @@ public class RestoreSliceTask implements
Handler<Promise<RestoreSlice>>
{
private static final Logger LOGGER =
LoggerFactory.getLogger(RestoreSliceTask.class);
- private final RestoreJob job;
private final RestoreSlice slice;
private final StorageClient s3Client;
private final ExecutorPools.TaskExecutorPool executorPool;
private final SSTableImporter importer;
private final double requiredUsableSpacePercentage;
+ private final RestoreSliceDatabaseAccessor sliceDatabaseAccessor;
private final RestoreJobStats stats;
- public RestoreSliceTask(RestoreJob job, RestoreSlice slice,
+ public RestoreSliceTask(RestoreSlice slice,
StorageClient s3Client,
ExecutorPools.TaskExecutorPool executorPool,
SSTableImporter importer,
double requiredUsableSpacePercentage,
+ RestoreSliceDatabaseAccessor sliceDatabaseAccessor,
RestoreJobStats stats)
{
- this.job = job;
+ Preconditions.checkArgument(!slice.job().isManagedBySidecar()
+ || sliceDatabaseAccessor != null,
+ "sliceDatabaseAccessor cannot be null");
this.slice = slice;
this.s3Client = s3Client;
this.executorPool = executorPool;
this.importer = importer;
this.requiredUsableSpacePercentage = requiredUsableSpacePercentage;
+ this.sliceDatabaseAccessor = sliceDatabaseAccessor;
this.stats = stats;
}
@@ -92,11 +100,58 @@ public class RestoreSliceTask implements
Handler<Promise<RestoreSlice>>
// The slice, when being process, requires a total of slice size
(download) + uncompressed (unzip) to use.
// The protection below guards the slice being process, if the usable
disk space falls below the threshold
// after considering the slice
- ensureSufficientStorage(slice.targetPathInStaging().toString(),
+ ensureSufficientStorage(slice.stageDirectory().toString(),
slice.compressedSize() +
slice.uncompressedSize(),
requiredUsableSpacePercentage,
executorPool)
- .onSuccess(ignored -> downloadSliceAndImport(event))
+ .onSuccess(ignored -> {
+ RestoreJob job = slice.job();
+ if (job.isManagedBySidecar())
+ {
+ if (job.status == RestoreJobStatus.CREATED)
+ {
+ if (Files.exists(slice.stagedObjectPath()))
+ {
+ LOGGER.debug("The slice has been staged already.
sliceKey={} stagedFilePath={}",
+ slice.key(), slice.stagedObjectPath());
+ slice.completeStagePhase(); // update the flag if
missed
+ sliceDatabaseAccessor.updateStatus(slice);
+ event.tryComplete(slice);
+ return;
+ }
+
+ // 1. check object existence and validate eTag / checksum
+ checkObjectExistence(event)
+ // 2. download slice/object when the remote object exists
+ .thenCompose(headObject -> downloadSlice(event))
+ // 3. persist status
+ .thenAccept(x -> {
+ slice.completeStagePhase();
+ sliceDatabaseAccessor.updateStatus(slice);
+ // completed staging. A new task is produced when it
comes to import
+ event.tryComplete(slice);
+ });
+ }
+ else if (job.status == RestoreJobStatus.STAGED)
+ {
+ unzipAndImport(event, slice.stagedObjectPath().toFile(),
+ // persist status
+ () ->
sliceDatabaseAccessor.updateStatus(slice));
+ }
+ else
+ {
+ String msg = "Unexpected restore job status. Expected only
CREATED or STAGED when " +
+ "processing active slices. Found status: " +
job.status;
+ Exception unexpectedState = new IllegalStateException(msg);
+
event.tryFail(RestoreJobExceptions.ofFatalSlice("Unexpected restore job status",
+ slice,
unexpectedState));
+ }
+ }
+ else
+ {
+ downloadSliceAndImport(event);
+ }
+ })
.onFailure(cause -> {
String msg = "Unable to ensure enough space for the slice. Retry
later";
event.tryFail(RestoreJobExceptions.ofSlice(msg, slice, cause));
@@ -184,6 +239,7 @@ public class RestoreSliceTask implements
Handler<Promise<RestoreSlice>>
{
RestoreJobFatalException ex =
RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled",
slice, null);
+ event.tryFail(ex);
CompletableFuture<File> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(ex);
return failedFuture;
@@ -223,6 +279,11 @@ public class RestoreSliceTask implements
Handler<Promise<RestoreSlice>>
@VisibleForTesting
void unzipAndImport(Promise<RestoreSlice> event, File file)
+ {
+ unzipAndImport(event, file, null);
+ }
+
+ void unzipAndImport(Promise<RestoreSlice> event, File file, Runnable
onSuccessCommit)
{
if (file == null) // the condition should never happen. Having it here
for logic completeness
{
@@ -234,7 +295,21 @@ public class RestoreSliceTask implements
Handler<Promise<RestoreSlice>>
unzip(file)
.compose(this::validateFiles)
.compose(this::commit)
- .onSuccess(x -> event.tryComplete(slice))
+ .compose(x -> {
+ if (onSuccessCommit == null)
+ {
+ return Future.succeededFuture();
+ }
+
+ return executorPool.executeBlocking(promise -> {
+ onSuccessCommit.run();
+ promise.tryComplete();
+ });
+ })
+ .onSuccess(x -> {
+ slice.completeImportPhase();
+ event.tryComplete(slice);
+ })
.onFailure(failure -> {
logWarnIfHasHttpExceptionCauseOnCommit(failure, slice);
event.tryFail(RestoreJobExceptions.propagate("Fail to commit
slice. "
@@ -248,24 +323,33 @@ public class RestoreSliceTask implements
Handler<Promise<RestoreSlice>>
if (failOnCancelled(promise))
return;
- if (!zipFile.exists())
- {
- promise.tryFail(new RestoreJobException("Object not found from
disk. File: " + zipFile));
- return;
- }
-
// targetPathInStaging points to the directory named after uploadId
// SSTableImporter expects the file system structure to be
uploadId/keyspace/table/sstables
- File targetDir = slice.targetPathInStaging()
+ File targetDir = slice.stageDirectory()
.resolve(slice.keyspace())
.resolve(slice.table())
.toFile();
- if (!targetDir.mkdirs())
+
+ boolean targetDirExist = targetDir.isDirectory();
+
+ if (!zipFile.exists())
{
- LOGGER.warn("Error occurred while creating directory for
holding SSTables for SSTableImporter");
+ if (targetDirExist)
+ {
+ LOGGER.debug("The files in slice are already extracted.
Maybe it is a retried task?");
+ promise.complete(targetDir);
+ }
+ else
+ {
+ promise.tryFail(new RestoreJobException("Object not found
from disk. File: " + zipFile));
+ }
+ // return early
+ return;
}
+
try
{
+ Files.createDirectories(targetDir.toPath());
// Remove all existing files under the target directory
// The validation step later expects only the files registered
in the manifest.
RestoreJobUtil.cleanDirectory(targetDir.toPath());
@@ -275,7 +359,7 @@ public class RestoreSliceTask implements Handler<Promise<RestoreSlice>>
// Then, delete the downloaded zip file
if (!zipFile.delete())
{
- LOGGER.warn("Error while deleting file {}, please note for
space wastage",
+ LOGGER.warn("File deletion attempt failed. file={}",
zipFile.getAbsolutePath());
}
}
@@ -383,7 +467,7 @@ public class RestoreSliceTask implements Handler<Promise<RestoreSlice>>
LOGGER.info("Begin committing SSTables. sliceKey={}", slice.key());
- SSTableImportOptions options = job.importOptions;
+ SSTableImportOptions options = slice.job().importOptions;
SSTableImporter.ImportOptions importOptions = new
SSTableImporter.ImportOptions.Builder()
.host(slice.owner().host())
.keyspace(slice.keyspace())
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
index 621c1dc..f7825f6 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
@@ -152,7 +152,7 @@ public class StorageClient
.bucket(slice.bucket())
.key(slice.key())
.build();
- Path objectPath = slice.targetPathInStaging().resolve(slice.key());
+ Path objectPath = slice.stagedObjectPath();
File object = objectPath.toFile();
if (object.exists())
{
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandler.java
index 1c190d9..28ecd8b 100644
--- a/src/main/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandler.java
+++ b/src/main/java/org/apache/cassandra/sidecar/routes/restore/AbortRestoreJobHandler.java
@@ -81,9 +81,11 @@ public class AbortRestoreJobHandler extends AbstractHandler<String>
restoreJobDatabaseAccessor.abort(job.jobId);
restoreJobManagerGroup.signalRefreshRestoreJob();
- return Future.succeededFuture();
+ return Future.succeededFuture(job);
})
- .onSuccess(ignored -> {
+ .onSuccess(job -> {
+ logger.info("Successfully aborted restore job. job={},
remoteAddress={}, instance={}",
+ job, remoteAddress, host);
stats.captureFailedJob();
context.response().setStatusCode(HttpResponseStatus.OK.code()).end();
})
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreJobHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreJobHandler.java
index d6a36fb..5820a9b 100644
--- a/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreJobHandler.java
+++ b/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreJobHandler.java
@@ -86,7 +86,7 @@ public class CreateRestoreJobHandler extends AbstractHandler<CreateRestoreJobReq
@Override
protected CreateRestoreJobRequestPayload
extractParamsOrThrow(RoutingContext context)
{
- String bodyString = context.getBodyAsString();
+ String bodyString = context.body().asString();
if (bodyString == null || bodyString.equalsIgnoreCase("null")) // json
encoder writes null as "null"
{
logger.warn("Bad request to create restore job. Received null
payload.");
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreSliceHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreSliceHandler.java
index e7c005a..7269ef6 100644
--- a/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreSliceHandler.java
+++ b/src/main/java/org/apache/cassandra/sidecar/routes/restore/CreateRestoreSliceHandler.java
@@ -98,7 +98,7 @@ public class CreateRestoreSliceHandler extends AbstractHandler<CreateSliceReques
.qualifiedTableName(tableName)
.createSliceRequestPayload(request)
.ownerInstance(instance)
-
.targetPathInStaging(Paths.get(instance.stagingDir()), uploadId)
+
.stageDirectory(Paths.get(instance.stagingDir()), uploadId)
.replicaStatus(Collections.singletonMap(String.valueOf(instance.id()),
RestoreSliceStatus.COMMITTING))
.replicas(Collections.singleton(String.valueOf(instance.id())))
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandler.java
index 6f8add6..9e2a5e7 100644
--- a/src/main/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandler.java
+++ b/src/main/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandler.java
@@ -30,7 +30,6 @@ import io.vertx.core.json.DecodeException;
import io.vertx.core.json.Json;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.web.RoutingContext;
-import org.apache.cassandra.sidecar.common.data.QualifiedTableName;
import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
import org.apache.cassandra.sidecar.common.data.UpdateRestoreJobRequestPayload;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
@@ -77,10 +76,9 @@ public class UpdateRestoreJobHandler extends AbstractHandler<UpdateRestoreJobReq
SocketAddress remoteAddress,
UpdateRestoreJobRequestPayload
requestPayload)
{
- QualifiedTableName qualifiedTableName = qualifiedTableName(context);
-
- RoutingContextUtils.getAsFuture(context, SC_RESTORE_JOB)
- .compose(job -> {
+ RoutingContextUtils
+ .getAsFuture(context, SC_RESTORE_JOB)
+ .compose(job -> {
if (RestoreJobStatus.isFinalState(job.status))
{
// skip the update, since the job is in the final state already
@@ -91,12 +89,11 @@ public class UpdateRestoreJobHandler extends AbstractHandler<UpdateRestoreJobReq
return executorPools.service().<RestoreJob>executeBlocking(promise
-> {
promise.complete(restoreJobDatabaseAccessor.update(requestPayload,
-
qualifiedTableName,
job.jobId));
});
})
- .onSuccess(job -> {
- logger.info("Successfully update restore job. job={}, request={},
remoteAddress={}, instance={}",
+ .onSuccess(job -> {
+ logger.info("Successfully updated restore job. job={}, request={},
remoteAddress={}, instance={}",
job, requestPayload, remoteAddress, host);
if (job.status == RestoreJobStatus.SUCCEEDED)
{
@@ -115,13 +112,13 @@ public class UpdateRestoreJobHandler extends AbstractHandler<UpdateRestoreJobReq
restoreJobManagerGroup.signalRefreshRestoreJob();
context.response().setStatusCode(HttpResponseStatus.OK.code()).end();
})
- .onFailure(cause -> processFailure(cause, context,
host, remoteAddress, requestPayload));
+ .onFailure(cause -> processFailure(cause, context, host,
remoteAddress, requestPayload));
}
@Override
protected UpdateRestoreJobRequestPayload
extractParamsOrThrow(RoutingContext context)
{
- String bodyString = context.getBodyAsString();
+ String bodyString = context.body().asString();
if (bodyString == null || bodyString.equalsIgnoreCase("null")) // json
encoder writes null as "null"
{
logger.warn("Bad request to update restore job. Received null
payload.");
diff --git a/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobsDatabaseAccessorIntTest.java b/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobsDatabaseAccessorIntTest.java
index abe6673..a1b4442 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobsDatabaseAccessorIntTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/db/RestoreJobsDatabaseAccessorIntTest.java
@@ -58,11 +58,11 @@ class RestoreJobsDatabaseAccessorIntTest extends IntegrationTestBase
RestoreJobSecrets secrets =
RestoreJobSecretsGen.genRestoreJobSecrets();
long expiresAtMillis = System.currentTimeMillis() +
TimeUnit.HOURS.toMillis(1);
UUID jobId = UUIDs.timeBased();
- accessor.create(CreateRestoreJobRequestPayload.builder(secrets,
expiresAtMillis)
- .jobId(jobId)
- .jobAgent("agent")
- .build(),
- qualifiedTableName);
+ CreateRestoreJobRequestPayload payload =
CreateRestoreJobRequestPayload.builder(secrets, expiresAtMillis)
+
.jobId(jobId)
+
.jobAgent("agent")
+
.build();
+ accessor.create(payload, qualifiedTableName);
List<RestoreJob> foundJobs = accessor.findAllRecent(3);
assertThat(foundJobs).hasSize(1);
@@ -70,7 +70,7 @@ class RestoreJobsDatabaseAccessorIntTest extends IntegrationTestBase
assertJob(accessor.find(jobId), jobId, RestoreJobStatus.CREATED,
expiresAtMillis, secrets);
UpdateRestoreJobRequestPayload markSucceeded
= new UpdateRestoreJobRequestPayload(null, null,
RestoreJobStatus.SUCCEEDED, null);
- accessor.update(markSucceeded, qualifiedTableName, jobId);
+ accessor.update(markSucceeded, jobId);
assertJob(accessor.find(jobId), jobId, RestoreJobStatus.SUCCEEDED,
expiresAtMillis, secrets);
}
diff --git a/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java b/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java
index 9d4e0d5..c1c0187 100644
--- a/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java
+++ b/src/test/integration/org/apache/cassandra/testing/ConfigurableCassandraTestContext.java
@@ -19,9 +19,16 @@
package org.apache.cassandra.testing;
import java.io.IOException;
+import java.net.BindException;
import java.util.function.Consumer;
+import org.apache.commons.lang3.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.utils.Throwables;
/**
* A Cassandra Test Context implementation that allows advanced cluster
configuration before cluster creation
@@ -29,6 +36,8 @@ import org.apache.cassandra.distributed.UpgradeableCluster;
*/
public class ConfigurableCassandraTestContext extends
AbstractCassandraTestContext
{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ConfigurableCassandraTestContext.class);
+
public static final String BUILT_CLUSTER_CANNOT_BE_CONFIGURED_ERROR =
"Cannot configure a cluster after it is built. Please set the buildCluster
annotation attribute to false, "
+ "and do not call `getCluster` before calling this method.";
@@ -57,9 +66,31 @@ public class ConfigurableCassandraTestContext extends AbstractCassandraTestConte
public UpgradeableCluster
configureAndStartCluster(Consumer<UpgradeableCluster.Builder> configurator)
throws IOException
{
- cluster = configureCluster(configurator);
- cluster.startup();
- return cluster;
+ RuntimeException exception = null;
+ for (int i = 0; i < 3; i++)
+ {
+ try
+ {
+ cluster = configureCluster(configurator);
+ cluster.startup();
+ return cluster;
+ }
+ catch (RuntimeException ret)
+ {
+ exception = ret;
+ boolean addressAlreadyInUse =
Throwables.anyCauseMatches(exception, this::portNotAvailableToBind);
+ if (addressAlreadyInUse)
+ {
+ LOGGER.warn("Failed to provision cluster after {}
retries", i, exception);
+ }
+ else
+ {
+ throw exception;
+ }
+
+ }
+ }
+ throw exception;
}
@Override
@@ -70,4 +101,10 @@ public class ConfigurableCassandraTestContext extends AbstractCassandraTestConte
+ ", builder=" + builder
+ '}';
}
+
+ private boolean portNotAvailableToBind(Throwable ex)
+ {
+ return ex instanceof BindException &&
+ StringUtils.contains(ex.getMessage(), "Address already in use");
+ }
}
diff --git a/src/test/java/org/apache/cassandra/sidecar/db/RestoreJobTest.java b/src/test/java/org/apache/cassandra/sidecar/db/RestoreJobTest.java
index b5d4940..eb20157 100644
--- a/src/test/java/org/apache/cassandra/sidecar/db/RestoreJobTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/db/RestoreJobTest.java
@@ -24,6 +24,7 @@ import java.util.UUID;
import org.junit.jupiter.api.Test;
import com.datastax.driver.core.utils.UUIDs;
+import org.apache.cassandra.sidecar.common.data.RestoreJobSecrets;
import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
import org.apache.cassandra.sidecar.common.data.SSTableImportOptions;
@@ -51,6 +52,21 @@ public class RestoreJobTest
return builder.build();
}
+ public static RestoreJob createUpdatedJob(UUID jobId, String jobAgent,
+ RestoreJobStatus status,
+ RestoreJobSecrets secrets,
+ Date expireAt)
+ throws DataObjectMappingException
+ {
+ RestoreJob.Builder builder = RestoreJob.builder();
+ builder.createdAt(RestoreJob.toLocalDate(jobId))
+ .jobId(jobId).jobAgent(jobAgent)
+ .jobStatus(status)
+ .jobSecrets(secrets)
+ .expireAt(expireAt);
+ return builder.build();
+ }
+
@Test
void testDefaultImportOptionsWhenNotSetInDb()
{
diff --git a/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java b/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java
index 86db504..b5c93de 100644
--- a/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java
@@ -19,11 +19,15 @@
package org.apache.cassandra.sidecar.db;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -116,7 +120,9 @@ public class SidecarSchemaTest
sidecarSchema.startSidecarSchemaInitializer();
context.verify(() -> {
int maxWaitTime = 20; // about 10 seconds
- while (interceptedExecStmts.size() < 1 ||
!sidecarSchema.isInitialized())
+ while (interceptedPrepStmts.size() < 10
+ || interceptedExecStmts.size() < 3
+ || !sidecarSchema.isInitialized())
{
if (maxWaitTime-- <= 0)
{
@@ -129,6 +135,51 @@ public class SidecarSchemaTest
assertEquals(3, interceptedExecStmts.size());
assertTrue(interceptedExecStmts.get(0).contains("CREATE KEYSPACE
IF NOT EXISTS sidecar_internal"),
"Create keyspace should be executed the first");
+ assertTrue(hasElementContains(interceptedExecStmts,
+ "CREATE TABLE IF NOT EXISTS
sidecar_internal.restore_job_v2"),
+ "Create table should be executed the next for job
table");
+ assertTrue(hasElementContains(interceptedExecStmts,
+ "CREATE TABLE IF NOT EXISTS
sidecar_internal.restore_slice_v2"),
+ "Create table should be executed the next for slice
table");
+
+ List<String> expectedPrepStatements = Arrays.asList(
+ "INSERT INTO sidecar_internal.restore_job_v2 ( created_at,
job_id, keyspace_name, table_name, " +
+ "job_agent, status, blob_secrets, import_options,
consistency_level, expire_at) " +
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+
+ "INSERT INTO sidecar_internal.restore_job_v2 ( created_at,
job_id, blob_secrets) VALUES (?, ? ,?)",
+
+ "INSERT INTO sidecar_internal.restore_job_v2 ( created_at,
job_id, status) VALUES (?, ?, ?)",
+
+ "INSERT INTO sidecar_internal.restore_job_v2 ( created_at,
job_id, job_agent) VALUES (?, ?, ?)",
+
+ "INSERT INTO sidecar_internal.restore_job_v2 ( created_at,
job_id, expire_at) VALUES (?, ?, ?)",
+
+ "SELECT created_at, job_id, keyspace_name, table_name, job_agent,
status, blob_secrets, import_options, " +
+ "consistency_level, expire_at FROM sidecar_internal.restore_job_v2
WHERE created_at = ? AND job_id = ?",
+
+ "SELECT created_at, job_id, keyspace_name, table_name, job_agent,
status, blob_secrets, import_options, " +
+ "consistency_level, expire_at FROM sidecar_internal.restore_job_v2
WHERE created_at = ?",
+
+ "INSERT INTO sidecar_internal.restore_slice_v2 ( job_id,
bucket_id, slice_id, bucket, key, " +
+ "checksum, start_token, end_token, compressed_size,
uncompressed_size, status_by_replica, " +
+ "all_replicas) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+
+ "SELECT job_id, bucket_id, slice_id, bucket, key, checksum,
start_token, end_token, compressed_size, " +
+ "uncompressed_size, status_by_replica, all_replicas FROM
sidecar_internal.restore_slice_v2 " +
+ "WHERE job_id = ? AND bucket_id = ? AND end_token >= ? AND
start_token < ? ALLOW FILTERING",
+
+ "UPDATE sidecar_internal.restore_slice_v2 SET status_by_replica =
status_by_replica + ?, " +
+ "all_replicas = all_replicas + ? WHERE job_id = ? AND bucket_id =
? AND start_token = ? AND slice_id = ?"
+ );
+
+ Set<String> expected = new HashSet<>(expectedPrepStatements);
+ Set<String> actual = new HashSet<>(interceptedPrepStmts);
+ Set<String> notInExpected = Sets.difference(actual, expected);
+ assertEquals(expected.size(), actual.size(), "Number of prepared
statements should match");
+ assertTrue(notInExpected.isEmpty(),
+ "Found the following statements that not in expected: "
+ notInExpected);
+
assertTrue(sidecarSchema.isInitialized());
context.completeNow();
});
diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java
index 8d5abee..6efc192 100644
--- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java
@@ -38,13 +38,15 @@ import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
import org.apache.cassandra.sidecar.config.RestoreJobConfiguration;
import org.apache.cassandra.sidecar.db.RestoreJob;
import org.apache.cassandra.sidecar.db.RestoreJobDatabaseAccessor;
-import org.apache.cassandra.sidecar.db.RestoreJobTest;
import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor;
import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.apache.cassandra.sidecar.locator.LocalTokenRangesProvider;
import org.apache.cassandra.sidecar.stats.TestRestoreJobStats;
import org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor;
import org.mockito.ArgumentCaptor;
+import static
org.apache.cassandra.sidecar.db.RestoreJobTest.createNewTestingJob;
+import static org.apache.cassandra.sidecar.db.RestoreJobTest.createUpdatedJob;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doNothing;
@@ -57,18 +59,20 @@ class RestoreJobDiscovererTest
private static final long idleLoopDelay = 2000;
private static final int recencyDays = 10;
private final TestRestoreJobStats stats = new TestRestoreJobStats();
- private RestoreJobDatabaseAccessor mockJobAccessor =
mock(RestoreJobDatabaseAccessor.class);
- private RestoreSliceDatabaseAccessor mockSliceAccessor =
mock(RestoreSliceDatabaseAccessor.class);
- private RestoreJobManagerGroup mockManagers =
mock(RestoreJobManagerGroup.class);
- private PeriodicTaskExecutor executor = mock(PeriodicTaskExecutor.class);
- private SidecarSchema sidecarSchema = mock(SidecarSchema.class);
- private RestoreJobDiscoverer loop = new RestoreJobDiscoverer(testConfig(),
- sidecarSchema,
-
mockJobAccessor,
-
mockSliceAccessor,
- () ->
mockManagers,
- null,
- stats);
+ private final RestoreJobDatabaseAccessor mockJobAccessor =
mock(RestoreJobDatabaseAccessor.class);
+ private final RestoreSliceDatabaseAccessor mockSliceAccessor =
mock(RestoreSliceDatabaseAccessor.class);
+ private final RestoreJobManagerGroup mockManagers =
mock(RestoreJobManagerGroup.class);
+ private final LocalTokenRangesProvider rangesProvider =
mock(LocalTokenRangesProvider.class);
+ private final PeriodicTaskExecutor executor =
mock(PeriodicTaskExecutor.class);
+ private final SidecarSchema sidecarSchema = mock(SidecarSchema.class);
+ private final RestoreJobDiscoverer loop = new
RestoreJobDiscoverer(testConfig(),
+
sidecarSchema,
+
mockJobAccessor,
+
mockSliceAccessor,
+ () ->
mockManagers,
+
rangesProvider,
+ null,
+ stats);
@Test
void testGetDelay()
@@ -78,9 +82,13 @@ class RestoreJobDiscovererTest
// when there is active restore job (status: CREATED)
UUID jobId = UUIDs.timeBased();
when(mockJobAccessor.findAllRecent(anyInt()))
- .thenReturn(Collections.singletonList(RestoreJob.forUpdates(jobId,
"agent",
-
RestoreJobStatus.CREATED, null,
- new
Date(System.currentTimeMillis() + 10000L))));
+ .thenReturn(Collections.singletonList(RestoreJob.builder()
+
.createdAt(RestoreJob.toLocalDate(jobId))
+ .jobId(jobId)
+ .jobAgent("agent")
+
.jobStatus(RestoreJobStatus.CREATED)
+ .expireAt(new
Date(System.currentTimeMillis() + 10000L))
+ .build()));
loop.registerPeriodicTaskExecutor(executor);
executeBlocking();
assertThat(stats.activeJobCount).describedAs("active jobs count is
updated")
@@ -88,9 +96,13 @@ class RestoreJobDiscovererTest
assertThat(loop.delay()).isEqualTo(activeLoopDelay);
// when no more jobs are active, the delay is reset back to idle loop
delay accordingly.
when(mockJobAccessor.findAllRecent(anyInt()))
- .thenReturn(Collections.singletonList(RestoreJob.forUpdates(jobId,
"agent",
-
RestoreJobStatus.SUCCEEDED, null,
- new
Date(System.currentTimeMillis() + 10000L))));
+ .thenReturn(Collections.singletonList(RestoreJob.builder()
+
.createdAt(RestoreJob.toLocalDate(jobId))
+ .jobId(jobId)
+ .jobAgent("agent")
+
.jobStatus(RestoreJobStatus.SUCCEEDED)
+ .expireAt(new
Date(System.currentTimeMillis() + 10000L))
+ .build()));
executeBlocking();
assertThat(stats.activeJobCount).describedAs("active jobs count is
updated")
.isZero();
@@ -112,11 +124,11 @@ class RestoreJobDiscovererTest
UUID newJobId = UUIDs.timeBased();
UUID failedJobId = UUIDs.timeBased();
UUID succeededJobId = UUIDs.timeBased();
- mockResult.add(RestoreJobTest.createNewTestingJob(newJobId));
- mockResult.add(RestoreJob.forUpdates(failedJobId, "agent",
RestoreJobStatus.ABORTED, null,
- new
Date(System.currentTimeMillis() + 10000L)));
- mockResult.add(RestoreJob.forUpdates(succeededJobId, "agent",
RestoreJobStatus.SUCCEEDED, null,
- new
Date(System.currentTimeMillis() + 10000L)));
+ mockResult.add(createNewTestingJob(newJobId));
+ mockResult.add(createUpdatedJob(failedJobId, "agent",
RestoreJobStatus.ABORTED, null,
+ new Date(System.currentTimeMillis() +
10000L)));
+ mockResult.add(createUpdatedJob(succeededJobId, "agent",
RestoreJobStatus.SUCCEEDED, null,
+ new Date(System.currentTimeMillis() +
10000L)));
ArgumentCaptor<UUID> jobIdCapture =
ArgumentCaptor.forClass(UUID.class);
doNothing().when(mockManagers).removeJobInternal(jobIdCapture.capture());
when(mockJobAccessor.findAllRecent(anyInt())).thenReturn(mockResult);
@@ -144,8 +156,8 @@ class RestoreJobDiscovererTest
// Execution 2
when(mockJobAccessor.findAllRecent(anyInt()))
- .thenReturn(Collections.singletonList(RestoreJob.forUpdates(newJobId,
"agent",
- RestoreJobStatus.SUCCEEDED,
null, new Date())));
+ .thenReturn(Collections.singletonList(createUpdatedJob(newJobId,
"agent",
+
RestoreJobStatus.SUCCEEDED, null, new Date())));
executeBlocking();
assertThat(stats.activeJobCount).isZero();
@@ -164,7 +176,7 @@ class RestoreJobDiscovererTest
loop.signalRefresh();
UUID newJobId2 = UUIDs.timeBased();
when(mockJobAccessor.findAllRecent(anyInt()))
-
.thenReturn(Collections.singletonList(RestoreJobTest.createNewTestingJob(newJobId2)));
+ .thenReturn(Collections.singletonList(createNewTestingJob(newJobId2)));
assertThat(loop.isRefreshSignaled()).isTrue();
executeBlocking();
@@ -179,8 +191,8 @@ class RestoreJobDiscovererTest
// Execution 5
when(mockJobAccessor.findAllRecent(anyInt()))
- .thenReturn(Collections.singletonList(RestoreJob.forUpdates(newJobId2,
"agent",
-
RestoreJobStatus.ABORTED, null, new Date())));
+ .thenReturn(Collections.singletonList(createUpdatedJob(newJobId2,
"agent",
+
RestoreJobStatus.ABORTED, null, new Date())));
executeBlocking();
assertThat(stats.activeJobCount).isZero();
@@ -200,16 +212,16 @@ class RestoreJobDiscovererTest
}
@Test
- void testExecuteWithExpiredJobs() throws Exception
+ void testExecuteWithExpiredJobs()
{
// setup: all 3 jobs are expired. All of them should be aborted via
mockJobAccessor
when(sidecarSchema.isInitialized()).thenReturn(true);
- List<RestoreJob> mockResult
- = IntStream.range(0, 3)
- .boxed()
- .map(x -> RestoreJob.forUpdates(UUIDs.timeBased(), "agent",
RestoreJobStatus.CREATED, null,
- new
Date(System.currentTimeMillis() - 1000L)))
- .collect(Collectors.toList());
+ List<RestoreJob> mockResult = IntStream.range(0, 3)
+ .boxed()
+ .map(x ->
createUpdatedJob(UUIDs.timeBased(), "agent",
+
RestoreJobStatus.CREATED, null,
+ new
Date(System.currentTimeMillis() - 1000L)))
+ .collect(Collectors.toList());
ArgumentCaptor<UUID> abortedJobs = ArgumentCaptor.forClass(UUID.class);
doNothing().when(mockJobAccessor).abort(abortedJobs.capture());
when(mockJobAccessor.findAllRecent(anyInt())).thenReturn(mockResult);
diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java
index 36556a2..3d20810 100644
--- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobManagerTest.java
@@ -245,11 +245,16 @@ class RestoreJobManagerTest
private RestoreSlice getTestSlice(RestoreJob job)
{
+ InstanceMetadata owner = mock(InstanceMetadata.class);
+ when(owner.id()).thenReturn(1);
RestoreSlice slice = RestoreSlice
.builder()
.jobId(job.jobId)
.bucketId((short) 0)
- .targetPathInStaging(testDir, "uploadId")
+ .stageDirectory(testDir, "uploadId")
+ .storageKey("storageKey")
+ .storageBucket("storageBucket")
+ .ownerInstance(owner)
.replicaStatus(Collections.emptyMap())
.replicas(Collections.emptySet())
.build();
diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
index f7761c5..83ddc70 100644
--- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreProcessorTest.java
@@ -175,10 +175,11 @@ class RestoreProcessorTest
RestoreSlice slice = mock(RestoreSlice.class,
Mockito.RETURNS_DEEP_STUBS);
when(slice.jobId()).thenReturn(UUIDs.timeBased());
when(slice.owner().id()).thenReturn(1);
- when(slice.toAsyncTask(any(), any(), any(), anyDouble(),
any())).thenReturn(promise -> {
+ when(slice.toAsyncTask(any(), any(), any(), anyDouble(), any(),
any())).thenReturn(promise -> {
Uninterruptibles.awaitUninterruptibly(latch);
promise.complete(slice);
});
+ when(slice.hasImported()).thenReturn(true);
return slice;
}
}
diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
index 8353f4f..9598ad2 100644
--- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTaskTest.java
@@ -19,12 +19,15 @@
package org.apache.cassandra.sidecar.restore;
import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import com.datastax.driver.core.utils.UUIDs;
import com.google.inject.Guice;
@@ -36,7 +39,9 @@ import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools.TaskExecutorPool;
import org.apache.cassandra.sidecar.db.RestoreJob;
+import org.apache.cassandra.sidecar.db.RestoreJobTest;
import org.apache.cassandra.sidecar.db.RestoreSlice;
+import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor;
import org.apache.cassandra.sidecar.exceptions.RestoreJobException;
import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
import org.apache.cassandra.sidecar.server.MainModule;
@@ -48,33 +53,30 @@ import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import static org.apache.cassandra.sidecar.AssertionUtils.getBlocking;
-import static org.apache.cassandra.sidecar.db.RestoreJob.toLocalDate;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class RestoreSliceTaskTest
{
- private RestoreJob restoreJob;
private RestoreSlice restoreSlice;
private StorageClient storageClient;
private TaskExecutorPool executorPool;
private SSTableImporter importer;
private TestRestoreJobStats stats;
private RestoreSliceTask task;
+ private RestoreSliceDatabaseAccessor sliceDatabaseAccessor;
@BeforeEach
void setup()
{
- UUID jobId = UUIDs.timeBased();
- restoreJob = RestoreJob.builder()
- .jobId(jobId)
- .createdAt(toLocalDate(jobId))
- .jobStatus(RestoreJobStatus.CREATED)
- .build();
restoreSlice = mock(RestoreSlice.class, Mockito.RETURNS_DEEP_STUBS);
- when(restoreSlice.targetPathInStaging()).thenReturn(Paths.get("."));
+ when(restoreSlice.stageDirectory()).thenReturn(Paths.get("."));
when(restoreSlice.sliceId()).thenReturn("testing-slice");
when(restoreSlice.key()).thenReturn("storage-key");
when(restoreSlice.owner().id()).thenReturn(1);
@@ -83,8 +85,10 @@ class RestoreSliceTaskTest
Injector injector = Guice.createInjector(Modules.override(new
MainModule()).with(new TestModule()));
executorPool = injector.getInstance(ExecutorPools.class).internal();
stats = new TestRestoreJobStats();
- task = new TestRestoreSliceTask(restoreJob, restoreSlice,
storageClient,
- executorPool, importer, 0, stats);
+ sliceDatabaseAccessor = mock(RestoreSliceDatabaseAccessor.class);
+ task = new TestRestoreSliceTask(restoreSlice, storageClient,
+ executorPool, importer, 0,
+ sliceDatabaseAccessor, stats);
}
@Test
@@ -155,26 +159,101 @@ class RestoreSliceTaskTest
.hasMessageContaining("Object not found");
}
+ @Test
+ void testSliceStaging()
+ {
+ // test specific setup
+ RestoreJob job =
spy(RestoreJobTest.createTestingJob(UUIDs.timeBased(),
RestoreJobStatus.CREATED));
+ doReturn(true).when(job).isManagedBySidecar();
+ when(restoreSlice.job()).thenReturn(job);
+
when(restoreSlice.stagedObjectPath()).thenReturn(Paths.get("nonexist"));
+
when(storageClient.objectExists(restoreSlice)).thenReturn(CompletableFuture.completedFuture(null));
+ when(storageClient.downloadObjectIfAbsent(restoreSlice))
+ .thenReturn(CompletableFuture.completedFuture(new File(".")));
+
+ Promise<RestoreSlice> promise = Promise.promise();
+ task.handle(promise);
+ getBlocking(promise.future()); // no error is thrown
+
+ verify(restoreSlice, times(1)).completeStagePhase();
+ verify(restoreSlice, times(0)).completeImportPhase(); // should not be
called in this phase
+ verify(sliceDatabaseAccessor, times(1)).updateStatus(restoreSlice);
+ }
+
+ @Test
+ void testSliceStagingWithExistingObject(@TempDir Path testFolder) throws
IOException
+ {
+ // test specific setup
+ RestoreJob job =
spy(RestoreJobTest.createTestingJob(UUIDs.timeBased(),
RestoreJobStatus.CREATED));
+ doReturn(true).when(job).isManagedBySidecar();
+ when(restoreSlice.job()).thenReturn(job);
+ Path stagedPath = testFolder.resolve("slice.zip");
+ Files.createFile(stagedPath);
+ when(restoreSlice.stagedObjectPath()).thenReturn(stagedPath);
+ when(storageClient.objectExists(restoreSlice))
+ .thenThrow(new RuntimeException("Should not call this method"));
+ when(storageClient.downloadObjectIfAbsent(restoreSlice))
+ .thenThrow(new RuntimeException("Should not call this method"));
+
+ Promise<RestoreSlice> promise = Promise.promise();
+ task.handle(promise);
+ getBlocking(promise.future()); // no error is thrown
+
+ verify(restoreSlice, times(1)).completeStagePhase();
+ verify(restoreSlice, times(0)).completeImportPhase(); // should not be
called in this phase
+ verify(sliceDatabaseAccessor, times(1)).updateStatus(restoreSlice);
+ }
+
+ @Test
+ void testSliceImport()
+ {
+ // test specific setup
+ RestoreJob job =
spy(RestoreJobTest.createTestingJob(UUIDs.timeBased(),
RestoreJobStatus.STAGED));
+ doReturn(true).when(job).isManagedBySidecar();
+ when(restoreSlice.job()).thenReturn(job);
+
+ Promise<RestoreSlice> promise = Promise.promise();
+ task.handle(promise);
+ getBlocking(promise.future()); // no error is thrown
+
+ verify(restoreSlice, times(0)).completeStagePhase(); // should not be
called in the phase
+ verify(restoreSlice, times(1)).completeImportPhase();
+ verify(sliceDatabaseAccessor, times(1)).updateStatus(restoreSlice);
+ }
+
+
static class TestRestoreSliceTask extends RestoreSliceTask
{
private final RestoreSlice slice;
private final RestoreJobStats stats;
- public TestRestoreSliceTask(RestoreJob job, RestoreSlice slice,
StorageClient s3Client,
- TaskExecutorPool executorPool,
SSTableImporter importer,
- double requiredUsableSpacePercentage,
RestoreJobStats stats)
+ public TestRestoreSliceTask(RestoreSlice slice, StorageClient
s3Client, TaskExecutorPool executorPool,
+ SSTableImporter importer, double
requiredUsableSpacePercentage,
+ RestoreSliceDatabaseAccessor
sliceDatabaseAccessor, RestoreJobStats stats)
{
- super(job, slice, s3Client, executorPool, importer,
requiredUsableSpacePercentage, stats);
+ super(slice, s3Client, executorPool, importer,
requiredUsableSpacePercentage, sliceDatabaseAccessor, stats);
this.slice = slice;
this.stats = stats;
}
- void unzipAndImport(Promise<RestoreSlice> event, File file)
+ @Override
+ void unzipAndImport(Promise<RestoreSlice> event, File file, Runnable
onSuccessCommit)
{
stats.captureSliceUnzipTime(1, 123L);
stats.captureSliceValidationTime(1, 123L);
stats.captureSliceImportTime(1, 123L);
+ slice.completeImportPhase();
event.tryComplete(slice);
+ if (onSuccessCommit != null)
+ {
+ onSuccessCommit.run();
+ }
+ }
+
+ @Override
+ void unzipAndImport(Promise<RestoreSlice> event, File file)
+ {
+ unzipAndImport(event, file, null);
}
}
}
diff --git
a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTest.java
b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTest.java
index 7ef7e48..52f19fd 100644
--- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreSliceTest.java
@@ -45,7 +45,7 @@ class RestoreSliceTest
.startToken(BigInteger.ONE).endToken(BigInteger.valueOf(2))
.replicaStatus(Collections.singletonMap("replica1",
RestoreSliceStatus.COMMITTING))
.replicas(Collections.singleton("replica1"))
- .targetPathInStaging(path, "uploadId")
+ .stageDirectory(path, "uploadId")
.build();
RestoreSlice slice2 = slice1.unbuild().build();
assertThat(slice1).isEqualTo(slice2);
diff --git
a/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java
b/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java
index 2453a96..7047122 100644
---
a/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java
+++
b/src/test/java/org/apache/cassandra/sidecar/routes/restore/BaseRestoreJobTests.java
@@ -184,7 +184,6 @@ public abstract class BaseRestoreJobTests
@Override
public RestoreJob update(UpdateRestoreJobRequestPayload payload,
- QualifiedTableName qualifiedTableName,
UUID jobId)
{
return updateFunc.apply(payload);
diff --git
a/src/test/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobSummaryHandlerTest.java
b/src/test/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobSummaryHandlerTest.java
index c62d4ae..24fd12f 100644
---
a/src/test/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobSummaryHandlerTest.java
+++
b/src/test/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobSummaryHandlerTest.java
@@ -52,10 +52,14 @@ class RestoreJobSummaryHandlerTest extends
BaseRestoreJobTests
mockLookupRestoreJob(x -> {
UUID id = UUID.fromString(jobId);
// keyspace name is different
-
- return
RestoreJob.create(LocalDate.fromMillisSinceEpoch(UUIDs.unixTimestamp(id)), id,
- "ks", "table", "job agent",
- RestoreJobStatus.CREATED, SECRETS,
SSTableImportOptions.defaults());
+ return RestoreJob.builder()
+
.createdAt(LocalDate.fromMillisSinceEpoch(UUIDs.unixTimestamp(id)))
+ .jobId(id).jobAgent("job agent")
+ .keyspace("ks").table("table")
+ .jobStatus(RestoreJobStatus.CREATED)
+ .jobSecrets(SECRETS)
+
.sstableImportOptions(SSTableImportOptions.defaults())
+ .build();
});
sendGetRestoreJobSummaryRequestAndVerify("ks", "table", jobId,
context, HttpResponseStatus.OK.code());
}
@@ -80,8 +84,12 @@ class RestoreJobSummaryHandlerTest extends
BaseRestoreJobTests
String jobId = "7cd82ff9-d276-11ed-93e5-7fce0df1306f";
mockLookupRestoreJob(x -> {
// keyspace name is different
- return RestoreJob.create(null, UUID.fromString(jobId), "ks",
- "table", null, RestoreJobStatus.CREATED,
null, null);
+ return RestoreJob.builder()
+ .createdAt(null)
+ .jobId(UUID.fromString(jobId))
+ .keyspace("ks").table("table")
+ .jobStatus(RestoreJobStatus.CREATED)
+ .build();
});
sendGetRestoreJobSummaryRequestAndVerify("ks1", "table",
"7cd82ff9-d276-11ed-93e5-7fce0df1306f",
context,
HttpResponseStatus.NOT_FOUND.code());
@@ -111,9 +119,12 @@ class RestoreJobSummaryHandlerTest extends
BaseRestoreJobTests
{
mockLookupRestoreJob(x -> {
UUID jobId =
UUID.fromString("7cd82ff9-d276-11ed-93e5-7fce0df1306f");
- return
RestoreJob.create(LocalDate.fromMillisSinceEpoch(UUIDs.unixTimestamp(jobId)),
jobId,
- "ks", "table", "job agent",
- RestoreJobStatus.CREATED, null, null);
+ return RestoreJob.builder()
+
.createdAt(LocalDate.fromMillisSinceEpoch(UUIDs.unixTimestamp(jobId)))
+ .jobId(jobId).jobAgent("job agent")
+ .keyspace("ks").table("table")
+ .jobStatus(RestoreJobStatus.CREATED)
+ .build();
});
sendGetRestoreJobSummaryRequestAndVerify("ks", "table",
"7cd82ff9-d276-11ed-93e5-7fce0df1306f",
context,
HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
diff --git
a/src/test/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandlerTest.java
b/src/test/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandlerTest.java
index c0bbe26..0a3cf93 100644
---
a/src/test/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandlerTest.java
+++
b/src/test/java/org/apache/cassandra/sidecar/routes/restore/UpdateRestoreJobHandlerTest.java
@@ -143,8 +143,14 @@ class UpdateRestoreJobHandlerTest extends
BaseRestoreJobTests
private RestoreJob createTestNewJob(String jobId)
{
- return RestoreJob.create(null, UUID.fromString(jobId), "ks", "table",
- "agent", RestoreJobStatus.SUCCEEDED, SECRETS,
SSTableImportOptions.defaults());
+ return RestoreJob.builder()
+ .jobId(UUID.fromString(jobId))
+ .keyspace("ks").table("table")
+ .jobAgent("agent")
+ .jobStatus(RestoreJobStatus.SUCCEEDED)
+ .jobSecrets(SECRETS)
+ .sstableImportOptions(SSTableImportOptions.defaults())
+ .build();
}
private JsonObject getRequestPayload()
diff --git
a/src/test/java/org/apache/cassandra/sidecar/utils/AsyncFileSystemUtilsTest.java
b/src/test/java/org/apache/cassandra/sidecar/utils/AsyncFileSystemUtilsTest.java
new file mode 100644
index 0000000..fc4d354
--- /dev/null
+++
b/src/test/java/org/apache/cassandra/sidecar/utils/AsyncFileSystemUtilsTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.util.UUID;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.AssertionUtils;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
+import org.apache.cassandra.sidecar.exceptions.InsufficientStorageException;
+import org.apache.cassandra.sidecar.utils.AsyncFileSystemUtils.FileStoreProps;
+
+import static
org.apache.cassandra.sidecar.utils.AsyncFileSystemUtils.ensureSufficientStorage;
+import static
org.apache.cassandra.sidecar.utils.AsyncFileSystemUtils.fileStoreProps;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class AsyncFileSystemUtilsTest
+{
+ private ExecutorPools executorPools;
+
+ @BeforeEach
+ void setup()
+ {
+ executorPools = new ExecutorPools(Vertx.vertx(), new
ServiceConfigurationImpl());
+ }
+
+ @AfterEach
+ void teardown()
+ {
+ executorPools.close();
+ }
+
+ @Test
+ void testReadFileStoreProps()
+ {
+ FileStoreProps props = AssertionUtils.getBlocking(fileStoreProps(".",
executorPools.internal()));
+ assertThat(props.name).isNotBlank();
+
+ long total = props.totalSpace;
+ long usable = props.usableSpace;
+ long unallocated = props.unallocatedSpace;
+ assertThat(total)
+ .isGreaterThan(usable)
+ .isGreaterThan(unallocated)
+ .isGreaterThan(0L);
+
+ assertThat(unallocated)
+ .isGreaterThanOrEqualTo(usable)
+ .isGreaterThan(0L);
+
+ assertThat(usable).isGreaterThan(0L);
+ }
+
+ @Test
+ void testEnsureSufficientStorage() throws Exception
+ {
+ // this check should pass (hopefully), as the required usable
percentage is 0.0001
+ AssertionUtils.getBlocking(ensureSufficientStorage(".", 0L, 0.0001,
executorPools.internal()));
+
+ // requesting half of the usable space should pass
+ FileStoreProps props = AssertionUtils.getBlocking(fileStoreProps(".",
executorPools.internal()));
+ AssertionUtils.getBlocking(ensureSufficientStorage(".",
props.usableSpace / 2,
+ 0,
executorPools.internal()));
+
+ assertThatThrownBy(() ->
AssertionUtils.getBlocking(ensureSufficientStorage(".", Long.MAX_VALUE,
+
0.0001,
+
executorPools.internal())))
+ .describedAs("Request Long.MAX_VALUE on the local file store should
fail")
+ .hasRootCauseExactlyInstanceOf(InsufficientStorageException.class)
+ .hasMessageContaining("FileStore has insufficient space");
+
+ assertThatThrownBy(() ->
AssertionUtils.getBlocking(ensureSufficientStorage(".", 123L,
+
1.0, executorPools.internal())))
+ .describedAs("Require 100% usable disk of the local file store should
fail")
+ .hasRootCauseExactlyInstanceOf(InsufficientStorageException.class)
+ .hasMessageContaining("FileStore has insufficient space");
+ }
+
+ @Test
+ void testEnsureSufficientStorageWithNonexistingFilePath()
+ {
+ // The input path (`./non-existing` + uuid) does not exist.
+ // `ensureSufficientStorage` should walk up the parent paths until it
finds an existing path
+ // to use for the check.
+ // The test expects that no exception is thrown.
+ AssertionUtils.getBlocking(ensureSufficientStorage("./non-existing" +
UUID.randomUUID(), 0L,
+ 0.0001,
executorPools.internal()));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]