This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8e1dc5e3284 IGNITE-25326 Throw exception when handling replica request
for destroyed table (#5776)
8e1dc5e3284 is described below
commit 8e1dc5e3284a7546179cb70ffe1c06c069d3a004
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed May 7 18:44:18 2025 +0400
IGNITE-25326 Throw exception when handling replica request for destroyed
table (#5776)
---
.../replicator/ZonePartitionReplicaListener.java | 10 +++--
.../ZonePartitionReplicaListenerTest.java | 45 +++++++++++++++++++++-
2 files changed, 49 insertions(+), 6 deletions(-)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
index e541a4e440a..349329d5d73 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.partition.replicator;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import java.util.Map;
import java.util.UUID;
@@ -28,6 +29,7 @@ import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.lang.ComponentStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ClusterNodeResolver;
@@ -257,14 +259,14 @@ public class ZonePartitionReplicaListener implements
ReplicaListener {
ReplicaTableProcessor replicaProcessor =
replicaProcessors.get(tableId);
if (replicaProcessor == null) {
- // Most of the times this condition should be false.
This logging message is added in case a request got stuck
+ // Most of the time this condition should be false.
This block handles a case when a request got stuck
// somewhere while being replicated and arrived on
this node after the target table had been removed.
// In this case we ignore the command, which should be
safe to do, because the underlying storage was destroyed
- // anyway.
- LOG.warn("Replica processor for table ID {} not found.
Command will be ignored: {}", tableId,
+ // anyway, but we still return an exception.
+ LOG.debug("Replica processor for table ID {} not
found. Command will be ignored: {}", tableId,
request.toStringForLightLogging());
- return completedFuture(new ReplicaResult(null, null));
+ return failedFuture(new
ComponentStoppingException("Table is already destroyed [tableId=" + tableId +
"]"));
}
return replicaProcessor.process(request, replicaPrimacy,
senderId);
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListenerTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListenerTest.java
index 881780961a8..cbe488d44d1 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListenerTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListenerTest.java
@@ -17,25 +17,36 @@
package org.apache.ignite.internal.partition.replicator;
+import static java.util.UUID.randomUUID;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toReplicationGroupIdMessage;
+import static
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.UUID;
+import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.lang.ComponentStoppingException;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ClusterNodeResolver;
+import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
+import
org.apache.ignite.internal.partition.replicator.network.replication.ScanCloseReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl;
@@ -43,8 +54,11 @@ import
org.apache.ignite.internal.raft.service.RaftCommandRunner;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.schema.AlwaysSyncedSchemaSyncService;
import org.apache.ignite.internal.schema.SchemaSyncService;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
@@ -52,6 +66,7 @@ import
org.apache.ignite.internal.tx.storage.state.test.TestTxStatePartitionStor
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -60,7 +75,12 @@ import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
+@WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG,
value = "true")
class ZonePartitionReplicaListenerTest extends BaseIgniteAbstractTest {
+ private static final PartitionReplicationMessagesFactory
REPLICATION_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory();
+
+ private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new
ReplicaMessagesFactory();
+
private final ClusterNode localNode = new ClusterNodeImpl(nodeId(1),
"node1", NetworkAddress.from("127.0.0.1:127"));
private final ClusterNode anotherNode = new ClusterNodeImpl(nodeId(2),
"node2", NetworkAddress.from("127.0.0.2:127"));
@@ -78,8 +98,8 @@ class ZonePartitionReplicaListenerTest extends
BaseIgniteAbstractTest {
@Mock
private ValidationSchemasSource validationSchemasSource;
- @Mock
- private SchemaSyncService schemaSyncService;
+ @Spy
+ private final SchemaSyncService schemaSyncService = new
AlwaysSyncedSchemaSyncService();
@Mock
private CatalogService catalogService;
@@ -171,4 +191,25 @@ class ZonePartitionReplicaListenerTest extends
BaseIgniteAbstractTest {
assertThat(partitionReplicaListener.invoke(request, localNode.id()),
willThrow(PrimaryReplicaMissException.class));
}
+
+ @Test
+ void exceptionIsReturnedIfTableProcessorIsAbsent() {
+ Catalog catalog = mock(Catalog.class);
+ when(catalogService.activeCatalog(anyLong())).thenReturn(catalog);
+ when(catalog.table(1)).thenReturn(mock(CatalogTableDescriptor.class));
+
+ ScanCloseReplicaRequest request =
REPLICATION_MESSAGES_FACTORY.scanCloseReplicaRequest()
+ .groupId(toReplicationGroupIdMessage(REPLICA_MESSAGES_FACTORY,
groupId))
+ .tableId(1)
+ .scanId(1L)
+ .timestamp(clock.now())
+ .transactionId(randomUUID())
+ .build();
+
+ ComponentStoppingException ex = assertWillThrow(
+ partitionReplicaListener.invoke(request, localNode.id()),
+ ComponentStoppingException.class
+ );
+ assertThat(ex.getMessage(), is("Table is already destroyed
[tableId=1]"));
+ }
}