This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new fa717c6420 IGNITE-23255 Fastest way to use Placement driver for RO
transaction (#4521)
fa717c6420 is described below
commit fa717c64204f84d9ff9f556088c333c5a28669de
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Tue Oct 8 15:11:30 2024 +0300
IGNITE-23255 Fastest way to use Placement driver for RO transaction (#4521)
---
.../placementdriver/leases/LeaseTracker.java | 16 ++++
.../replicator/PartitionReplicaListener.java | 4 +-
.../distributed/storage/InternalTableImpl.java | 90 ++++++++++++----------
3 files changed, 66 insertions(+), 44 deletions(-)
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index 3675e2dcd9..52758e5331 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -31,6 +31,7 @@ import static
org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -46,7 +47,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.Entry;
@@ -262,6 +265,19 @@ public class LeaseTracker extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
long timeout,
TimeUnit unit
) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
+ }
+ try {
+ ReplicaMeta currentMeta = getCurrentPrimaryReplica(groupId,
timestamp);
+
+ if (currentMeta != null &&
clusterNodeResolver.getById(currentMeta.getLeaseholderId()) != null) {
+ return completedFuture(currentMeta);
+ }
+ } finally {
+ busyLock.leaveBusy();
+ }
+
CompletableFuture<ReplicaMeta> future = new CompletableFuture<>();
awaitPrimaryReplica(groupId, timestamp, future);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 1c457d8ada..733ad3edab 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -2751,11 +2751,11 @@ public class PartitionReplicaListener implements
ReplicaListener {
return resultFuture.thenCompose(res -> {
UpdateCommandResult updateCommandResult =
(UpdateCommandResult) res.getResult();
- if (!updateCommandResult.isPrimaryReplicaMatch()) {
+ if (updateCommandResult != null &&
!updateCommandResult.isPrimaryReplicaMatch()) {
throw new PrimaryReplicaMissException(txId,
cmd.leaseStartTime(), updateCommandResult.currentLeaseStartTime());
}
- if (updateCommandResult.isPrimaryInPeersAndLearners()) {
+ if (updateCommandResult != null &&
updateCommandResult.isPrimaryInPeersAndLearners()) {
return safeTime.waitFor(((UpdateCommand)
res.getCommand()).safeTime()).thenApply(ignored -> null);
} else {
if
(!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK))
{
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 9e9f76ab90..7bd7442b1f 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -756,26 +756,7 @@ public class InternalTableImpl implements InternalTable {
TablePartitionId tablePartitionId = new TablePartitionId(tableId,
partId);
- CompletableFuture<R> fut = awaitPrimaryReplica(tablePartitionId,
tx.startTimestamp())
- .thenCompose(primaryReplica -> {
- try {
- ClusterNode node = getClusterNode(primaryReplica);
-
- return replicaSvc.invoke(node,
op.apply(tablePartitionId, enlistmentConsistencyToken(primaryReplica)));
- } catch (Throwable e) {
- throw new TransactionException(
- INTERNAL_ERR,
- format(
- "Failed to invoke the replica request
[tableName={}, partId={}].",
- tableName,
- partId
- ),
- e
- );
- }
- });
-
- return postEvaluate(fut, tx);
+ return sendReadOnlyToPrimaryReplica(tx, tablePartitionId, op);
}
/**
@@ -796,24 +777,51 @@ public class InternalTableImpl implements InternalTable {
TablePartitionId tablePartitionId = new TablePartitionId(tableId,
partId);
- CompletableFuture<R> fut = awaitPrimaryReplica(tablePartitionId,
tx.startTimestamp())
- .thenCompose(primaryReplica -> {
- try {
- ClusterNode node = getClusterNode(primaryReplica);
-
- return replicaSvc.invoke(node,
op.apply(tablePartitionId, enlistmentConsistencyToken(primaryReplica)));
- } catch (Throwable e) {
- throw new TransactionException(
- INTERNAL_ERR,
- format(
- "Failed to invoke the replica request
[tableName={}, partId={}].",
- tableName,
- partId
- ),
- e
- );
- }
- });
+ return sendReadOnlyToPrimaryReplica(tx, tablePartitionId, op);
+ }
+
+ /**
+ * Sends a read-only transaction request to the primary replica for a
replication grout specified.
+ *
+ * @param tx Transaction.
+ * @param tablePartitionId Replication group id.
+ * @param op Replica requests factory.
+ * @param <R> The future.
+ * @return The future.
+ */
+ private <R> CompletableFuture<R> sendReadOnlyToPrimaryReplica(
+ InternalTransaction tx,
+ TablePartitionId tablePartitionId,
+ BiFunction<TablePartitionId, Long, ReplicaRequest> op
+ ) {
+ ReplicaMeta meta =
placementDriver.getCurrentPrimaryReplica(tablePartitionId, tx.startTimestamp());
+
+ Function<ReplicaMeta, CompletableFuture<R>> evaluateClo =
primaryReplica -> {
+ try {
+ ClusterNode node = getClusterNode(primaryReplica);
+
+ return replicaSvc.invoke(node, op.apply(tablePartitionId,
enlistmentConsistencyToken(primaryReplica)));
+ } catch (Throwable e) {
+ throw new TransactionException(
+ INTERNAL_ERR,
+ format("Failed to invoke the replica request
[tableName={}, grp={}].", tableName, tablePartitionId),
+ e
+ );
+ }
+ };
+
+ CompletableFuture<R> fut;
+
+ if (meta != null &&
clusterNodeResolver.getById(meta.getLeaseholderId()) != null) {
+ try {
+ fut = evaluateClo.apply(meta);
+ } catch (IgniteException e) {
+ return failedFuture(e);
+ }
+ } else {
+ fut = awaitPrimaryReplica(tablePartitionId, tx.startTimestamp())
+ .thenCompose(evaluateClo);
+ }
return postEvaluate(fut, tx);
}
@@ -1941,13 +1949,11 @@ public class InternalTableImpl implements InternalTable
{
return enlistState;
};
- if (meta != null) {
+ if (meta != null &&
clusterNodeResolver.getById(meta.getLeaseholderId()) != null) {
try {
return completedFuture(enlistClo.apply(meta));
} catch (IgniteException e) {
- if (e.code() != REPLICA_UNAVAILABLE_ERR) {
- return failedFuture(e);
- }
+ return failedFuture(e);
}
}