This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9912a62 CASSANDRASC-56: Create staging directory if it doesn't exists
9912a62 is described below
commit 9912a620a0e67d9aa723037aaf5237598a895eb7
Author: Francisco Guerrero <[email protected]>
AuthorDate: Mon Jun 19 16:41:10 2023 -0700
CASSANDRASC-56: Create staging directory if it doesn't exists
During SSTable upload, the upload will fail if the configured staging
directory does not
exist. When this occurs an operator must manually create the directory,
which increases
the configuration toil.
In this commit, we automatically create the staging directory if it doesn't
exists during
SSTable upload. This improves the overall operational experience when
running the Sidecar.
patch by Francisco Guerrero; reviewed by Dinesh Joshi, Yifan Cai for
CASSANDRASC-56
---
CHANGES.txt | 1 +
.../sstableuploads/SSTableCleanupHandler.java | 2 +-
.../sstableuploads/SSTableImportHandler.java | 52 ++++++++++++----------
.../sstableuploads/SSTableUploadHandler.java | 18 ++++----
.../cassandra/sidecar/utils/BaseFileSystem.java | 11 +++++
.../cassandra/sidecar/utils/SSTableImporter.java | 2 +-
.../sidecar/utils/SSTableUploadsPathBuilder.java | 23 +++++++---
.../sstableuploads/SSTableImportHandlerTest.java | 13 ++++--
.../sstableuploads/SSTableUploadHandlerTest.java | 17 -------
.../sidecar/utils/SSTableImporterTest.java | 2 +-
10 files changed, 77 insertions(+), 64 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 86f8bf0..7131a8b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
1.0.0
-----
+ * Create staging directory if it doesn't exists (CASSANDRASC-56)
* Remove RESTEasy (CASSANDRASC-57)
* Use in-jvm dtest framework for integration tests (CASSANDRASC-51)
* Sidecar returns own version in node settings (CASSANDRASC-52)
diff --git
a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableCleanupHandler.java
b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableCleanupHandler.java
index 849fd3d..bffb9a5 100644
---
a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableCleanupHandler.java
+++
b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableCleanupHandler.java
@@ -66,7 +66,7 @@ public class SSTableCleanupHandler extends
AbstractHandler<String>
SocketAddress remoteAddress,
String uploadId)
{
- uploadPathBuilder.resolveStagingDirectory(host, uploadId)
+ uploadPathBuilder.resolveUploadIdDirectory(host, uploadId)
.compose(uploadPathBuilder::isValidDirectory)
.compose(stagingDirectory -> context.vertx()
.fileSystem()
diff --git
a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java
b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java
index c5d5695..75f8049 100644
---
a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java
+++
b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java
@@ -109,7 +109,7 @@ public class SSTableImportHandler extends
AbstractHandler<SSTableImportRequest>
}
else if (importResult.failed())
{
- context.fail(importResult.cause());
+ processFailure(importResult.cause(), context,
host, remoteAddress, request);
}
else
{
@@ -121,29 +121,33 @@ public class SSTableImportHandler extends
AbstractHandler<SSTableImportRequest>
request, remoteAddress, host);
}
})
- .onFailure(cause -> {
- if (cause instanceof NoSuchFileException)
- {
- logger.error("Upload directory not found for
request={}, remoteAddress={}, " +
- "instance={}", request,
remoteAddress, host, cause);
-
context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND,
cause.getMessage()));
- }
- else if (cause instanceof
IllegalArgumentException)
- {
-
context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST,
cause.getMessage(),
- cause));
- }
- else if (cause instanceof HttpException)
- {
- context.fail(cause);
- }
- else
- {
- logger.error("Unexpected error during import
SSTables for request={}, " +
- "remoteAddress={}, instance={}",
request, remoteAddress, host, cause);
-
context.fail(HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
- }
- });
+ .onFailure(cause -> processFailure(cause, context,
host, remoteAddress, request));
+ }
+
+ @Override
+ protected void processFailure(Throwable cause,
+ RoutingContext context,
+ String host,
+ SocketAddress remoteAddress,
+ SSTableImportRequest request)
+ {
+ if (cause instanceof NoSuchFileException)
+ {
+ logger.error("Upload directory not found for request={},
remoteAddress={}, " +
+ "instance={}", request, remoteAddress, host, cause);
+ context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND,
cause.getMessage()));
+ }
+ else if (cause instanceof IllegalArgumentException)
+ {
+ context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST,
cause.getMessage(),
+ cause));
+ }
+ else if (cause instanceof HttpException)
+ {
+ context.fail(cause);
+ }
+
+ super.processFailure(cause, context, host, remoteAddress, request);
}
@Override
diff --git
a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
index 7952d5d..99babe1 100644
---
a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
+++
b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
@@ -32,7 +32,6 @@ import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.web.RoutingContext;
import org.apache.cassandra.sidecar.Configuration;
-import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.common.data.SSTableUploadResponse;
import org.apache.cassandra.sidecar.common.utils.CassandraInputValidator;
@@ -103,8 +102,6 @@ public class SSTableUploadHandler extends
AbstractHandler<SSTableUploadRequest>
// accept the upload.
httpRequest.pause();
- InstanceMetadata instanceMetadata = metadataFetcher.instance(host);
-
long startTimeInNanos = System.nanoTime();
if (!limiter.tryAcquire())
{
@@ -116,7 +113,8 @@ public class SSTableUploadHandler extends
AbstractHandler<SSTableUploadRequest>
context.addEndHandler(v -> limiter.releasePermit());
validateKeyspaceAndTable(host, request)
- .compose(validRequest ->
ensureSufficientSpaceAvailable(instanceMetadata))
+ .compose(validRequest ->
uploadPathBuilder.resolveStagingDirectory(host))
+ .compose(this::ensureSufficientSpaceAvailable)
.compose(v -> uploadPathBuilder.build(host, request))
.compose(uploadDirectory -> uploader.uploadComponent(httpRequest,
uploadDirectory, request.component(),
request.expectedChecksum()))
@@ -187,17 +185,17 @@ public class SSTableUploadHandler extends
AbstractHandler<SSTableUploadRequest>
* Ensures there is sufficient space available as per configured in the
* {@link Configuration#getMinSpacePercentRequiredForUpload()}.
*
- * @param instanceMetadata instance meta data
+ * @param uploadDirectory the directory where the SSTables are uploaded
* @return a succeeded future if there is sufficient space available, or
failed future otherwise
*/
- private Future<Void> ensureSufficientSpaceAvailable(InstanceMetadata
instanceMetadata)
+ private Future<String> ensureSufficientSpaceAvailable(String
uploadDirectory)
{
float minimumPercentageRequired =
configuration.getMinSpacePercentRequiredForUpload();
if (minimumPercentageRequired == 0)
{
- return Future.succeededFuture();
+ return Future.succeededFuture(uploadDirectory);
}
- return fs.fsProps(instanceMetadata.stagingDir())
+ return fs.fsProps(uploadDirectory)
.compose(fsProps -> {
// calculate available disk space percentage
long totalSpace = fsProps.totalSpace();
@@ -213,12 +211,12 @@ public class SSTableUploadHandler extends
AbstractHandler<SSTableUploadRequest>
if (availableDiskSpacePercentage <
minimumPercentageRequired)
{
logger.warn("Insufficient space available for upload
in stagingDir={}, available={}%, " +
- "required={}%",
instanceMetadata.stagingDir(),
+ "required={}%", uploadDirectory,
availableDiskSpacePercentage,
minimumPercentageRequired);
return
Future.failedFuture(wrapHttpException(HttpResponseStatus.INSUFFICIENT_STORAGE,
"Insufficient space available for upload"));
}
- return Future.succeededFuture();
+ return Future.succeededFuture(uploadDirectory);
});
}
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/utils/BaseFileSystem.java
b/src/main/java/org/apache/cassandra/sidecar/utils/BaseFileSystem.java
index f7f7c56..fc0b532 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/BaseFileSystem.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/BaseFileSystem.java
@@ -96,6 +96,17 @@ public class BaseFileSystem
return isValidOfType(path, FileProps::isDirectory);
}
+ /**
+ * Creates the directory if it doesn't exist, and then validates that
{@code path} is a valid directory.
+ *
+ * @param path the path to the directory
+ * @return a future of the validated {@code path}, a failed future
otherwise
+ */
+ public Future<String> ensureDirectoryExists(String path)
+ {
+ return fs.mkdirs(path).compose(v -> Future.succeededFuture(path));
+ }
+
/**
* @param filename the path
* @param predicate a predicate that evaluates based on {@link FileProps}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
index af5f5dc..b6f502c 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
@@ -252,7 +252,7 @@ public class SSTableImporter
*/
private void cleanup(ImportOptions options)
{
- uploadPathBuilder.resolveStagingDirectory(options.host,
options.uploadId)
+ uploadPathBuilder.resolveUploadIdDirectory(options.host,
options.uploadId)
.compose(uploadPathBuilder::isValidDirectory)
.compose(stagingDirectory -> vertx.fileSystem()
.deleteRecursive(stagingDirectory, true))
diff --git
a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploadsPathBuilder.java
b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploadsPathBuilder.java
index cbf0bc3..9a26a54 100644
---
a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploadsPathBuilder.java
+++
b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploadsPathBuilder.java
@@ -74,11 +74,24 @@ public class SSTableUploadsPathBuilder extends
BaseFileSystem
public <T extends SSTableUploads> Future<String> build(String host, T
request)
{
return validate(request)
- .compose(validRequest -> resolveStagingDirectory(host,
request.uploadId()))
+ .compose(validRequest -> resolveUploadIdDirectory(host,
request.uploadId()))
.compose(stagingDirectory ->
resolveUploadDirectory(stagingDirectory,
request.keyspace(), request.tableName()));
}
+ /**
+ * Builds the path to the configured staging directory for the given
{@code host}. Attempt to create the
+ * staging directory if it doesn't exist.
+ *
+ * @param host the name of the host
+ * @return a future to the created and validated staging directory
+ */
+ public Future<String> resolveStagingDirectory(String host)
+ {
+ InstanceMetadata instanceMeta = instancesConfig.instanceFromHost(host);
+ return
ensureDirectoryExists(StringUtils.removeEnd(instanceMeta.stagingDir(),
File.separator));
+ }
+
/**
* Builds the path to the {@code uploadId} staging directory inside the
specified {@code host}.
*
@@ -86,13 +99,11 @@ public class SSTableUploadsPathBuilder extends
BaseFileSystem
* @param uploadId an identifier for the upload ID
* @return the absolute path of the {@code uploadId} staging directory
*/
- public Future<String> resolveStagingDirectory(String host, String uploadId)
+ public Future<String> resolveUploadIdDirectory(String host, String
uploadId)
{
return validateUploadId(uploadId)
- .compose(validUploadId -> {
- InstanceMetadata instanceMeta =
instancesConfig.instanceFromHost(host);
- return
isValidDirectory(StringUtils.removeEnd(instanceMeta.stagingDir(),
File.separator));
- })
+ .compose(validUploadId -> resolveStagingDirectory(host))
+ .compose(this::isValidDirectory)
.compose(directory -> Future.succeededFuture(directory +
File.separatorChar + uploadId));
}
diff --git
a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerTest.java
b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerTest.java
index 5b0a51c..e73fa53 100644
---
a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerTest.java
+++
b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerTest.java
@@ -103,12 +103,17 @@ public class SSTableImportHandlerTest extends
BaseUploadsHandlerTest
}
@Test
- void testNonExistentUploadDirectory(VertxTestContext context)
+ void testNonExistentUploadDirectory(VertxTestContext context) throws
InterruptedException
{
UUID uploadId = UUID.randomUUID();
- client.put(config.getPort(), "localhost", "/api/v1/uploads/" +
uploadId + "/keyspaces/ks/tables/table/import")
- .expect(ResponsePredicate.SC_NOT_FOUND)
- .send(context.succeedingThenComplete());
+
+ TableOperations mockCFOperations = mock(TableOperations.class);
+ when(mockDelegate.tableOperations()).thenReturn(mockCFOperations);
+
+ String requestURI = "/api/v1/uploads/" + uploadId +
"/keyspaces/ks/tables/table/import";
+ clientRequest(context, requestURI,
+ response -> assertThat(response.statusCode())
+
.isEqualTo(HttpResponseStatus.NOT_FOUND.code()));
}
@Test
diff --git
a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
index 6888f3d..e29f055 100644
---
a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
+++
b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
@@ -57,7 +57,6 @@ public class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
void testUploadWithoutMd5_expectSuccessfulUpload(VertxTestContext context)
throws IOException
{
UUID uploadId = UUID.randomUUID();
- ensureStagingDirectoryExists();
sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"without-md5.db", "",
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
HttpResponseStatus.OK.code(), false);
}
@@ -66,7 +65,6 @@ public class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
void testUploadWithCorrectMd5_expectSuccessfulUpload(VertxTestContext
context) throws IOException
{
UUID uploadId = UUID.randomUUID();
- ensureStagingDirectoryExists();
sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"with-correct-md5.db", "jXd/OF09/siBXSD3SWAm3A==",
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
HttpResponseStatus.OK.code(), false);
}
@@ -75,7 +73,6 @@ public class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
void testUploadWithIncorrectMd5_expectErrorCode(VertxTestContext context)
throws IOException
{
UUID uploadId = UUID.randomUUID();
- ensureStagingDirectoryExists();
sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"with-incorrect-md5.db", "incorrectMd5",
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
HttpResponseStatus.BAD_REQUEST.code(),
false);
@@ -85,7 +82,6 @@ public class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
void testInvalidFileName_expectErrorCode(VertxTestContext context) throws
IOException
{
UUID uploadId = UUID.randomUUID();
- ensureStagingDirectoryExists();
sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"ks$tbl-me-4-big-Data.db", "",
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
HttpResponseStatus.BAD_REQUEST.code(),
false);
@@ -95,7 +91,6 @@ public class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
void
testUploadWithoutContentLength_expectSuccessfulUpload(VertxTestContext context)
throws IOException
{
UUID uploadId = UUID.randomUUID();
- ensureStagingDirectoryExists();
sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"without-content-length.db",
"jXd/OF09/siBXSD3SWAm3A==", 0,
HttpResponseStatus.OK.code(), false);
}
@@ -106,7 +101,6 @@ public class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
// if we send more than actual length, vertx goes hung, probably
looking for more data than exists in the file,
// we should see timeout error in this case
UUID uploadId = UUID.randomUUID();
- ensureStagingDirectoryExists();
sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"with-higher-content-length.db", "", 1000, -1, true);
}
@@ -114,7 +108,6 @@ public class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
void
testUploadWithLesserContentLength_expectSuccessfulUpload(VertxTestContext
context) throws IOException
{
UUID uploadId = UUID.randomUUID();
- ensureStagingDirectoryExists();
sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"with-lesser-content-length.db",
"",
Files.size(Paths.get(FILE_TO_BE_UPLOADED)) - 2, HttpResponseStatus.OK.code(),
false);
@@ -124,7 +117,6 @@ public class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
public void testInvalidKeyspace(VertxTestContext context) throws
IOException
{
UUID uploadId = UUID.randomUUID();
- ensureStagingDirectoryExists();
sendUploadRequestAndVerify(context, uploadId, "invalidKeyspace",
"tbl", "with-lesser-content-length.db", "",
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
HttpResponseStatus.BAD_REQUEST.code(),
false);
@@ -134,7 +126,6 @@ public class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
public void testInvalidTable(VertxTestContext context) throws IOException
{
UUID uploadId = UUID.randomUUID();
- ensureStagingDirectoryExists();
sendUploadRequestAndVerify(context, uploadId, "ks",
"invalidTableName", "with-lesser-content-length.db", "",
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
HttpResponseStatus.BAD_REQUEST.code(),
false);
@@ -146,7 +137,6 @@ public class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
when(mockConfiguration.getMinSpacePercentRequiredForUpload()).thenReturn(100F);
UUID uploadId = UUID.randomUUID();
- ensureStagingDirectoryExists();
sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"without-md5.db", "",
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
HttpResponseStatus.INSUFFICIENT_STORAGE.code(), false);
@@ -158,7 +148,6 @@ public class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
when(mockConfiguration.getConcurrentUploadsLimit()).thenReturn(0);
UUID uploadId = UUID.randomUUID();
- ensureStagingDirectoryExists();
sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"without-md5.db", "",
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
HttpResponseStatus.TOO_MANY_REQUESTS.code(), false);
@@ -170,7 +159,6 @@ public class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
when(mockConfiguration.getConcurrentUploadsLimit()).thenReturn(1);
UUID uploadId = UUID.randomUUID();
- ensureStagingDirectoryExists();
CountDownLatch latch = new CountDownLatch(1);
sendUploadRequestAndVerify(latch, context, uploadId,
"invalidKeyspace", "tbl", "without-md5.db", "",
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
HttpResponseStatus.BAD_REQUEST.code(),
@@ -251,9 +239,4 @@ public class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
client.close();
});
}
-
- private void ensureStagingDirectoryExists() throws IOException
- {
-
Files.createDirectories(Paths.get(SnapshotUtils.makeStagingDir(temporaryFolder.getAbsolutePath())));
- }
}
diff --git
a/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
b/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
index eb716a6..a20ca22 100644
--- a/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
@@ -92,7 +92,7 @@ class SSTableImporterTest
// get NullPointerExceptions because the mock is not wired up, and we
need to prevent vertx from actually
// doing a vertx.filesystem().deleteRecursive(). So we return a failed
future with a fake path when checking
// if the directory exists.
- when(mockUploadPathBuilder.resolveStagingDirectory(anyString(),
anyString()))
+ when(mockUploadPathBuilder.resolveUploadIdDirectory(anyString(),
anyString()))
.thenReturn(Future.failedFuture("fake-path"));
when(mockUploadPathBuilder.isValidDirectory("fake-path")).thenReturn(Future.failedFuture("skip
cleanup"));
importer = new SSTableImporter(vertx, mockMetadataFetcher,
mockConfiguration, executorPools,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]