This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e9ea7bd0bb IGNITE-17959 Added retrying in ReplicaService for a case when an operation throws ReplicaUnavailableException in ReplicaManager. (#1369)
e9ea7bd0bb is described below
commit e9ea7bd0bbffd7145b9a4fe83bd9dcc31996455d
Author: Sergey Uttsel <[email protected]>
AuthorDate: Tue Nov 29 20:46:04 2022 +0300
IGNITE-17959 Added retrying in ReplicaService for a case when an operation throws ReplicaUnavailableException in ReplicaManager. (#1369)
---
.../ignite/internal/replicator/ReplicaManager.java | 101 ++++++++----
.../ignite/internal/replicator/ReplicaService.java | 67 +++++++-
.../replicator/message/AwaitReplicaRequest.java | 33 ++++
.../replicator/message/AwaitReplicaResponse.java | 27 ++++
.../replicator/message/ReplicaMessageGroup.java | 6 +
.../app/ItIgniteInMemoryNodeRestartTest.java | 1 -
.../runner/app/ItIgniteNodeRestartTest.java | 4 -
.../internal/sql/engine/ItIndexSpoolTest.java | 2 -
.../ignite/distributed/ReplicaUnavailableTest.java | 180 +++++++++++++++++++++
9 files changed, 374 insertions(+), 47 deletions(-)
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 46849de85c..23c4549741 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.manager.IgniteComponent;
import
org.apache.ignite.internal.replicator.exception.ReplicaIsAlreadyStartedException;
import
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
+import org.apache.ignite.internal.replicator.message.AwaitReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
@@ -41,6 +42,7 @@ import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
import org.apache.ignite.internal.replicator.message.TimestampAware;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
@@ -79,7 +81,7 @@ public class ReplicaManager implements IgniteComponent {
private final NetworkMessageHandler handler;
/** Replicas. */
- private final ConcurrentHashMap<ReplicationGroupId, Replica> replicas =
new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<ReplicationGroupId,
CompletableFuture<Replica>> replicas = new ConcurrentHashMap<>();
/** A hybrid logical clock. */
private final HybridClock clock;
@@ -117,17 +119,48 @@ public class ReplicaManager implements IgniteComponent {
ReplicaRequest request = (ReplicaRequest) message;
- HybridTimestamp requestTimestamp = extractTimestamp(request);
+ // Notify the sender that the Replica is created and ready to
process requests.
+ if (request instanceof AwaitReplicaRequest) {
+ replicas.compute(request.groupId(), (replicationGroupId,
replicaFut) -> {
+ if (replicaFut == null) {
+ replicaFut = new CompletableFuture<>();
+ }
+
+ if (!replicaFut.isDone()) {
+ replicaFut.thenCompose(
+ ignore -> {
+ IgniteUtils.inBusyLock(
+ busyLock,
+ () ->
sendAwaitReplicaResponse(sender, correlationId)
+ );
+
+ return null;
+ }
+ );
+
+ return replicaFut;
+ } else {
+ IgniteUtils.inBusyLock(busyLock, () ->
sendAwaitReplicaResponse(sender, correlationId));
+
+ return replicaFut;
+ }
+ });
+
+ return;
+ }
- Replica replica = replicas.get(request.groupId());
+ CompletableFuture<Replica> replicaFut =
replicas.get(request.groupId());
- if (replica == null) {
+ HybridTimestamp requestTimestamp = extractTimestamp(request);
+
+ if (replicaFut == null || !replicaFut.isDone()) {
sendReplicaUnavailableErrorResponse(sender, correlationId,
request, requestTimestamp);
return;
}
- CompletableFuture<Object> result =
replica.processRequest(request);
+ // replicaFut is always completed here.
+ CompletableFuture<Object> result =
replicaFut.join().processRequest(request);
result.handle((res, ex) -> {
NetworkMessage msg;
@@ -150,25 +183,6 @@ public class ReplicaManager implements IgniteComponent {
};
}
- /**
- * Gets a replica by its replica group id.
- *
- * @param replicaGrpId Replication group id.
- * @return Instance of the replica or {@code null} if the replica is not
started.
- * @throws NodeStoppingException If the node is stopping.
- */
- public Replica replica(ReplicationGroupId replicaGrpId) throws
NodeStoppingException {
- if (!busyLock.enterBusy()) {
- throw new NodeStoppingException();
- }
-
- try {
- return replicas.get(replicaGrpId);
- } finally {
- busyLock.leaveBusy();
- }
- }
-
/**
* Starts a replica. If a replica with the same partition id already
exists, the method throws an exception.
*
@@ -204,15 +218,20 @@ public class ReplicaManager implements IgniteComponent {
ReplicationGroupId replicaGrpId,
ReplicaListener listener
) {
- var replica = new Replica(replicaGrpId, listener);
+ replicas.compute(replicaGrpId, (replicationGroupId, replicaFut) -> {
+ if (replicaFut == null) {
+ replicaFut = CompletableFuture.completedFuture(new
Replica(replicaGrpId, listener));
- Replica previous = replicas.putIfAbsent(replicaGrpId, replica);
+ return replicaFut;
+ } else {
+ replicaFut.complete(new Replica(replicaGrpId, listener));
- if (previous != null) {
- throw new ReplicaIsAlreadyStartedException(replicaGrpId);
- }
+ return replicaFut;
+ }
+ });
- return replica;
+ // replicaFut is always completed here.
+ return replicas.get(replicaGrpId).join();
}
/**
@@ -319,6 +338,18 @@ public class ReplicaManager implements IgniteComponent {
}
}
+ /**
+ * Sends await replica response.
+ */
+ private void sendAwaitReplicaResponse(ClusterNode sender, @Nullable Long
correlationId) {
+ clusterNetSvc.messagingService().respond(
+ sender,
+ REPLICA_MESSAGES_FACTORY
+ .awaitReplicaResponse()
+ .build(),
+ correlationId);
+ }
+
/**
* Prepares replica response.
*/
@@ -360,11 +391,13 @@ public class ReplicaManager implements IgniteComponent {
*/
private void idleSafeTimeSync() {
replicas.values().forEach(r -> {
- ReplicaSafeTimeSyncRequest req =
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
- .groupId(r.groupId())
- .build();
+ if (r.isDone()) {
+ ReplicaSafeTimeSyncRequest req =
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
+ .groupId(r.join().groupId())
+ .build();
- r.processRequest(req);
+ r.join().processRequest(req);
+ }
});
}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
index 934777888e..2b9ea6817c 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
@@ -20,20 +20,26 @@ package org.apache.ignite.internal.replicator;
import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.hlc.HybridClock;
import
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
import org.apache.ignite.internal.replicator.exception.ReplicationException;
import
org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.replicator.message.AwaitReplicaRequest;
import org.apache.ignite.internal.replicator.message.ErrorReplicaResponse;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReplicaResponse;
import org.apache.ignite.internal.replicator.message.TimestampAware;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkMessage;
/**
* The service is intended to execute requests on replicas.
@@ -48,6 +54,12 @@ public class ReplicaService {
/** A hybrid logical clock. */
private final HybridClock clock;
+ /** Requests to retry. */
+ private final Map<ClusterNode, CompletableFuture<NetworkMessage>>
pendingInvokes = new ConcurrentHashMap<>();
+
+ /** Replicator network message factory. */
+ private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new
ReplicaMessagesFactory();
+
/**
* The constructor of replica client.
*
@@ -73,7 +85,7 @@ public class ReplicaService {
*/
private <R> CompletableFuture<R> sendToReplica(ClusterNode node,
ReplicaRequest req) {
- CompletableFuture<R> res = new CompletableFuture<>();
+ AtomicReference<CompletableFuture<R>> res = new AtomicReference<>(new
CompletableFuture<>());
// TODO: IGNITE-17824 Use named executor instead of default one in
order to process replica Response.
messagingService.invoke(node, req,
RPC_TIMEOUT).whenCompleteAsync((response, throwable) -> {
@@ -83,10 +95,10 @@ public class ReplicaService {
}
if (throwable instanceof TimeoutException) {
- res.completeExceptionally(new
ReplicationTimeoutException(req.groupId()));
+ res.get().completeExceptionally(new
ReplicationTimeoutException(req.groupId()));
}
- res.completeExceptionally(withCause(
+ res.get().completeExceptionally(withCause(
ReplicationException::new,
REPLICA_COMMON_ERR,
"Failed to process replica request [replicaGroupId=" +
req.groupId() + ']',
@@ -100,14 +112,57 @@ public class ReplicaService {
if (response instanceof ErrorReplicaResponse) {
var errResp = (ErrorReplicaResponse) response;
- res.completeExceptionally(errResp.throwable());
+
+ if (errResp.throwable() instanceof
ReplicaUnavailableException) {
+ pendingInvokes.compute(node, (clusterNode, fut) -> {
+ if (fut == null) {
+ AwaitReplicaRequest awaitReplicaReq =
REPLICA_MESSAGES_FACTORY.awaitReplicaRequest()
+ .groupId(req.groupId())
+ .build();
+
+ fut = messagingService.invoke(node,
awaitReplicaReq, RPC_TIMEOUT)
+ .whenComplete((response0, throwable0)
-> {
+ pendingInvokes.remove(node);
+ });
+ }
+
+ fut.handle((response0, throwable0) -> {
+ if (throwable0 != null) {
+ if (throwable0 instanceof
CompletionException) {
+ throwable0 = throwable0.getCause();
+ }
+
+ if (throwable0 instanceof
TimeoutException) {
+
res.get().completeExceptionally(errResp.throwable());
+ }
+
+ res.get().completeExceptionally(withCause(
+ ReplicationException::new,
+ REPLICA_COMMON_ERR,
+ "Failed to process replica request
[replicaGroupId=" + req.groupId() + ']',
+ throwable0));
+ } else {
+ res.get().thenCompose(ignore ->
sendToReplica(node, req));
+
+ res.get().complete(null);
+ }
+
+ return null;
+ });
+
+ return fut;
+ });
+
+ } else {
+ res.get().completeExceptionally(errResp.throwable());
+ }
} else {
- res.complete((R) ((ReplicaResponse) response).result());
+ res.get().complete((R) ((ReplicaResponse)
response).result());
}
}
});
- return res;
+ return res.get();
}
/**
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/AwaitReplicaRequest.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/AwaitReplicaRequest.java
new file mode 100644
index 0000000000..4fe668fb8e
--- /dev/null
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/AwaitReplicaRequest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator.message;
+
+import org.apache.ignite.internal.replicator.Replica;
+import org.apache.ignite.internal.replicator.ReplicaManager;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * When some request is failed in {@link ReplicaService} with a {@link
ReplicaUnavailableException}
+ * due to the {@link Replica} has not created in {@link ReplicaManager},
+ * then {@link AwaitReplicaRequest} will be sent to await replica creation.
+ */
+@Transferable(ReplicaMessageGroup.AWAIT_REPLICA_REQUEST)
+public interface AwaitReplicaRequest extends ReplicaRequest {
+}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/AwaitReplicaResponse.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/AwaitReplicaResponse.java
new file mode 100644
index 0000000000..29a96d911e
--- /dev/null
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/AwaitReplicaResponse.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator.message;
+
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Await replica response.
+ */
+@Transferable(ReplicaMessageGroup.AWAIT_REPLICA_RESPONSE)
+public interface AwaitReplicaResponse extends ReplicaResponse {
+}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
index abac579576..62e4aa1ddb 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
@@ -41,4 +41,10 @@ public class ReplicaMessageGroup {
/** Message type for {@link ReplicaSafeTimeSyncRequest}. */
public static final short SAFE_TIME_SYNC_REQUEST = 6;
+
+ /** Message type for {@link AwaitReplicaRequest}. */
+ public static final short AWAIT_REPLICA_REQUEST = 7;
+
+ /** Message type for {@link AwaitReplicaResponse}. */
+ public static final short AWAIT_REPLICA_RESPONSE = 8;
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
index c33b163c27..4db24c1906 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
@@ -175,7 +175,6 @@ public class ItIgniteInMemoryNodeRestartTest extends IgniteAbstractTest {
/**
* Restarts an in-memory node that is not a leader of the table's
partition.
*/
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17959")
@Test
public void inMemoryNodeRestartNotLeader(TestInfo testInfo) throws
Exception {
// Start three nodes, the first one is going to be CMG and MetaStorage
leader.
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index a8d652cc4c..cdcfbf06db 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -829,7 +829,6 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
/**
* Checks that the table created in cluster of 2 nodes, is recovered on a
node after restart of this node.
*/
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17959")
@Test
public void testRecoveryOnOneNode() {
Ignite ignite = startNode(0);
@@ -852,7 +851,6 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
/**
* Checks that a cluster is able to restart when some changes were made in
configuration.
*/
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17959")
@Test
public void testRestartDiffConfig() {
List<IgniteImpl> ignites = startNodes(2);
@@ -882,7 +880,6 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
/**
* The test for node restart when there is a gap between the node local
configuration and distributed configuration.
*/
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17959")
@Test
@WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY,
value = "0")
public void testCfgGapWithoutData() {
@@ -912,7 +909,6 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
* group stops for some time while restarting node is being recovered. The
recovery process should continue and eventually succeed after
* metastorage group starts again.
*/
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17959")
@Test
@WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY,
value = "0")
public void testMetastorageStop() {
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
index 2be2b4fa1a..113f4e91b7 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
@@ -27,7 +27,6 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.table.Table;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@@ -70,7 +69,6 @@ public class ItIndexSpoolTest extends AbstractBasicIntegrationTest {
/**
* Test.
*/
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17959")
@ParameterizedTest(name = "tableSize={0}, partitions={1}")
@MethodSource("rowsWithPartitionsArgs")
public void test(int rows, int partitions) {
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
new file mode 100644
index 0000000000..ff879ea012
--- /dev/null
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import static
org.apache.ignite.distributed.ItTxDistributedTestSingleNode.NODE_PORT_BASE;
+import static
org.apache.ignite.distributed.ItTxDistributedTestSingleNode.startNode;
+import static org.apache.ignite.raft.jraft.test.TestUtils.getLocalAddress;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.replicator.Replica;
+import org.apache.ignite.internal.replicator.ReplicaManager;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.tx.message.TxMessageGroup;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Tests handling requests from {@link ReplicaService} to {@link
ReplicaManager} when the {@link Replica}
+ * is not started.
+ */
+public class ReplicaUnavailableTest extends IgniteAbstractTest {
+ private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
+ 1,
+ new Column[]{new Column("key", NativeTypes.INT64, false)},
+ new Column[]{new Column("value", NativeTypes.INT64, false)}
+ );
+
+ private final TableMessagesFactory tableMessagesFactory = new
TableMessagesFactory();
+
+ private final TestInfo testInfo;
+
+ private ReplicaService replicaService;
+
+ private ReplicaManager replicaManager;
+
+ private ClusterService clusterService;
+
+ private NetworkAddress networkAddress;
+
+ private String name = "client";
+
+ @BeforeEach
+ public void setup() {
+ networkAddress = new NetworkAddress(getLocalAddress(), NODE_PORT_BASE
+ 1);
+
+ var nodeFinder = new StaticNodeFinder(List.of(networkAddress));
+
+ clusterService = startNode(testInfo, name, NODE_PORT_BASE + 1,
nodeFinder);
+
+ HybridClock clock = mock(HybridClock.class);
+
+ replicaService = new ReplicaService(clusterService.messagingService(),
clock);
+
+ replicaManager = new ReplicaManager(clusterService,
+ clock,
+ Set.of(TableMessageGroup.class, TxMessageGroup.class));
+
+ replicaManager.start();
+ }
+
+ @AfterEach
+ public void teardown() {
+ clusterService.stop();
+ }
+
+ public ReplicaUnavailableTest(TestInfo testInfo) {
+ this.testInfo = testInfo;
+ }
+
+ @Test
+ public void testWithReplicaStartedAfterRequestSending() throws Exception {
+ ClusterNode clusterNode = new ClusterNode(name, name, networkAddress);
+
+ TablePartitionId tablePartitionId = new
TablePartitionId(UUID.randomUUID(), 1);
+
+ ReadWriteSingleRowReplicaRequest request =
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ .groupId(tablePartitionId)
+ .binaryRow(createKeyValueRow(1L, 1L))
+ .requestType(RequestType.RW_GET)
+ .build();
+
+
clusterService.messagingService().addMessageHandler(ReplicaMessageGroup.class,
+ (message, sender, correlationId) -> {
+ try {
+ replicaManager.startReplica(tablePartitionId, request0
-> CompletableFuture.completedFuture(null));
+ } catch (NodeStoppingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ );
+
+ replicaService.invoke(clusterNode, request).get(10, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testWithNotStartedReplica() {
+ ClusterNode clusterNode = new ClusterNode(name, name, networkAddress);
+
+ TablePartitionId tablePartitionId = new
TablePartitionId(UUID.randomUUID(), 1);
+
+ ReadWriteSingleRowReplicaRequest request =
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ .groupId(tablePartitionId)
+ .binaryRow(createKeyValueRow(1L, 1L))
+ .requestType(RequestType.RW_GET)
+ .build();
+
+ Exception e0 = null;
+ Exception e1 = null;
+
+ try {
+ replicaService.invoke(clusterNode, request).get(10,
TimeUnit.SECONDS);
+ } catch (Exception e) {
+ e0 = e;
+ }
+
+ try {
+ replicaService.invoke(clusterNode, request).get(10,
TimeUnit.SECONDS);
+ } catch (Exception e) {
+ e1 = e;
+ }
+
+ assertTrue(e0 != null);
+ assertTrue(e0.getCause() instanceof ReplicaUnavailableException,
e0.toString());
+
+ assertTrue(e1 != null);
+ assertTrue(e1.getCause() instanceof ReplicaUnavailableException,
e1.toString());
+ }
+
+ private static Row createKeyValueRow(long id, long value) {
+ RowAssembler rowBuilder = new RowAssembler(SCHEMA, 0, 0);
+
+ rowBuilder.appendLong(id);
+ rowBuilder.appendLong(value);
+
+ return new Row(SCHEMA, new ByteBufferRow(rowBuilder.toBytes()));
+ }
+}