This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e9ea7bd0bb IGNITE-17959 Added retrying in ReplicaService for a case 
when an operation throws ReplicaUnavailableException in ReplicaManager. (#1369)
e9ea7bd0bb is described below

commit e9ea7bd0bbffd7145b9a4fe83bd9dcc31996455d
Author: Sergey Uttsel <[email protected]>
AuthorDate: Tue Nov 29 20:46:04 2022 +0300

    IGNITE-17959 Added retrying in ReplicaService for a case when an operation 
throws ReplicaUnavailableException in ReplicaManager. (#1369)
---
 .../ignite/internal/replicator/ReplicaManager.java | 101 ++++++++----
 .../ignite/internal/replicator/ReplicaService.java |  67 +++++++-
 .../replicator/message/AwaitReplicaRequest.java    |  33 ++++
 .../replicator/message/AwaitReplicaResponse.java   |  27 ++++
 .../replicator/message/ReplicaMessageGroup.java    |   6 +
 .../app/ItIgniteInMemoryNodeRestartTest.java       |   1 -
 .../runner/app/ItIgniteNodeRestartTest.java        |   4 -
 .../internal/sql/engine/ItIndexSpoolTest.java      |   2 -
 .../ignite/distributed/ReplicaUnavailableTest.java | 180 +++++++++++++++++++++
 9 files changed, 374 insertions(+), 47 deletions(-)

diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 46849de85c..23c4549741 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.manager.IgniteComponent;
 import 
org.apache.ignite.internal.replicator.exception.ReplicaIsAlreadyStartedException;
 import 
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
+import org.apache.ignite.internal.replicator.message.AwaitReplicaRequest;
 import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
@@ -41,6 +42,7 @@ import 
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
 import org.apache.ignite.internal.replicator.message.TimestampAware;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
@@ -79,7 +81,7 @@ public class ReplicaManager implements IgniteComponent {
     private final NetworkMessageHandler handler;
 
     /** Replicas. */
-    private final ConcurrentHashMap<ReplicationGroupId, Replica> replicas = 
new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<ReplicationGroupId, 
CompletableFuture<Replica>> replicas = new ConcurrentHashMap<>();
 
     /** A hybrid logical clock. */
     private final HybridClock clock;
@@ -117,17 +119,48 @@ public class ReplicaManager implements IgniteComponent {
 
                 ReplicaRequest request = (ReplicaRequest) message;
 
-                HybridTimestamp requestTimestamp = extractTimestamp(request);
+                // Notify the sender that the Replica is created and ready to 
process requests.
+                if (request instanceof AwaitReplicaRequest) {
+                    replicas.compute(request.groupId(), (replicationGroupId, 
replicaFut) -> {
+                        if (replicaFut == null) {
+                            replicaFut = new CompletableFuture<>();
+                        }
+
+                        if (!replicaFut.isDone()) {
+                            replicaFut.thenCompose(
+                                    ignore -> {
+                                        IgniteUtils.inBusyLock(
+                                                busyLock,
+                                                () -> 
sendAwaitReplicaResponse(sender, correlationId)
+                                        );
+
+                                        return null;
+                                    }
+                            );
+
+                            return replicaFut;
+                        } else {
+                            IgniteUtils.inBusyLock(busyLock, () -> 
sendAwaitReplicaResponse(sender, correlationId));
+
+                            return replicaFut;
+                        }
+                    });
+
+                    return;
+                }
 
-                Replica replica = replicas.get(request.groupId());
+                CompletableFuture<Replica> replicaFut = 
replicas.get(request.groupId());
 
-                if (replica == null) {
+                HybridTimestamp requestTimestamp = extractTimestamp(request);
+
+                if (replicaFut == null || !replicaFut.isDone()) {
                     sendReplicaUnavailableErrorResponse(sender, correlationId, 
request, requestTimestamp);
 
                     return;
                 }
 
-                CompletableFuture<Object> result = 
replica.processRequest(request);
+                // replicaFut is always completed here.
+                CompletableFuture<Object> result = 
replicaFut.join().processRequest(request);
 
                 result.handle((res, ex) -> {
                     NetworkMessage msg;
@@ -150,25 +183,6 @@ public class ReplicaManager implements IgniteComponent {
         };
     }
 
-    /**
-     * Gets a replica by its replica group id.
-     *
-     * @param replicaGrpId Replication group id.
-     * @return Instance of the replica or {@code null} if the replica is not 
started.
-     * @throws NodeStoppingException If the node is stopping.
-     */
-    public Replica replica(ReplicationGroupId replicaGrpId) throws 
NodeStoppingException {
-        if (!busyLock.enterBusy()) {
-            throw new NodeStoppingException();
-        }
-
-        try {
-            return replicas.get(replicaGrpId);
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
-
     /**
      * Starts a replica. If a replica with the same partition id already 
exists, the method throws an exception.
      *
@@ -204,15 +218,20 @@ public class ReplicaManager implements IgniteComponent {
             ReplicationGroupId replicaGrpId,
             ReplicaListener listener
     ) {
-        var replica = new Replica(replicaGrpId, listener);
+        replicas.compute(replicaGrpId, (replicationGroupId, replicaFut) -> {
+            if (replicaFut == null) {
+                replicaFut = CompletableFuture.completedFuture(new 
Replica(replicaGrpId, listener));
 
-        Replica previous = replicas.putIfAbsent(replicaGrpId, replica);
+                return replicaFut;
+            } else {
+                replicaFut.complete(new Replica(replicaGrpId, listener));
 
-        if (previous != null) {
-            throw new ReplicaIsAlreadyStartedException(replicaGrpId);
-        }
+                return replicaFut;
+            }
+        });
 
-        return replica;
+        // replicaFut is always completed here.
+        return replicas.get(replicaGrpId).join();
     }
 
     /**
@@ -319,6 +338,18 @@ public class ReplicaManager implements IgniteComponent {
         }
     }
 
+    /**
+     * Sends await replica response.
+     */
+    private void sendAwaitReplicaResponse(ClusterNode sender, @Nullable Long 
correlationId) {
+        clusterNetSvc.messagingService().respond(
+                sender,
+                REPLICA_MESSAGES_FACTORY
+                        .awaitReplicaResponse()
+                        .build(),
+                correlationId);
+    }
+
     /**
      * Prepares replica response.
      */
@@ -360,11 +391,13 @@ public class ReplicaManager implements IgniteComponent {
      */
     private void idleSafeTimeSync() {
         replicas.values().forEach(r -> {
-            ReplicaSafeTimeSyncRequest req = 
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
-                    .groupId(r.groupId())
-                    .build();
+            if (r.isDone()) {
+                ReplicaSafeTimeSyncRequest req = 
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
+                        .groupId(r.join().groupId())
+                        .build();
 
-            r.processRequest(req);
+                r.join().processRequest(req);
+            }
         });
     }
 
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
index 934777888e..2b9ea6817c 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
@@ -20,20 +20,26 @@ package org.apache.ignite.internal.replicator;
 import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
 import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.internal.hlc.HybridClock;
 import 
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
 import org.apache.ignite.internal.replicator.exception.ReplicationException;
 import 
org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.replicator.message.AwaitReplicaRequest;
 import org.apache.ignite.internal.replicator.message.ErrorReplicaResponse;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.replicator.message.ReplicaResponse;
 import org.apache.ignite.internal.replicator.message.TimestampAware;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkMessage;
 
 /**
  * The service is intended to execute requests on replicas.
@@ -48,6 +54,12 @@ public class ReplicaService {
     /** A hybrid logical clock. */
     private final HybridClock clock;
 
+    /** Requests to retry. */
+    private final Map<ClusterNode, CompletableFuture<NetworkMessage>> 
pendingInvokes = new ConcurrentHashMap<>();
+
+    /** Replicator network message factory. */
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
+
     /**
      * The constructor of replica client.
      *
@@ -73,7 +85,7 @@ public class ReplicaService {
      */
     private <R> CompletableFuture<R> sendToReplica(ClusterNode node, 
ReplicaRequest req) {
 
-        CompletableFuture<R> res = new CompletableFuture<>();
+        AtomicReference<CompletableFuture<R>> res = new AtomicReference<>(new 
CompletableFuture<>());
 
         // TODO: IGNITE-17824 Use named executor instead of default one in 
order to process replica Response.
         messagingService.invoke(node, req, 
RPC_TIMEOUT).whenCompleteAsync((response, throwable) -> {
@@ -83,10 +95,10 @@ public class ReplicaService {
                 }
 
                 if (throwable instanceof TimeoutException) {
-                    res.completeExceptionally(new 
ReplicationTimeoutException(req.groupId()));
+                    res.get().completeExceptionally(new 
ReplicationTimeoutException(req.groupId()));
                 }
 
-                res.completeExceptionally(withCause(
+                res.get().completeExceptionally(withCause(
                         ReplicationException::new,
                         REPLICA_COMMON_ERR,
                         "Failed to process replica request [replicaGroupId=" + 
req.groupId() + ']',
@@ -100,14 +112,57 @@ public class ReplicaService {
 
                 if (response instanceof ErrorReplicaResponse) {
                     var errResp = (ErrorReplicaResponse) response;
-                    res.completeExceptionally(errResp.throwable());
+
+                    if (errResp.throwable() instanceof 
ReplicaUnavailableException) {
+                        pendingInvokes.compute(node, (clusterNode, fut) -> {
+                            if (fut == null) {
+                                AwaitReplicaRequest awaitReplicaReq = 
REPLICA_MESSAGES_FACTORY.awaitReplicaRequest()
+                                        .groupId(req.groupId())
+                                        .build();
+
+                                fut = messagingService.invoke(node, 
awaitReplicaReq, RPC_TIMEOUT)
+                                        .whenComplete((response0, throwable0) 
-> {
+                                            pendingInvokes.remove(node);
+                                        });
+                            }
+
+                            fut.handle((response0, throwable0) -> {
+                                if (throwable0 != null) {
+                                    if (throwable0 instanceof 
CompletionException) {
+                                        throwable0 = throwable0.getCause();
+                                    }
+
+                                    if (throwable0 instanceof 
TimeoutException) {
+                                        
res.get().completeExceptionally(errResp.throwable());
+                                    }
+
+                                    res.get().completeExceptionally(withCause(
+                                            ReplicationException::new,
+                                            REPLICA_COMMON_ERR,
+                                            "Failed to process replica request 
[replicaGroupId=" + req.groupId() + ']',
+                                            throwable0));
+                                } else {
+                                    res.get().thenCompose(ignore -> 
sendToReplica(node, req));
+
+                                    res.get().complete(null);
+                                }
+
+                                return null;
+                            });
+
+                            return fut;
+                        });
+
+                    } else {
+                        res.get().completeExceptionally(errResp.throwable());
+                    }
                 } else {
-                    res.complete((R) ((ReplicaResponse) response).result());
+                    res.get().complete((R) ((ReplicaResponse) 
response).result());
                 }
             }
         });
 
-        return res;
+        return res.get();
     }
 
     /**
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/AwaitReplicaRequest.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/AwaitReplicaRequest.java
new file mode 100644
index 0000000000..4fe668fb8e
--- /dev/null
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/AwaitReplicaRequest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator.message;
+
+import org.apache.ignite.internal.replicator.Replica;
+import org.apache.ignite.internal.replicator.ReplicaManager;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import 
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * When some request is failed in {@link ReplicaService} with a {@link 
ReplicaUnavailableException}
+ * due to the {@link Replica} has not created in {@link ReplicaManager},
+ * then {@link AwaitReplicaRequest} will be sent to await replica creation.
+ */
+@Transferable(ReplicaMessageGroup.AWAIT_REPLICA_REQUEST)
+public interface AwaitReplicaRequest extends ReplicaRequest {
+}
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/AwaitReplicaResponse.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/AwaitReplicaResponse.java
new file mode 100644
index 0000000000..29a96d911e
--- /dev/null
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/AwaitReplicaResponse.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator.message;
+
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Await replica response.
+ */
+@Transferable(ReplicaMessageGroup.AWAIT_REPLICA_RESPONSE)
+public interface AwaitReplicaResponse extends ReplicaResponse {
+}
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
index abac579576..62e4aa1ddb 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
@@ -41,4 +41,10 @@ public class ReplicaMessageGroup {
 
     /** Message type for {@link ReplicaSafeTimeSyncRequest}. */
     public static final short SAFE_TIME_SYNC_REQUEST = 6;
+
+    /** Message type for {@link AwaitReplicaRequest}. */
+    public static final short AWAIT_REPLICA_REQUEST = 7;
+
+    /** Message type for {@link AwaitReplicaResponse}. */
+    public static final short AWAIT_REPLICA_RESPONSE = 8;
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
index c33b163c27..4db24c1906 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
@@ -175,7 +175,6 @@ public class ItIgniteInMemoryNodeRestartTest extends 
IgniteAbstractTest {
     /**
      * Restarts an in-memory node that is not a leader of the table's 
partition.
      */
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17959";)
     @Test
     public void inMemoryNodeRestartNotLeader(TestInfo testInfo) throws 
Exception {
         // Start three nodes, the first one is going to be CMG and MetaStorage 
leader.
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index a8d652cc4c..cdcfbf06db 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -829,7 +829,6 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
     /**
      * Checks that the table created in cluster of 2 nodes, is recovered on a 
node after restart of this node.
      */
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17959";)
     @Test
     public void testRecoveryOnOneNode() {
         Ignite ignite = startNode(0);
@@ -852,7 +851,6 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
     /**
      * Checks that a cluster is able to restart when some changes were made in 
configuration.
      */
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17959";)
     @Test
     public void testRestartDiffConfig() {
         List<IgniteImpl> ignites = startNodes(2);
@@ -882,7 +880,6 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
     /**
      * The test for node restart when there is a gap between the node local 
configuration and distributed configuration.
      */
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17959";)
     @Test
     @WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, 
value = "0")
     public void testCfgGapWithoutData() {
@@ -912,7 +909,6 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
      * group stops for some time while restarting node is being recovered. The 
recovery process should continue and eventually succeed after
      * metastorage group starts again.
      */
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17959";)
     @Test
     @WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, 
value = "0")
     public void testMetastorageStop() {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
index 2be2b4fa1a..113f4e91b7 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
@@ -27,7 +27,6 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.table.Table;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -70,7 +69,6 @@ public class ItIndexSpoolTest extends 
AbstractBasicIntegrationTest {
     /**
      * Test.
      */
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17959";)
     @ParameterizedTest(name = "tableSize={0}, partitions={1}")
     @MethodSource("rowsWithPartitionsArgs")
     public void test(int rows, int partitions) {
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
new file mode 100644
index 0000000000..ff879ea012
--- /dev/null
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import static 
org.apache.ignite.distributed.ItTxDistributedTestSingleNode.NODE_PORT_BASE;
+import static 
org.apache.ignite.distributed.ItTxDistributedTestSingleNode.startNode;
+import static org.apache.ignite.raft.jraft.test.TestUtils.getLocalAddress;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.replicator.Replica;
+import org.apache.ignite.internal.replicator.ReplicaManager;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import 
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.tx.message.TxMessageGroup;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Tests handling requests from {@link ReplicaService} to {@link 
ReplicaManager} when the {@link Replica}
+ * is not started.
+ */
+public class ReplicaUnavailableTest extends IgniteAbstractTest {
+    private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
+            1,
+            new Column[]{new Column("key", NativeTypes.INT64, false)},
+            new Column[]{new Column("value", NativeTypes.INT64, false)}
+    );
+
+    private final TableMessagesFactory tableMessagesFactory = new 
TableMessagesFactory();
+
+    private final TestInfo testInfo;
+
+    private ReplicaService replicaService;
+
+    private ReplicaManager replicaManager;
+
+    private ClusterService clusterService;
+
+    private NetworkAddress networkAddress;
+
+    private String name = "client";
+
+    @BeforeEach
+    public void setup() {
+        networkAddress = new NetworkAddress(getLocalAddress(), NODE_PORT_BASE 
+ 1);
+
+        var nodeFinder = new StaticNodeFinder(List.of(networkAddress));
+
+        clusterService = startNode(testInfo, name, NODE_PORT_BASE + 1, 
nodeFinder);
+
+        HybridClock clock = mock(HybridClock.class);
+
+        replicaService = new ReplicaService(clusterService.messagingService(), 
clock);
+
+        replicaManager = new ReplicaManager(clusterService,
+                clock,
+                Set.of(TableMessageGroup.class, TxMessageGroup.class));
+
+        replicaManager.start();
+    }
+
+    @AfterEach
+    public void teardown() {
+        clusterService.stop();
+    }
+
+    public ReplicaUnavailableTest(TestInfo testInfo) {
+        this.testInfo = testInfo;
+    }
+
+    @Test
+    public void testWithReplicaStartedAfterRequestSending() throws Exception {
+        ClusterNode clusterNode = new ClusterNode(name, name, networkAddress);
+
+        TablePartitionId tablePartitionId = new 
TablePartitionId(UUID.randomUUID(), 1);
+
+        ReadWriteSingleRowReplicaRequest request = 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+                .groupId(tablePartitionId)
+                .binaryRow(createKeyValueRow(1L, 1L))
+                .requestType(RequestType.RW_GET)
+                .build();
+
+        
clusterService.messagingService().addMessageHandler(ReplicaMessageGroup.class,
+                (message, sender, correlationId) -> {
+                    try {
+                        replicaManager.startReplica(tablePartitionId, request0 
-> CompletableFuture.completedFuture(null));
+                    } catch (NodeStoppingException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+        );
+
+        replicaService.invoke(clusterNode, request).get(10, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testWithNotStartedReplica() {
+        ClusterNode clusterNode = new ClusterNode(name, name, networkAddress);
+
+        TablePartitionId tablePartitionId = new 
TablePartitionId(UUID.randomUUID(), 1);
+
+        ReadWriteSingleRowReplicaRequest request = 
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+                .groupId(tablePartitionId)
+                .binaryRow(createKeyValueRow(1L, 1L))
+                .requestType(RequestType.RW_GET)
+                .build();
+
+        Exception e0 = null;
+        Exception e1 = null;
+
+        try {
+            replicaService.invoke(clusterNode, request).get(10, 
TimeUnit.SECONDS);
+        } catch (Exception e) {
+            e0 = e;
+        }
+
+        try {
+            replicaService.invoke(clusterNode, request).get(10, 
TimeUnit.SECONDS);
+        } catch (Exception e) {
+            e1 = e;
+        }
+
+        assertTrue(e0 != null);
+        assertTrue(e0.getCause() instanceof ReplicaUnavailableException, 
e0.toString());
+
+        assertTrue(e1 != null);
+        assertTrue(e1.getCause() instanceof ReplicaUnavailableException, 
e1.toString());
+    }
+
+    private static Row createKeyValueRow(long id, long value) {
+        RowAssembler rowBuilder = new RowAssembler(SCHEMA, 0, 0);
+
+        rowBuilder.appendLong(id);
+        rowBuilder.appendLong(value);
+
+        return new Row(SCHEMA, new ByteBufferRow(rowBuilder.toBytes()));
+    }
+}

Reply via email to