vararo27 opened a new pull request, #1918:
URL: https://github.com/apache/cassandra-java-driver/pull/1918
An exception is thrown from DefaultSchemaQueriesFactory.java (newInstance
method) when Control connection goes down for any reason and new connection has
not yet initialized yet.
`throw new IllegalStateException("Control channel not available, aborting
schema refresh");`
Exception thrown from here is not propagated to whenComplete method used in
below code snippet of MetadataManager.java and refreshFuture is never marked as
completed and application gets stuck. Reason for that is exception not handled
properly in SchemaRows future object. Its been depicted in this PR using
HandlerSchemaQueries.java
`
private void startSchemaRequest(CompletableFuture<RefreshSchemaResult>
refreshFuture) {
assert adminExecutor.inEventLoop();
if (closeWasCalled) {
refreshFuture.complete(new RefreshSchemaResult(metadata));
return;
}
if (currentSchemaRefresh == null) {
currentSchemaRefresh = refreshFuture;
LOG.debug("[{}] Starting schema refresh", logPrefix);
initControlConnectionForSchema()
.thenCompose(v ->
context.getTopologyMonitor().checkSchemaAgreement())
.whenComplete(
(schemaInAgreement, agreementError) -> {
if (agreementError != null) {
refreshFuture.completeExceptionally(agreementError);
} else {
schemaQueriesFactory
.newInstance()
.execute()
.thenApplyAsync(this::parseAndApplySchemaRows,
adminExecutor)
.whenComplete(
(newMetadata, metadataError) -> {
if (metadataError != null) {
refreshFuture.completeExceptionally(metadataError);
} else {
refreshFuture.complete(
new RefreshSchemaResult(newMetadata,
schemaInAgreement));
}
firstSchemaRefreshFuture.complete(null);
currentSchemaRefresh = null;
// If another refresh was enqueued during this
one, run it now
if (queuedSchemaRefresh != null) {
CompletableFuture<RefreshSchemaResult> tmp =
this.queuedSchemaRefresh;
this.queuedSchemaRefresh = null;
startSchemaRequest(tmp);
}
});
}
});
} else if (queuedSchemaRefresh == null) {
queuedSchemaRefresh = refreshFuture; // wait for our turn
} else {
CompletableFutures.completeFrom(
queuedSchemaRefresh, refreshFuture); // join the queued request
}
}
`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]