This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new fa717c6420 IGNITE-23255 Fastest way to use Placement driver for RO 
transaction (#4521)
fa717c6420 is described below

commit fa717c64204f84d9ff9f556088c333c5a28669de
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Tue Oct 8 15:11:30 2024 +0300

    IGNITE-23255 Fastest way to use Placement driver for RO transaction (#4521)
---
 .../placementdriver/leases/LeaseTracker.java       | 16 ++++
 .../replicator/PartitionReplicaListener.java       |  4 +-
 .../distributed/storage/InternalTableImpl.java     | 90 ++++++++++++----------
 3 files changed, 66 insertions(+), 44 deletions(-)

diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index 3675e2dcd9..52758e5331 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -31,6 +31,7 @@ import static 
org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -46,7 +47,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.event.AbstractEventProducer;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.Entry;
@@ -262,6 +265,19 @@ public class LeaseTracker extends 
AbstractEventProducer<PrimaryReplicaEvent, Pri
             long timeout,
             TimeUnit unit
     ) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
+        }
+        try {
+            ReplicaMeta currentMeta = getCurrentPrimaryReplica(groupId, 
timestamp);
+
+            if (currentMeta != null && 
clusterNodeResolver.getById(currentMeta.getLeaseholderId()) != null) {
+                return completedFuture(currentMeta);
+            }
+        } finally {
+            busyLock.leaveBusy();
+        }
+
         CompletableFuture<ReplicaMeta> future = new CompletableFuture<>();
 
         awaitPrimaryReplica(groupId, timestamp, future);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 1c457d8ada..733ad3edab 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -2751,11 +2751,11 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 return resultFuture.thenCompose(res -> {
                     UpdateCommandResult updateCommandResult = 
(UpdateCommandResult) res.getResult();
 
-                    if (!updateCommandResult.isPrimaryReplicaMatch()) {
+                    if (updateCommandResult != null && 
!updateCommandResult.isPrimaryReplicaMatch()) {
                         throw new PrimaryReplicaMissException(txId, 
cmd.leaseStartTime(), updateCommandResult.currentLeaseStartTime());
                     }
 
-                    if (updateCommandResult.isPrimaryInPeersAndLearners()) {
+                    if (updateCommandResult != null && 
updateCommandResult.isPrimaryInPeersAndLearners()) {
                         return safeTime.waitFor(((UpdateCommand) 
res.getCommand()).safeTime()).thenApply(ignored -> null);
                     } else {
                         if 
(!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK))
 {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 9e9f76ab90..7bd7442b1f 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -756,26 +756,7 @@ public class InternalTableImpl implements InternalTable {
 
         TablePartitionId tablePartitionId = new TablePartitionId(tableId, 
partId);
 
-        CompletableFuture<R> fut = awaitPrimaryReplica(tablePartitionId, 
tx.startTimestamp())
-                .thenCompose(primaryReplica -> {
-                    try {
-                        ClusterNode node = getClusterNode(primaryReplica);
-
-                        return replicaSvc.invoke(node, 
op.apply(tablePartitionId, enlistmentConsistencyToken(primaryReplica)));
-                    } catch (Throwable e) {
-                        throw new TransactionException(
-                                INTERNAL_ERR,
-                                format(
-                                        "Failed to invoke the replica request 
[tableName={}, partId={}].",
-                                        tableName,
-                                        partId
-                                ),
-                                e
-                        );
-                    }
-                });
-
-        return postEvaluate(fut, tx);
+        return sendReadOnlyToPrimaryReplica(tx, tablePartitionId, op);
     }
 
     /**
@@ -796,24 +777,51 @@ public class InternalTableImpl implements InternalTable {
 
         TablePartitionId tablePartitionId = new TablePartitionId(tableId, 
partId);
 
-        CompletableFuture<R> fut = awaitPrimaryReplica(tablePartitionId, 
tx.startTimestamp())
-                .thenCompose(primaryReplica -> {
-                    try {
-                        ClusterNode node = getClusterNode(primaryReplica);
-
-                        return replicaSvc.invoke(node, 
op.apply(tablePartitionId, enlistmentConsistencyToken(primaryReplica)));
-                    } catch (Throwable e) {
-                        throw new TransactionException(
-                                INTERNAL_ERR,
-                                format(
-                                        "Failed to invoke the replica request 
[tableName={}, partId={}].",
-                                        tableName,
-                                        partId
-                                ),
-                                e
-                        );
-                    }
-                });
+        return sendReadOnlyToPrimaryReplica(tx, tablePartitionId, op);
+    }
+
+    /**
+     * Sends a read-only transaction request to the primary replica for a 
replication grout specified.
+     *
+     * @param tx Transaction.
+     * @param tablePartitionId Replication group id.
+     * @param op Replica requests factory.
+     * @param <R> The future.
+     * @return The future.
+     */
+    private <R> CompletableFuture<R> sendReadOnlyToPrimaryReplica(
+            InternalTransaction tx,
+            TablePartitionId tablePartitionId,
+            BiFunction<TablePartitionId, Long, ReplicaRequest> op
+    ) {
+        ReplicaMeta meta = 
placementDriver.getCurrentPrimaryReplica(tablePartitionId, tx.startTimestamp());
+
+        Function<ReplicaMeta, CompletableFuture<R>> evaluateClo = 
primaryReplica -> {
+            try {
+                ClusterNode node = getClusterNode(primaryReplica);
+
+                return replicaSvc.invoke(node, op.apply(tablePartitionId, 
enlistmentConsistencyToken(primaryReplica)));
+            } catch (Throwable e) {
+                throw new TransactionException(
+                        INTERNAL_ERR,
+                        format("Failed to invoke the replica request 
[tableName={}, grp={}].", tableName, tablePartitionId),
+                        e
+                );
+            }
+        };
+
+        CompletableFuture<R> fut;
+
+        if (meta != null && 
clusterNodeResolver.getById(meta.getLeaseholderId()) != null) {
+            try {
+                fut = evaluateClo.apply(meta);
+            } catch (IgniteException e) {
+                return failedFuture(e);
+            }
+        } else {
+            fut = awaitPrimaryReplica(tablePartitionId, tx.startTimestamp())
+                    .thenCompose(evaluateClo);
+        }
 
         return postEvaluate(fut, tx);
     }
@@ -1941,13 +1949,11 @@ public class InternalTableImpl implements InternalTable 
{
             return enlistState;
         };
 
-        if (meta != null) {
+        if (meta != null && 
clusterNodeResolver.getById(meta.getLeaseholderId()) != null) {
             try {
                 return completedFuture(enlistClo.apply(meta));
             } catch (IgniteException e) {
-                if (e.code() != REPLICA_UNAVAILABLE_ERR) {
-                    return failedFuture(e);
-                }
+                return failedFuture(e);
             }
         }
 

Reply via email to