vararo27 opened a new pull request, #1918:
URL: https://github.com/apache/cassandra-java-driver/pull/1918

   DefaultSchemaQueriesFactory.java (newInstance method) throws an exception when 
the control connection goes down for any reason and the new connection has not 
yet been initialized. 
   
   `throw new IllegalStateException("Control channel not available, aborting schema refresh");`
   
   
   The exception thrown here is not propagated to the whenComplete callback in the 
MetadataManager.java snippet below, so refreshFuture is never completed and the 
application gets stuck. The reason is that the exception is not handled properly 
on the SchemaRows future. This PR demonstrates the problem using 
HandlerSchemaQueries.java.
   
   `    
   private void startSchemaRequest(CompletableFuture<RefreshSchemaResult> refreshFuture) {
         assert adminExecutor.inEventLoop();
         if (closeWasCalled) {
           refreshFuture.complete(new RefreshSchemaResult(metadata));
           return;
         }
         if (currentSchemaRefresh == null) {
           currentSchemaRefresh = refreshFuture;
           LOG.debug("[{}] Starting schema refresh", logPrefix);
           initControlConnectionForSchema()
               .thenCompose(v -> context.getTopologyMonitor().checkSchemaAgreement())
               .whenComplete(
                   (schemaInAgreement, agreementError) -> {
                     if (agreementError != null) {
                       refreshFuture.completeExceptionally(agreementError);
                     } else {
                       schemaQueriesFactory
                           .newInstance()
                           .execute()
                           .thenApplyAsync(this::parseAndApplySchemaRows, adminExecutor)
                           .whenComplete(
                               (newMetadata, metadataError) -> {
                                 if (metadataError != null) {
                                   refreshFuture.completeExceptionally(metadataError);
                                 } else {
                                   refreshFuture.complete(
                                       new RefreshSchemaResult(newMetadata, schemaInAgreement));
                                 }
   
                                 firstSchemaRefreshFuture.complete(null);
   
                                 currentSchemaRefresh = null;
                                 // If another refresh was enqueued during this one, run it now
                                 if (queuedSchemaRefresh != null) {
                                   CompletableFuture<RefreshSchemaResult> tmp =
                                       this.queuedSchemaRefresh;
                                   this.queuedSchemaRefresh = null;
                                   startSchemaRequest(tmp);
                                 }
                               });
                     }
                   });
         } else if (queuedSchemaRefresh == null) {
           queuedSchemaRefresh = refreshFuture; // wait for our turn
         } else {
           CompletableFutures.completeFrom(
               queuedSchemaRefresh, refreshFuture); // join the queued request
         }
       }
   `
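
   To illustrate the failure mode outside the driver, here is a minimal, 
self-contained sketch. It is not the driver code and not necessarily the fix 
applied in this PR: the class SwallowedExceptionDemo and the methods 
failingQuery, unguarded and guarded are hypothetical names used only for 
illustration. It assumes the factory call fails synchronously inside a 
whenComplete callback, as in the snippet above. The exception is then captured 
by the stage returned from whenComplete, which nobody observes, so the outer 
future stays incomplete. Wrapping the call in a try/catch is one possible way to 
forward the failure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class SwallowedExceptionDemo {

  // Hypothetical stand-in for schemaQueriesFactory.newInstance().execute():
  // it throws synchronously instead of returning a failed future.
  static CompletableFuture<String> failingQuery() {
    throw new IllegalStateException(
        "Control channel not available, aborting schema refresh");
  }

  // Same shape as the snippet above: the throwing call sits inside a
  // whenComplete callback and nothing catches the exception.
  static CompletableFuture<String> unguarded(Supplier<CompletableFuture<String>> query) {
    CompletableFuture<String> refreshFuture = new CompletableFuture<>();
    CompletableFuture.completedFuture(true)
        .whenComplete(
            (inAgreement, agreementError) -> {
              // If query.get() throws, the exception ends up in the stage
              // returned by this whenComplete call, which is discarded.
              query.get()
                  .whenComplete(
                      (rows, queryError) -> {
                        if (queryError != null) {
                          refreshFuture.completeExceptionally(queryError);
                        } else {
                          refreshFuture.complete(rows);
                        }
                      });
            });
    return refreshFuture;
  }

  // One possible guard: catch the synchronous failure and forward it.
  static CompletableFuture<String> guarded(Supplier<CompletableFuture<String>> query) {
    CompletableFuture<String> refreshFuture = new CompletableFuture<>();
    CompletableFuture.completedFuture(true)
        .whenComplete(
            (inAgreement, agreementError) -> {
              CompletableFuture<String> rowsFuture;
              try {
                rowsFuture = query.get();
              } catch (Throwable t) {
                refreshFuture.completeExceptionally(t);
                return;
              }
              rowsFuture.whenComplete(
                  (rows, queryError) -> {
                    if (queryError != null) {
                      refreshFuture.completeExceptionally(queryError);
                    } else {
                      refreshFuture.complete(rows);
                    }
                  });
            });
    return refreshFuture;
  }

  public static void main(String[] args) {
    CompletableFuture<String> stuck = unguarded(SwallowedExceptionDemo::failingQuery);
    CompletableFuture<String> failed = guarded(SwallowedExceptionDemo::failingQuery);
    System.out.println("unguarded done? " + stuck.isDone()); // false
    System.out.println("guarded failed? " + failed.isCompletedExceptionally()); // true
  }
}
```

   In the unguarded variant a caller blocking on the returned future would wait 
forever, which matches the hang described above. In the guarded variant the 
caller sees the IllegalStateException.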
   

