This is an automated email from the ASF dual-hosted git repository.

absurdfarce pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/cassandra-java-driver.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 6c4832919 CASSANDRA-19468 Don't swallow exception during metadata refresh
6c4832919 is described below

commit 6c48329199862215abc22170769fd1a165e80a15
Author: Ammar Khaku <ammar.kh...@gmail.com>
AuthorDate: Thu Mar 14 16:55:59 2024 -0700

    CASSANDRA-19468 Don't swallow exception during metadata refresh
    
    If an exception was thrown while getting new metadata as
    part of a schema refresh, it died on the admin executor and
    was never propagated to the CompletableFuture argument.
    Catch those exceptions and hand them off to the
    CompletableFuture instead.
    
    patch by Ammar Khaku; reviewed by Chris Lohfink, Bret McGuire for CASSANDRA-19468
---
 .../internal/core/metadata/MetadataManager.java    | 53 ++++++++++++----------
 .../core/metadata/MetadataManagerTest.java         | 23 ++++++++++
 2 files changed, 52 insertions(+), 24 deletions(-)

diff --git 
a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java
 
b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java
index 28e8b18f1..c9abfb7a6 100644
--- 
a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java
+++ 
b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java
@@ -437,30 +437,35 @@ public class MetadataManager implements 
AsyncAutoCloseable {
                   if (agreementError != null) {
                     refreshFuture.completeExceptionally(agreementError);
                   } else {
-                    schemaQueriesFactory
-                        .newInstance()
-                        .execute()
-                        .thenApplyAsync(this::parseAndApplySchemaRows, 
adminExecutor)
-                        .whenComplete(
-                            (newMetadata, metadataError) -> {
-                              if (metadataError != null) {
-                                
refreshFuture.completeExceptionally(metadataError);
-                              } else {
-                                refreshFuture.complete(
-                                    new RefreshSchemaResult(newMetadata, 
schemaInAgreement));
-                              }
-
-                              firstSchemaRefreshFuture.complete(null);
-
-                              currentSchemaRefresh = null;
-                              // If another refresh was enqueued during this 
one, run it now
-                              if (queuedSchemaRefresh != null) {
-                                CompletableFuture<RefreshSchemaResult> tmp =
-                                    this.queuedSchemaRefresh;
-                                this.queuedSchemaRefresh = null;
-                                startSchemaRequest(tmp);
-                              }
-                            });
+                    try {
+                      schemaQueriesFactory
+                              .newInstance()
+                              .execute()
+                              .thenApplyAsync(this::parseAndApplySchemaRows, 
adminExecutor)
+                              .whenComplete(
+                                      (newMetadata, metadataError) -> {
+                                        if (metadataError != null) {
+                                          
refreshFuture.completeExceptionally(metadataError);
+                                        } else {
+                                          refreshFuture.complete(
+                                                  new 
RefreshSchemaResult(newMetadata, schemaInAgreement));
+                                        }
+
+                                        
firstSchemaRefreshFuture.complete(null);
+
+                                        currentSchemaRefresh = null;
+                                        // If another refresh was enqueued 
during this one, run it now
+                                        if (queuedSchemaRefresh != null) {
+                                          
CompletableFuture<RefreshSchemaResult> tmp =
+                                                  this.queuedSchemaRefresh;
+                                          this.queuedSchemaRefresh = null;
+                                          startSchemaRequest(tmp);
+                                        }
+                                      });
+                    } catch (Throwable t) {
+                      LOG.debug("[{}] Exception getting new metadata", 
logPrefix, t);
+                      refreshFuture.completeExceptionally(t);
+                    }
                   }
                 });
       } else if (queuedSchemaRefresh == null) {
diff --git 
a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java
 
b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java
index 460f99abd..375209d9f 100644
--- 
a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java
+++ 
b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java
@@ -20,6 +20,7 @@ package com.datastax.oss.driver.internal.core.metadata;
 import static com.datastax.oss.driver.Assertions.assertThat;
 import static com.datastax.oss.driver.Assertions.assertThatStage;
 import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
@@ -33,6 +34,7 @@ import com.datastax.oss.driver.api.core.metadata.Node;
 import com.datastax.oss.driver.internal.core.context.EventBus;
 import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
 import com.datastax.oss.driver.internal.core.context.NettyOptions;
+import com.datastax.oss.driver.internal.core.control.ControlConnection;
 import 
com.datastax.oss.driver.internal.core.metadata.schema.parsing.SchemaParserFactory;
 import 
com.datastax.oss.driver.internal.core.metadata.schema.queries.SchemaQueriesFactory;
 import com.datastax.oss.driver.internal.core.metrics.MetricsFactory;
@@ -64,6 +66,7 @@ public class MetadataManagerTest {
 
   @Mock private InternalDriverContext context;
   @Mock private NettyOptions nettyOptions;
+  @Mock private ControlConnection controlConnection;
   @Mock private TopologyMonitor topologyMonitor;
   @Mock private DriverConfig config;
   @Mock private DriverExecutionProfile defaultProfile;
@@ -85,6 +88,7 @@ public class MetadataManagerTest {
     when(context.getNettyOptions()).thenReturn(nettyOptions);
 
     when(context.getTopologyMonitor()).thenReturn(topologyMonitor);
+    when(context.getControlConnection()).thenReturn(controlConnection);
 
     
when(defaultProfile.getDuration(DefaultDriverOption.METADATA_SCHEMA_WINDOW))
         .thenReturn(Duration.ZERO);
@@ -286,6 +290,25 @@ public class MetadataManagerTest {
     
assertThat(refresh.broadcastRpcAddressToRemove).isEqualTo(broadcastRpcAddress2);
   }
 
+  @Test
+  public void refreshSchema_should_work() {
+    // Given
+    IllegalStateException expectedException = new IllegalStateException("Error 
we're testing");
+    when(schemaQueriesFactory.newInstance()).thenThrow(expectedException);
+    
when(topologyMonitor.refreshNodeList()).thenReturn(CompletableFuture.completedFuture(ImmutableList.of(mock(NodeInfo.class))));
+    
when(topologyMonitor.checkSchemaAgreement()).thenReturn(CompletableFuture.completedFuture(Boolean.TRUE));
+    when(controlConnection.init(anyBoolean(), anyBoolean(), 
anyBoolean())).thenReturn(CompletableFuture.completedFuture(null));
+    metadataManager.refreshNodes(); // required internal state setup for this
+    waitForPendingAdminTasks(() -> metadataManager.refreshes.size() == 1); // 
sanity check
+
+    // When
+    CompletionStage<MetadataManager.RefreshSchemaResult> result = 
metadataManager.refreshSchema("foo", true, true);
+
+    // Then
+    waitForPendingAdminTasks(() -> result.toCompletableFuture().isDone());
+    assertThatStage(result).isFailed(t -> 
assertThat(t).isEqualTo(expectedException));
+  }
+
   private static class TestMetadataManager extends MetadataManager {
 
     private List<MetadataRefresh> refreshes = new CopyOnWriteArrayList<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to