This is an automated email from the ASF dual-hosted git repository.
frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new f848cd06 CASSANDRASC-107: Improve logging for slice restore task (#108)
f848cd06 is described below
commit f848cd063e5e1671c84807615f5eae809253971d
Author: Francisco Guerrero <[email protected]>
AuthorDate: Thu Mar 21 15:26:06 2024 -0700
CASSANDRASC-107: Improve logging for slice restore task (#108)
Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRASC-107
---
CHANGES.txt | 3 +-
.../cassandra/sidecar/restore/RestoreJobUtil.java | 8 +-
.../sidecar/restore/RestoreSliceTask.java | 229 ++++++++++-----------
.../cassandra/sidecar/restore/StorageClient.java | 40 +++-
4 files changed, 143 insertions(+), 137 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index ce39b66c..638ee316 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
1.0.0
-----
+ * Improve logging for slice restore task (CASSANDRASC-107)
* Add restore task watcher to report long running tasks (CASSANDRASC-106)
* RestoreSliceTask could be stuck due to missing exception handling
(CASSANDRASC-105)
* Make hash algorithm implementation pluggable (CASSANDRASC-114)
@@ -82,4 +83,4 @@
* Add integration tests task (CASSANDRA-15031)
* Add support for SSL and bindable address (CASSANDRA-15030)
* Autogenerate API docs for sidecar (CASSANDRA-15028)
- * C* Management process (CASSANDRA-14395)
\ No newline at end of file
+ * C* Management process (CASSANDRA-14395)
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
index 8651d535..be488ae4 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
@@ -78,9 +78,9 @@ public class RestoreJobUtil
{
try (ZipInputStream zis = new
ZipInputStream(Files.newInputStream(zipFile.toPath())))
{
- ZipEntry zipEntry = zis.getNextEntry();
+ ZipEntry zipEntry;
- while (zipEntry != null)
+ while ((zipEntry = zis.getNextEntry()) != null)
{
// Encounters a directory inside the zip file
// It is not expected. The zip file should have the directory
depth of 1.
@@ -92,8 +92,6 @@ public class RestoreJobUtil
File targetFile = newProtectedTargetFile(zipEntry, targetDir);
Files.copy(zis, targetFile.toPath(),
StandardCopyOption.REPLACE_EXISTING);
-
- zipEntry = zis.getNextEntry();
}
zis.closeEntry();
}
@@ -161,7 +159,7 @@ public class RestoreJobUtil
}
catch (IOException e)
{
- LOGGER.error("Unexpected error occurred while cleaning
directory {}, ", path, e);
+ LOGGER.error("Unexpected error occurred while cleaning
directory {}", path, e);
throw new RuntimeException(e);
}
});
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
index ed85d3f9..8c828d89 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.sidecar.restore;
import java.io.File;
import java.nio.file.Files;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@@ -48,6 +47,7 @@ import
software.amazon.awssdk.core.exception.ApiCallTimeoutException;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.S3Exception;
+import static io.vertx.core.Future.fromCompletionStage;
import static
org.apache.cassandra.sidecar.utils.AsyncFileSystemUtils.ensureSufficientStorage;
/**
@@ -55,7 +55,7 @@ import static
org.apache.cassandra.sidecar.utils.AsyncFileSystemUtils.ensureSuff
* and imports SSTables into Cassandra.
* It the execution ever fails, the cause should only be
* {@link org.apache.cassandra.sidecar.exceptions.RestoreJobException}
- *
+ * <p>
* Note that the class is package private, and it is not intended to be
referenced by other packages.
*/
public class RestoreSliceTask implements RestoreSliceHandler
@@ -130,25 +130,17 @@ public class RestoreSliceTask implements
RestoreSliceHandler
}
// 1. check object existence and validate eTag / checksum
- CompletableFuture<Void> fut = checkObjectExistence(event)
- // 2. download slice/object when the remote object exists
- .thenCompose(headObject -> downloadSlice(event))
- // 3. persist status
- .thenAccept(file -> {
- slice.completeStagePhase();
- sliceDatabaseAccessor.updateStatus(slice);
- // completed staging. A new task is produced when it
comes to import
- event.tryComplete(slice);
- })
- .whenComplete((x, cause) -> {
- if (cause != null)
- {
- // handle unexpected errors thrown during download
slice call, that do not close event
-
event.tryFail(RestoreJobExceptions.ofSlice(cause.getMessage(), slice, cause));
- }
- });
-
- return Future.fromCompletionStage(fut);
+ return checkObjectExistence(event)
+ .compose(headObject -> downloadSlice(event))
+ .<Void>compose(file -> {
+ slice.completeStagePhase();
+ sliceDatabaseAccessor.updateStatus(slice);
+ return Future.succeededFuture();
+ })
+ // completed staging. A new task is produced when
it comes to import
+ .onSuccess(_v -> event.tryComplete(slice))
+ // handle unexpected errors thrown during download
slice call, that do not close event
+ .onFailure(cause ->
event.tryFail(RestoreJobExceptions.ofSlice(cause.getMessage(), slice, cause)));
}
else if (job.status == RestoreJobStatus.STAGED)
{
@@ -177,75 +169,73 @@ public class RestoreSliceTask implements
RestoreSliceHandler
private Future<Void> downloadSliceAndImport(Promise<RestoreSlice> event)
{
// 1. check object existence and validate eTag / checksum
- CompletableFuture<File> fut = checkObjectExistence(event)
- // 2. download slice/object when the remote object exists
- .thenCompose(headObject -> downloadSlice(event));
- // 3. unzip the file and import/commit
- return Future.fromCompletionStage(fut)
- .compose(file -> unzipAndImport(event, file));
+ return checkObjectExistence(event)
+ // 2. download slice/object when the remote object exists
+ .compose(headObject -> downloadSlice(event))
+ // 3. unzip the file and import/commit
+ .compose(file -> unzipAndImport(event, file));
}
- private CompletableFuture<?> checkObjectExistence(Promise<RestoreSlice>
event)
+ private Future<?> checkObjectExistence(Promise<RestoreSlice> event)
{
// skip query s3 if the object existence is already confirmed
if (slice.existsOnS3())
{
- return CompletableFuture.completedFuture(null);
+ LOGGER.debug("The slice already exists on S3. jobId={}
sliceKey={}", slice.jobId(), slice.key());
+ return Future.succeededFuture();
}
- return s3Client
- .objectExists(slice) // even if the file already exists on disk, we
should still check the object existence
- .whenComplete((resp, cause) -> {
- if (cause == null)
- {
- stats.captureSliceReplicationTime(currentTimeInNanos() -
slice.creationTimeNanos());
- slice.setExistsOnS3();
- return;
- }
-
+ // even if the file already exists on disk, we should still check the
object existence
+ return
+ fromCompletionStage(s3Client.objectExists(slice))
+ .onSuccess(exists -> {
+ long durationNanos = currentTimeInNanos() -
slice.creationTimeNanos();
+ stats.captureSliceReplicationTime(durationNanos);
+ slice.setExistsOnS3();
+ LOGGER.debug("Slice is now available on S3. jobId={} sliceKey={}
replicationTimeNanos={}",
+ slice.jobId(), slice.key(), durationNanos);
+ })
+ .onFailure(cause -> {
S3Exception s3Exception = ThrowableUtils.getCause(cause,
S3Exception.class);
if (s3Exception == null) // has non-null cause, but not S3Exception
{
event.tryFail(RestoreJobExceptions.ofFatalSlice("Unexpected
error when checking object existence",
slice, cause));
}
+ else if (s3Exception instanceof NoSuchKeyException)
+ {
+ event.tryFail(RestoreJobExceptions.ofSlice("Object not found",
slice, null));
+ }
+ else if (s3Exception.statusCode() == 412)
+ {
+ // When checksum/eTag does not match, it should be an
unrecoverable error and fail immediately.
+ // For such scenario, we expect "S3Exception: (Status Code:
412)". Also see,
+ //
https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_RequestSyntax
+ event.tryFail(RestoreJobExceptions.ofFatalSlice("Object
checksum mismatched",
+ slice,
s3Exception));
+ stats.captureSliceChecksumMismatch(slice.owner().id());
+ }
+ else if (s3Exception.statusCode() == 403)
+ {
+ // Fail immediately if 403 forbidden is returned.
+ // There might be permission issue on accessing the object.
+ event.tryFail(RestoreJobExceptions.ofFatalSlice("Object access
is forbidden",
+ slice,
s3Exception));
+ stats.captureTokenUnauthorized();
+ }
+ else if (s3Exception.statusCode() == 400 &&
+ s3Exception.getMessage().contains("token has expired"))
+ {
+ // Fail the job if 400, token has expired.
+ //
https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList
+ event.tryFail(RestoreJobExceptions.ofFatalSlice("Token has
expired", slice, s3Exception));
+ stats.captureTokenExpired();
+ }
else
{
- if (s3Exception instanceof NoSuchKeyException)
- {
- event.tryFail(RestoreJobExceptions.ofSlice("Object not
found", slice, null));
- }
- else if (s3Exception.statusCode() == 412)
- {
- // When checksum/eTag does not match, it should be an
unrecoverable error and fail immediately.
- // For such scenario, we expect "S3Exception: (Status
Code: 412)". Also see,
- //
https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_RequestSyntax
- event.tryFail(RestoreJobExceptions.ofFatalSlice("Object
checksum mismatched",
- slice,
s3Exception));
- stats.captureSliceChecksumMismatch(slice.owner().id());
- }
- else if (s3Exception.statusCode() == 403)
- {
- // Fail immediately if 403 forbidden is returned.
- // There might be permission issue on accessing the object.
- event.tryFail(RestoreJobExceptions.ofFatalSlice("Object
access is forbidden",
- slice,
s3Exception));
- stats.captureTokenUnauthorized();
- }
- else if (s3Exception.statusCode() == 400 &&
- s3Exception.getMessage().contains("token has
expired"))
- {
- // Fail the job if 400, token has expired.
- //
https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList
- event.tryFail(RestoreJobExceptions.ofFatalSlice("Token has
expired", slice, s3Exception));
- stats.captureTokenExpired();
- }
- else
- {
- // Retry the other S3Exceptions
- event.tryFail(RestoreJobExceptions.ofSlice("Unable to
check object existence",
- slice,
s3Exception));
- }
+ // Retry the other S3Exceptions
+ event.tryFail(RestoreJobExceptions.ofSlice("Unable to check
object existence",
+ slice,
s3Exception));
}
});
}
@@ -255,16 +245,14 @@ public class RestoreSliceTask implements
RestoreSliceHandler
return restoreJobUtil.currentTimeNanos();
}
- private CompletableFuture<File> downloadSlice(Promise<RestoreSlice> event)
+ private Future<File> downloadSlice(Promise<RestoreSlice> event)
{
if (slice.isCancelled())
{
RestoreJobFatalException ex =
RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled",
slice, null);
event.tryFail(ex);
- CompletableFuture<File> failedFuture = new CompletableFuture<>();
- failedFuture.completeExceptionally(ex);
- return failedFuture;
+ return Future.failedFuture(ex);
}
if (slice.downloadAttempt() > 0)
@@ -274,20 +262,17 @@ public class RestoreSliceTask implements
RestoreSliceHandler
}
LOGGER.info("Begin downloading restore slice. sliceKey={}",
slice.key());
- CompletableFuture<File> future = s3Client
- .downloadObjectIfAbsent(slice)
- .whenComplete((file, cause) -> {
- if (cause != null)
+ Future<File> future =
+ fromCompletionStage(s3Client.downloadObjectIfAbsent(slice))
+ .onFailure(cause -> {
+ slice.incrementDownloadAttempt();
+ if (ThrowableUtils.getCause(cause, ApiCallTimeoutException.class)
!= null)
{
- slice.incrementDownloadAttempt();
- if (ThrowableUtils.getCause(cause,
ApiCallTimeoutException.class) != null)
- {
- LOGGER.warn("Downloading restore slice times out.
sliceKey={}", slice.key());
- stats.captureSliceDownloadTimeout(slice.owner().id());
- }
- event.tryFail(RestoreJobExceptions.ofFatalSlice("Unrecoverable
error when downloading object",
- slice, cause));
+ LOGGER.warn("Downloading restore slice times out.
sliceKey={}", slice.key());
+ stats.captureSliceDownloadTimeout(slice.owner().id());
}
+ event.tryFail(RestoreJobExceptions.ofFatalSlice("Unrecoverable
error when downloading object",
+ slice, cause));
});
return Timer.measureTimeTaken(future, duration -> {
@@ -315,28 +300,28 @@ public class RestoreSliceTask implements
RestoreSliceHandler
// run the rest in the executor pool, instead of S3 client threadpool
return unzip(file)
- .compose(this::validateFiles)
- .compose(this::commit)
- .compose(x -> {
- if (onSuccessCommit == null)
- {
- return Future.succeededFuture();
- }
-
- return executorPool.<Void>executeBlocking(promise -> {
- onSuccessCommit.run();
- promise.tryComplete();
- });
- })
- .onSuccess(x -> {
- slice.completeImportPhase();
- event.tryComplete(slice);
- })
- .onFailure(failure -> {
- logWarnIfHasHttpExceptionCauseOnCommit(failure, slice);
- event.tryFail(RestoreJobExceptions.propagate("Fail to commit
slice. "
- +
slice.shortDescription(), failure));
- });
+ .compose(this::validateFiles)
+ .compose(this::commit)
+ .compose(x -> {
+ if (onSuccessCommit == null)
+ {
+ return Future.succeededFuture();
+ }
+
+ return executorPool.<Void>executeBlocking(promise -> {
+ onSuccessCommit.run();
+ promise.tryComplete();
+ });
+ })
+ .onSuccess(x -> {
+ slice.completeImportPhase();
+ event.tryComplete(slice);
+ })
+ .onFailure(failure -> {
+ logWarnIfHasHttpExceptionCauseOnCommit(failure, slice);
+ event.tryFail(RestoreJobExceptions.propagate("Fail to
commit slice. "
+ +
slice.shortDescription(), failure));
+ });
}
private Future<File> unzip(File zipFile)
@@ -358,7 +343,8 @@ public class RestoreSliceTask implements RestoreSliceHandler
{
if (targetDirExist)
{
- LOGGER.debug("The files in slice are already extracted.
Maybe it is a retried task?");
+ LOGGER.debug("The files in slice are already extracted.
Maybe it is a retried task? " +
+ "jobId={} sliceKey={}", slice.jobId(),
slice.key());
promise.complete(targetDir);
}
else
@@ -381,8 +367,8 @@ public class RestoreSliceTask implements RestoreSliceHandler
// Then, delete the downloaded zip file
if (!zipFile.delete())
{
- LOGGER.warn("File deletion attempt failed. file={}",
- zipFile.getAbsolutePath());
+ LOGGER.warn("File deletion attempt failed. jobId={}
sliceKey={} file={}",
+ slice.jobId(), slice.key(),
zipFile.getAbsolutePath());
}
}
catch (Exception cause)
@@ -487,7 +473,7 @@ public class RestoreSliceTask implements RestoreSliceHandler
return
Future.failedFuture(RestoreJobExceptions.ofFatalSlice("Restore slice is
cancelled",
slice, null));
- LOGGER.info("Begin committing SSTables. sliceKey={}", slice.key());
+ LOGGER.info("Begin committing SSTables. jobId={} sliceKey={}",
slice.jobId(), slice.key());
SSTableImportOptions options = slice.job().importOptions;
SSTableImporter.ImportOptions importOptions = new
SSTableImporter.ImportOptions.Builder()
@@ -505,8 +491,8 @@ public class RestoreSliceTask implements RestoreSliceHandler
.uploadId(slice.uploadId())
.build();
Future<Void> future = importer.scheduleImport(importOptions)
- .onSuccess(ignored ->
LOGGER.info("Finish committing SSTables. sliceKey={}",
-
slice.key()));
+ .onSuccess(ignored ->
LOGGER.info("Finish committing SSTables. jobId={} sliceKey={}",
+
slice.jobId(), slice.key()));
return Timer.measureTimeTaken(future, d ->
stats.captureSliceImportTime(slice.owner().id(), d));
}
@@ -529,15 +515,16 @@ public class RestoreSliceTask implements
RestoreSliceHandler
return;
}
- LOGGER.warn("Committing slice failed with HttpException. slice={}
statusCode={} exceptionPayload={}",
- slice.sliceId(), httpException.getStatusCode(),
httpException.getPayload(), httpException);
+ LOGGER.warn("Committing slice failed with HttpException. jobId={}
sliceKey={} statusCode={} " +
+ "exceptionPayload={}", slice.jobId(), slice.key(),
httpException.getStatusCode(),
+ httpException.getPayload(), httpException);
}
@Override
public long elapsedInNanos()
{
return taskStartTimeNanos == -1 ? -1 :
- currentTimeInNanos() - taskStartTimeNanos;
+ currentTimeInNanos() - taskStartTimeNanos;
}
@Override
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
index 9bf4f0e5..2795b7b5 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
@@ -105,6 +105,8 @@ public class StorageClient
/**
* Revoke the credentials of a {@link RestoreJob}
* It should be called when the job is in a final {@link
org.apache.cassandra.sidecar.common.data.RestoreJobStatus}
+ *
+ * @param jobId the unique identifier for the job
*/
public void revokeCredentials(UUID jobId)
{
@@ -132,7 +134,7 @@ public class StorageClient
.build();
return client.headObject(request)
- .whenComplete(logCredentialOnRequestFailure(credentials));
+ .whenComplete(logCredentialOnRequestFailure(slice,
credentials));
}
public CompletableFuture<File> downloadObjectIfAbsent(RestoreSlice slice)
@@ -140,6 +142,7 @@ public class StorageClient
Credentials credentials = credentialsProviders.get(slice.jobId());
if (credentials == null)
{
+ LOGGER.debug("Credentials to download object not found. jobId={}",
slice.jobId());
CompletableFuture<File> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(credentialsNotFound(slice));
return failedFuture;
@@ -156,6 +159,8 @@ public class StorageClient
File object = objectPath.toFile();
if (object.exists())
{
+ LOGGER.debug("Skipping download, file already exists. jobId={}
s3_object={}",
+ slice.jobId(), slice.stagedObjectPath());
// Skip downloading if the file already exists on disk. It should
be a rare scenario.
// Note that the on-disk file could be different from the remote
object, although the name matches.
// TODO 1: verify etag does not change after s3 replication and
batch copy
@@ -166,10 +171,12 @@ public class StorageClient
}
if (!object.getParentFile().mkdirs())
{
- LOGGER.warn("Error occurred while creating directory for S3 object
{}", objectPath);
+ LOGGER.warn("Error occurred while creating directory. jobId={}
s3_object={}",
+ slice.jobId(), slice.stagedObjectPath());
}
- return rateLimitedGetObject(client, request, objectPath)
- .whenComplete(logCredentialOnRequestFailure(credentials))
+ LOGGER.info("Downloading object. jobId={} s3_object={}",
slice.jobId(), slice.stagedObjectPath());
+ return rateLimitedGetObject(slice, client, request, objectPath)
+ .whenComplete(logCredentialOnRequestFailure(slice, credentials))
.thenApply(res -> object);
}
@@ -206,13 +213,14 @@ public class StorageClient
"jobId: " + slice.jobId());
}
- private BiConsumer<Object, ? super Throwable>
logCredentialOnRequestFailure(Credentials credentials)
+ private BiConsumer<Object, ? super Throwable>
logCredentialOnRequestFailure(RestoreSlice slice,
+
Credentials credentials)
{
return (ignored, cause) -> {
if (cause != null)
{
- LOGGER.error("GetObjectRequest is not successful.
credentials={}",
- credentials.readCredentials, cause);
+ LOGGER.error("GetObjectRequest is not successful. jobId={}
credentials={}",
+ slice.jobId(), credentials.readCredentials,
cause);
}
};
}
@@ -221,17 +229,21 @@ public class StorageClient
* Returns a {@link CompletableFuture} to the {@link GetObjectResponse}.
It writes the object from S3 to a file
* applying rate limiting on the download throughput.
*
+ * @param slice the slice to be restored
* @param client the S3 client
* @param request the {@link GetObjectRequest request}
* @param destinationPath the path where the object will be persisted
* @return a {@link CompletableFuture} of the {@link GetObjectResponse}
*/
- private CompletableFuture<GetObjectResponse>
rateLimitedGetObject(S3AsyncClient client,
+ private CompletableFuture<GetObjectResponse>
rateLimitedGetObject(RestoreSlice slice,
+
S3AsyncClient client,
GetObjectRequest request,
Path
destinationPath)
{
return client.getObject(request,
AsyncResponseTransformer.toPublisher())
- .thenCompose(responsePublisher ->
subscribeRateLimitedWrite(destinationPath, responsePublisher));
+ .thenCompose(responsePublisher ->
subscribeRateLimitedWrite(slice,
+
destinationPath,
+
responsePublisher));
}
/**
@@ -239,11 +251,13 @@ public class StorageClient
* by subscribing to the {@code publisher}. Applying backpressure on the
received bytes by rate limiting
* the download throughput using the {@code downloadRateLimiter} object.
*
+ * @param slice the slice to be restored
* @param destinationPath the path where the object will be persisted
* @param publisher the {@link ResponsePublisher}
* @return a {@link CompletableFuture} to the {@link GetObjectResponse}
*/
- CompletableFuture<GetObjectResponse> subscribeRateLimitedWrite(Path
destinationPath,
+ CompletableFuture<GetObjectResponse>
subscribeRateLimitedWrite(RestoreSlice slice,
+ Path
destinationPath,
ResponsePublisher<GetObjectResponse> publisher)
{
WritableByteChannel channel;
@@ -257,10 +271,14 @@ public class StorageClient
}
catch (FileAlreadyExistsException fileAlreadyExistsException)
{
+ LOGGER.debug("Skipping download, file already exists. jobId={}
s3_object={}",
+ slice.jobId(), slice.stagedObjectPath());
return CompletableFuture.completedFuture(publisher.response());
}
catch (IOException e)
{
+ LOGGER.error("Error occurred while creating channel.
destinationPath={} jobId={} s3_object={}",
+ destinationPath, slice.jobId(),
slice.stagedObjectPath(), e);
throw new RuntimeException(e);
}
// CompletableFuture that will be notified when all events have been
consumed or if an error occurs.
@@ -272,6 +290,8 @@ public class StorageClient
}
catch (IOException e)
{
+ LOGGER.error("Error occurred while downloading. jobId={}
s3_object={}",
+ slice.jobId(), slice.stagedObjectPath(), e);
throw new RuntimeException(e);
}
}).whenComplete((v, subscribeThrowable) -> closeChannel(channel));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]