This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e1dc5e3284 IGNITE-25326 Throw exception when handling replica request for destroyed table (#5776)
8e1dc5e3284 is described below

commit 8e1dc5e3284a7546179cb70ffe1c06c069d3a004
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed May 7 18:44:18 2025 +0400

    IGNITE-25326 Throw exception when handling replica request for destroyed table (#5776)
---
 .../replicator/ZonePartitionReplicaListener.java   | 10 +++--
 .../ZonePartitionReplicaListenerTest.java          | 45 +++++++++++++++++++++-
 2 files changed, 49 insertions(+), 6 deletions(-)

diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
index e541a4e440a..349329d5d73 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.partition.replicator;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 
 import java.util.Map;
 import java.util.UUID;
@@ -28,6 +29,7 @@ import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.failure.FailureContext;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.lang.ComponentStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.ClusterNodeResolver;
@@ -257,14 +259,14 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
                     ReplicaTableProcessor replicaProcessor = 
replicaProcessors.get(tableId);
 
                     if (replicaProcessor == null) {
-                        // Most of the times this condition should be false. 
This logging message is added in case a request got stuck
+                        // Most of the times this condition should be false. 
This block handles a case when a request got stuck
                         // somewhere while being replicated and arrived on 
this node after the target table had been removed.
                         // In this case we ignore the command, which should be 
safe to do, because the underlying storage was destroyed
-                        // anyway.
-                        LOG.warn("Replica processor for table ID {} not found. 
Command will be ignored: {}", tableId,
+                        // anyway, but we still return an exception.
+                        LOG.debug("Replica processor for table ID {} not 
found. Command will be ignored: {}", tableId,
                                 request.toStringForLightLogging());
 
-                        return completedFuture(new ReplicaResult(null, null));
+                        return failedFuture(new 
ComponentStoppingException("Table is already destroyed [tableId=" + tableId + 
"]"));
                     }
 
                     return replicaProcessor.process(request, replicaPrimacy, 
senderId);
diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListenerTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListenerTest.java
index 881780961a8..cbe488d44d1 100644
--- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListenerTest.java
+++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListenerTest.java
@@ -17,25 +17,36 @@
 
 package org.apache.ignite.internal.partition.replicator;
 
+import static java.util.UUID.randomUUID;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toReplicationGroupIdMessage;
+import static 
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.UUID;
+import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.lang.ComponentStoppingException;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
 import org.apache.ignite.internal.network.ClusterNodeImpl;
 import org.apache.ignite.internal.network.ClusterNodeResolver;
+import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.ScanCloseReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
 import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl;
@@ -43,8 +54,11 @@ import 
org.apache.ignite.internal.raft.service.RaftCommandRunner;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
 import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.schema.AlwaysSyncedSchemaSyncService;
 import org.apache.ignite.internal.schema.SchemaSyncService;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
 import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
@@ -52,6 +66,7 @@ import 
org.apache.ignite.internal.tx.storage.state.test.TestTxStatePartitionStor
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -60,7 +75,12 @@ import org.mockito.Spy;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 @ExtendWith(MockitoExtension.class)
+@WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG, 
value = "true")
 class ZonePartitionReplicaListenerTest extends BaseIgniteAbstractTest {
+    private static final PartitionReplicationMessagesFactory 
REPLICATION_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory();
+
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
+
     private final ClusterNode localNode = new ClusterNodeImpl(nodeId(1), 
"node1", NetworkAddress.from("127.0.0.1:127"));
 
     private final ClusterNode anotherNode = new ClusterNodeImpl(nodeId(2), 
"node2", NetworkAddress.from("127.0.0.2:127"));
@@ -78,8 +98,8 @@ class ZonePartitionReplicaListenerTest extends 
BaseIgniteAbstractTest {
     @Mock
     private ValidationSchemasSource validationSchemasSource;
 
-    @Mock
-    private SchemaSyncService schemaSyncService;
+    @Spy
+    private final SchemaSyncService schemaSyncService = new 
AlwaysSyncedSchemaSyncService();
 
     @Mock
     private CatalogService catalogService;
@@ -171,4 +191,25 @@ class ZonePartitionReplicaListenerTest extends 
BaseIgniteAbstractTest {
 
         assertThat(partitionReplicaListener.invoke(request, localNode.id()), 
willThrow(PrimaryReplicaMissException.class));
     }
+
+    @Test
+    void exceptionIsReturnedIfTableProcessorIsAbsent() {
+        Catalog catalog = mock(Catalog.class);
+        when(catalogService.activeCatalog(anyLong())).thenReturn(catalog);
+        when(catalog.table(1)).thenReturn(mock(CatalogTableDescriptor.class));
+
+        ScanCloseReplicaRequest request = 
REPLICATION_MESSAGES_FACTORY.scanCloseReplicaRequest()
+                .groupId(toReplicationGroupIdMessage(REPLICA_MESSAGES_FACTORY, 
groupId))
+                .tableId(1)
+                .scanId(1L)
+                .timestamp(clock.now())
+                .transactionId(randomUUID())
+                .build();
+
+        ComponentStoppingException ex = assertWillThrow(
+                partitionReplicaListener.invoke(request, localNode.id()),
+                ComponentStoppingException.class
+        );
+        assertThat(ex.getMessage(), is("Table is already destroyed 
[tableId=1]"));
+    }
 }

Reply via email to