This is an automated email from the ASF dual-hosted git repository. absurdfarce pushed a commit to branch 4.x in repository https://gitbox.apache.org/repos/asf/cassandra-java-driver.git
The following commit(s) were added to refs/heads/4.x by this push: new 6c4832919 CASSANDRA-19468 Don't swallow exception during metadata refresh 6c4832919 is described below commit 6c48329199862215abc22170769fd1a165e80a15 Author: Ammar Khaku <ammar.kh...@gmail.com> AuthorDate: Thu Mar 14 16:55:59 2024 -0700 CASSANDRA-19468 Don't swallow exception during metadata refresh If an exception was thrown while getting new metadata as part of schema refresh it died on the admin executor instead of being propagated to the CompletableFuture argument. Instead, catch those exceptions and hand them off to the CompletableFuture. patch by Ammar Khaku; reviewed by Chris Lohfink, Bret McGuire for CASSANDRA-19468 --- .../internal/core/metadata/MetadataManager.java | 53 ++++++++++++---------- .../core/metadata/MetadataManagerTest.java | 23 ++++++++++ 2 files changed, 52 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java index 28e8b18f1..c9abfb7a6 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java @@ -437,30 +437,35 @@ public class MetadataManager implements AsyncAutoCloseable { if (agreementError != null) { refreshFuture.completeExceptionally(agreementError); } else { - schemaQueriesFactory - .newInstance() - .execute() - .thenApplyAsync(this::parseAndApplySchemaRows, adminExecutor) - .whenComplete( - (newMetadata, metadataError) -> { - if (metadataError != null) { - refreshFuture.completeExceptionally(metadataError); - } else { - refreshFuture.complete( - new RefreshSchemaResult(newMetadata, schemaInAgreement)); - } - - firstSchemaRefreshFuture.complete(null); - - currentSchemaRefresh = null; - // If another refresh was enqueued during this one, run it now - if (queuedSchemaRefresh != null) { - CompletableFuture<RefreshSchemaResult> tmp = - this.queuedSchemaRefresh; - this.queuedSchemaRefresh = null; - startSchemaRequest(tmp); - } - }); + try { + schemaQueriesFactory + .newInstance() + .execute() + .thenApplyAsync(this::parseAndApplySchemaRows, adminExecutor) + .whenComplete( + (newMetadata, metadataError) -> { + if (metadataError != null) { + refreshFuture.completeExceptionally(metadataError); + } else { + refreshFuture.complete( + new RefreshSchemaResult(newMetadata, schemaInAgreement)); + } + + firstSchemaRefreshFuture.complete(null); + + currentSchemaRefresh = null; + // If another refresh was enqueued during this one, run it now + if (queuedSchemaRefresh != null) { + CompletableFuture<RefreshSchemaResult> tmp = + this.queuedSchemaRefresh; + this.queuedSchemaRefresh = null; + startSchemaRequest(tmp); + } + }); + } catch (Throwable t) { + LOG.debug("[{}] Exception getting new metadata", logPrefix, t); + refreshFuture.completeExceptionally(t); + } } }); } else if (queuedSchemaRefresh == null) { diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java index 460f99abd..375209d9f 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java @@ -20,6 +20,7 @@ package com.datastax.oss.driver.internal.core.metadata; import static com.datastax.oss.driver.Assertions.assertThat; import static com.datastax.oss.driver.Assertions.assertThatStage; import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; @@ -33,6 +34,7 @@ import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.internal.core.context.EventBus; import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import com.datastax.oss.driver.internal.core.context.NettyOptions; +import com.datastax.oss.driver.internal.core.control.ControlConnection; import com.datastax.oss.driver.internal.core.metadata.schema.parsing.SchemaParserFactory; import com.datastax.oss.driver.internal.core.metadata.schema.queries.SchemaQueriesFactory; import com.datastax.oss.driver.internal.core.metrics.MetricsFactory; @@ -64,6 +66,7 @@ public class MetadataManagerTest { @Mock private InternalDriverContext context; @Mock private NettyOptions nettyOptions; + @Mock private ControlConnection controlConnection; @Mock private TopologyMonitor topologyMonitor; @Mock private DriverConfig config; @Mock private DriverExecutionProfile defaultProfile; @@ -85,6 +88,7 @@ public class MetadataManagerTest { when(context.getNettyOptions()).thenReturn(nettyOptions); when(context.getTopologyMonitor()).thenReturn(topologyMonitor); + when(context.getControlConnection()).thenReturn(controlConnection); when(defaultProfile.getDuration(DefaultDriverOption.METADATA_SCHEMA_WINDOW)) .thenReturn(Duration.ZERO); @@ -286,6 +290,25 @@ public class MetadataManagerTest { assertThat(refresh.broadcastRpcAddressToRemove).isEqualTo(broadcastRpcAddress2); } + @Test + public void refreshSchema_should_work() { + // Given + IllegalStateException expectedException = new IllegalStateException("Error we're testing"); + when(schemaQueriesFactory.newInstance()).thenThrow(expectedException); + when(topologyMonitor.refreshNodeList()).thenReturn(CompletableFuture.completedFuture(ImmutableList.of(mock(NodeInfo.class)))); + when(topologyMonitor.checkSchemaAgreement()).thenReturn(CompletableFuture.completedFuture(Boolean.TRUE)); + when(controlConnection.init(anyBoolean(), anyBoolean(), anyBoolean())).thenReturn(CompletableFuture.completedFuture(null)); + metadataManager.refreshNodes(); // required internal state setup for this + waitForPendingAdminTasks(() -> metadataManager.refreshes.size() == 1); // sanity check + + // When + CompletionStage<MetadataManager.RefreshSchemaResult> result = metadataManager.refreshSchema("foo", true, true); + + // Then + waitForPendingAdminTasks(() -> result.toCompletableFuture().isDone()); + assertThatStage(result).isFailed(t -> assertThat(t).isEqualTo(expectedException)); + } + private static class TestMetadataManager extends MetadataManager { private List<MetadataRefresh> refreshes = new CopyOnWriteArrayList<>(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org