This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9ad100c341 IGNITE-22977 Reduced latency for txn preparing phase.
9ad100c341 is described below
commit 9ad100c34183071024ececb790b939bfa27435e5
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Tue Aug 20 11:27:34 2024 +0300
IGNITE-22977 Reduced latency for txn preparing phase.
---
.../ignite/client/handler/FakePlacementDriver.java | 7 +
.../ignite/internal/hlc/HybridClockImpl.java | 4 +-
.../internal/lang/IgniteSystemProperties.java | 6 +
.../ignite/internal/util/FastTimestamps.java | 5 +-
.../ignite/internal/hlc/HybridClockTest.java | 21 +-
.../IndexNodeFinishedRwTransactionsChecker.java | 2 +-
.../ignite/internal/index/TestPlacementDriver.java | 6 +
.../checkpoint/CheckpointMetricsTrackerTest.java | 16 +-
.../replicator/utils/TestPlacementDriver.java | 6 +
.../placementdriver/LeasePlacementDriver.java | 10 +
.../placementdriver/TestPlacementDriver.java | 6 +
.../placementdriver/PlacementDriverManager.java | 5 +
.../placementdriver/leases/LeaseTracker.java | 11 +
.../ignite/internal/raft/RaftGroupServiceImpl.java | 48 +++-
.../ignite/raft/jraft/option/NodeOptions.java | 2 +-
modules/runner/build.gradle | 10 +
.../benchmark/AbstractMultiNodeBenchmark.java | 6 +-
.../internal/benchmark/UpsertKvBenchmark.java | 109 ++++++++
.../ignite/internal/table/RowIdGenerator.java | 36 +++
.../replicator/PartitionReplicaListener.java | 284 ++++++++++-----------
.../distributed/storage/InternalTableImpl.java | 40 ++-
.../wrappers/DelegatingPlacementDriver.java | 6 +
.../internal/tx/impl/ReadWriteTransactionImpl.java | 21 +-
.../ignite/internal/tx/impl/TxManagerImpl.java | 19 +-
24 files changed, 467 insertions(+), 219 deletions(-)
diff --git a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
index 8f1bdc5835..70a295668c 100644
--- a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
+++ b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
@@ -100,6 +100,13 @@ public class FakePlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
return awaitPrimaryReplica(replicationGroupId, timestamp, 0,
TimeUnit.MILLISECONDS);
}
+ @Override
+ public ReplicaMeta getCurrentPrimaryReplica(ReplicationGroupId
replicationGroupId, HybridTimestamp timestamp) {
+ TablePartitionId id = (TablePartitionId) replicationGroupId;
+
+ return primaryReplicas.get(id.partitionId());
+ }
+
@Override
public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId
grpId) {
return nullCompletedFuture();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
index 843dcb7e18..cfb82efac7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.hlc;
import static java.lang.Math.max;
-import static java.time.Clock.systemUTC;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
@@ -29,6 +28,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.FastTimestamps;
/**
* A Hybrid Logical Clock implementation.
@@ -61,7 +61,7 @@ public class HybridClockImpl implements HybridClock {
}
private static long currentTime() {
- return systemUTC().instant().toEpochMilli() << LOGICAL_TIME_BITS_SIZE;
+ return FastTimestamps.coarseCurrentTimeMillis() <<
LOGICAL_TIME_BITS_SIZE;
}
@Override
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteSystemProperties.java
index 1ebfecb2c0..798ec2f377 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteSystemProperties.java
@@ -73,6 +73,12 @@ public final class IgniteSystemProperties {
/** Name of the property controlling whether, when a thread assertion is
triggered, it should also be written to the log. */
public static final String THREAD_ASSERTIONS_LOG_BEFORE_THROWING =
"THREAD_ASSERTIONS_LOG_BEFORE_THROWING";
+ /** Skip replication in a benchmark. */
+ public static final String IGNITE_SKIP_REPLICATION_IN_BENCHMARK =
"IGNITE_SKIP_REPLICATION_IN_BENCHMARK";
+
+ /** Skip storage update in a benchmark. */
+ public static final String IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK =
"IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK";
+
/**
* Enforces singleton.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java b/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
index 6079d05da6..c95b683971 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
@@ -23,7 +23,8 @@ package org.apache.ignite.internal.util;
public class FastTimestamps {
private static volatile long coarseCurrentTimeMillis =
System.currentTimeMillis();
- private static final long UPDATE_FREQUENCY_MS = 10;
+ /** Note: don't change this value, because it's crucial for a timestamp generation. */
+ private static final long UPDATE_FREQUENCY_MS = 1;
static {
startUpdater();
@@ -46,7 +47,7 @@ public class FastTimestamps {
};
updater.setDaemon(true);
- updater.setPriority(10);
+ updater.setPriority(Thread.MAX_PRIORITY);
updater.start();
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridClockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridClockTest.java
index 5cb248e1c2..cb35d8a9f5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridClockTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridClockTest.java
@@ -21,15 +21,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import java.time.Clock;
-import java.time.Instant;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.util.FastTimestamps;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -46,7 +43,7 @@ class HybridClockTest extends BaseIgniteAbstractTest {
/**
* Mock of a system clock.
*/
- private static MockedStatic<Clock> clockMock;
+ private static MockedStatic<FastTimestamps> clockMock;
@Mock
private ClockUpdateListener updateListener;
@@ -61,7 +58,7 @@ class HybridClockTest extends BaseIgniteAbstractTest {
*/
@Test
public void testNow() {
- clockMock = mockToEpochMilli(100);
+ clockMock = mockCurrentTimestamp(100);
HybridClock clock = new HybridClockImpl();
@@ -79,7 +76,7 @@ class HybridClockTest extends BaseIgniteAbstractTest {
*/
@Test
public void testTick() {
- clockMock = mockToEpochMilli(100);
+ clockMock = mockCurrentTimestamp(100);
HybridClock clock = new HybridClockImpl();
@@ -108,7 +105,7 @@ class HybridClockTest extends BaseIgniteAbstractTest {
private void assertTimestampEquals(long sysTime, HybridTimestamp expTs,
Supplier<HybridTimestamp> clo) {
closeClockMock();
- clockMock = mockToEpochMilli(sysTime);
+ clockMock = mockCurrentTimestamp(sysTime);
assertEquals(expTs, clo.get());
}
@@ -167,12 +164,10 @@ class HybridClockTest extends BaseIgniteAbstractTest {
verify(updateListener, never()).onUpdate(anyLong());
}
- private static MockedStatic<Clock> mockToEpochMilli(long expected) {
- Clock spyClock = spy(Clock.class);
- MockedStatic<Clock> clockMock = mockStatic(Clock.class);
+ private static MockedStatic<FastTimestamps> mockCurrentTimestamp(long
expected) {
+ MockedStatic<FastTimestamps> clockMock =
mockStatic(FastTimestamps.class);
- clockMock.when(Clock::systemUTC).thenReturn(spyClock);
- when(spyClock.instant()).thenReturn(Instant.ofEpochMilli(expected));
+
clockMock.when(FastTimestamps::coarseCurrentTimeMillis).thenReturn(expected);
return clockMock;
}
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexNodeFinishedRwTransactionsChecker.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexNodeFinishedRwTransactionsChecker.java
index 301a0f0f48..f11732cc2f 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexNodeFinishedRwTransactionsChecker.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexNodeFinishedRwTransactionsChecker.java
@@ -187,7 +187,7 @@ public class IndexNodeFinishedRwTransactionsChecker
implements LocalRwTxCounter,
}
/**
- * Returns {@code true} iff the requested catalog version is active and
all RW transactions started on versions strictly before that
+ * Returns {@code true} if the requested catalog version is active and all
RW transactions started on versions strictly before that
* version have finished on the node.
*
* @param catalogVersion Catalog version of interest.
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
index f0650073ca..5fadd2b1be 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
@@ -33,6 +33,7 @@ import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.jetbrains.annotations.Nullable;
/** Implementation for tests. */
class TestPlacementDriver extends AbstractEventProducer<PrimaryReplicaEvent,
PrimaryReplicaEventParameters> implements PlacementDriver {
@@ -53,6 +54,11 @@ class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
return primaryReplicaMetaFutureById.get(replicationGroupId);
}
+ @Override
+ public @Nullable ReplicaMeta getCurrentPrimaryReplica(ReplicationGroupId
replicationGroupId, HybridTimestamp timestamp) {
+ return primaryReplicaMetaFutureById.get(replicationGroupId).join();
+ }
+
@Override
public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId
grpId) {
throw new UnsupportedOperationException();
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMetricsTrackerTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMetricsTrackerTest.java
index 46849b4b30..b6f8fe614b 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMetricsTrackerTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMetricsTrackerTest.java
@@ -50,7 +50,7 @@ public class CheckpointMetricsTrackerTest {
assertThat(tracker.checkpointStartTime(),
equalTo(checkpointStartTime));
- assertThat(tracker.totalDuration(), greaterThanOrEqualTo(10L));
+ assertThat(tracker.totalDuration(), greaterThanOrEqualTo(1L));
}
@Test
@@ -97,7 +97,7 @@ public class CheckpointMetricsTrackerTest {
tracker.onSplitAndSortCheckpointPagesEnd();
- assertThat(tracker.splitAndSortCheckpointPagesDuration(),
greaterThanOrEqualTo(10L));
+ assertThat(tracker.splitAndSortCheckpointPagesDuration(),
greaterThanOrEqualTo(1L));
}
@Test
@@ -114,7 +114,7 @@ public class CheckpointMetricsTrackerTest {
tracker.onCheckpointEnd();
- assertThat(tracker.fsyncDuration(), greaterThanOrEqualTo(10L));
+ assertThat(tracker.fsyncDuration(), greaterThanOrEqualTo(1L));
}
@Test
@@ -131,7 +131,7 @@ public class CheckpointMetricsTrackerTest {
tracker.onFsyncStart();
- assertThat(tracker.pagesWriteDuration(), greaterThanOrEqualTo(10L));
+ assertThat(tracker.pagesWriteDuration(), greaterThanOrEqualTo(1L));
}
@Test
@@ -148,7 +148,7 @@ public class CheckpointMetricsTrackerTest {
tracker.onMarkCheckpointBeginEnd();
- assertThat(tracker.onMarkCheckpointBeginDuration(),
greaterThanOrEqualTo(10L));
+ assertThat(tracker.onMarkCheckpointBeginDuration(),
greaterThanOrEqualTo(1L));
}
@Test
@@ -165,7 +165,7 @@ public class CheckpointMetricsTrackerTest {
long beforeWriteLockDuration = tracker.beforeWriteLockDuration();
- assertThat(beforeWriteLockDuration, greaterThanOrEqualTo(10L));
+ assertThat(beforeWriteLockDuration, greaterThanOrEqualTo(1L));
assertThat(tracker.writeLockWaitDuration(), lessThan(0L));
assertThat(tracker.writeLockHoldDuration(), equalTo(0L));
@@ -176,7 +176,7 @@ public class CheckpointMetricsTrackerTest {
long writeLockWaitDuration = tracker.writeLockWaitDuration();
assertThat(tracker.beforeWriteLockDuration(),
equalTo(beforeWriteLockDuration));
- assertThat(writeLockWaitDuration, greaterThanOrEqualTo(10L));
+ assertThat(writeLockWaitDuration, greaterThanOrEqualTo(1L));
assertThat(tracker.writeLockHoldDuration(), lessThan(0L));
waitForChangeCoarseCurrentTimeMillis();
@@ -185,7 +185,7 @@ public class CheckpointMetricsTrackerTest {
assertThat(tracker.beforeWriteLockDuration(),
equalTo(beforeWriteLockDuration));
assertThat(tracker.writeLockWaitDuration(),
equalTo(writeLockWaitDuration));
- assertThat(tracker.writeLockHoldDuration(), greaterThanOrEqualTo(10L));
+ assertThat(tracker.writeLockHoldDuration(), greaterThanOrEqualTo(1L));
}
private void waitForChangeCoarseCurrentTimeMillis() throws Exception {
diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
index aa2c0bcc15..fa70085543 100644
--- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
+++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
@@ -62,6 +62,12 @@ public class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
return getPrimaryReplicaMeta();
}
+ @Override
+ public ReplicaMeta getCurrentPrimaryReplica(ReplicationGroupId
replicationGroupId,
+ HybridTimestamp timestamp) {
+ return getPrimaryReplicaMeta().join();
+ }
+
@Override
public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId
grpId) {
return nullCompletedFuture();
diff --git a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/LeasePlacementDriver.java b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/LeasePlacementDriver.java
index ea83862d99..94864b1ac2 100644
--- a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/LeasePlacementDriver.java
+++ b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/LeasePlacementDriver.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.jetbrains.annotations.Nullable;
/**
* Service that provides an ability to await and retrieve primary replicas for
replication groups.
@@ -70,6 +71,15 @@ public interface LeasePlacementDriver extends
EventProducer<PrimaryReplicaEvent,
*/
CompletableFuture<ReplicaMeta> getPrimaryReplica(ReplicationGroupId
replicationGroupId, HybridTimestamp timestamp);
+ /**
+ * Returns the current primary replica without waiting.
+ *
+ * @param replicationGroupId Replication group id.
+ * @param timestamp The timestamp.
+ * @return Metadata information or null if not available.
+ */
+ @Nullable ReplicaMeta getCurrentPrimaryReplica(ReplicationGroupId
replicationGroupId, HybridTimestamp timestamp);
+
/**
* Returns a future that completes when all expiration event {@link
PrimaryReplicaEvent#PRIMARY_REPLICA_EXPIRED} listeners of previous
* primary complete.
diff --git a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
index d91e9499ff..143dcf9d78 100644
--- a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
+++ b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
@@ -32,6 +32,7 @@ import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
@@ -75,6 +76,11 @@ public class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
return getReplicaMetaFuture();
}
+ @Override
+ public @Nullable ReplicaMeta getCurrentPrimaryReplica(ReplicationGroupId
replicationGroupId, HybridTimestamp timestamp) {
+ return getReplicaMetaFuture().join();
+ }
+
@Override
public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId
grpId) {
return nullCompletedFuture();
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index e7bae39bd6..c13658a542 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -299,6 +299,11 @@ public class PlacementDriverManager implements
IgniteComponent {
return leaseTracker.getPrimaryReplica(replicationGroupId,
timestamp);
}
+ @Override
+ public ReplicaMeta getCurrentPrimaryReplica(ReplicationGroupId
replicationGroupId, HybridTimestamp timestamp) {
+ return
leaseTracker.getCurrentPrimaryReplica(replicationGroupId, timestamp);
+ }
+
@Override
public CompletableFuture<Void>
previousPrimaryExpired(ReplicationGroupId replicationGroupId) {
return leaseTracker.previousPrimaryExpired(replicationGroupId);
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index ea1be1d3df..c2de87ca8d 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -300,6 +300,17 @@ public class LeaseTracker extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
});
}
+ @Override
+ public @Nullable ReplicaMeta getCurrentPrimaryReplica(ReplicationGroupId
replicationGroupId, HybridTimestamp timestamp) {
+ Lease lease = getLease(replicationGroupId);
+
+ if (lease.isAccepted() &&
clockService.after(lease.getExpirationTime(), timestamp)) {
+ return lease;
+ }
+
+ return null;
+ }
+
/**
* Helper method that checks whether tracker for given groupId is present
in {@code primaryReplicaWaiters} map, whether it's empty
* and removes it if it's true.
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index 13065a1c78..19c21a8379 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -18,9 +18,11 @@
package org.apache.ignite.internal.raft;
import static java.lang.System.currentTimeMillis;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.ThreadLocalRandom.current;
import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.tostring.IgniteToStringBuilder.includeSensitive;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddLearnersRequest;
@@ -53,6 +55,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.internal.lang.SafeTimeReorderException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -171,19 +174,40 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
ScheduledExecutorService executor,
Marshaller commandsMarshaller
) {
- var service = new RaftGroupServiceImpl(
- groupId,
- cluster,
- factory,
- configuration,
- membersConfiguration,
- null,
- executor,
- commandsMarshaller
- );
+ boolean inBenchmark =
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SKIP_REPLICATION_IN_BENCHMARK);
+
+ RaftGroupServiceImpl service;
+ if (inBenchmark) {
+ service = new RaftGroupServiceImpl(
+ groupId,
+ cluster,
+ factory,
+ configuration,
+ membersConfiguration,
+ null,
+ executor,
+ commandsMarshaller
+ ) {
+ @Override
+ public <R> CompletableFuture<R> run(Command cmd) {
+ return
cmd.getClass().getSimpleName().contains("UpdateCommand") ?
nullCompletedFuture() : super.run(cmd);
+ }
+ };
+ } else {
+ service = new RaftGroupServiceImpl(
+ groupId,
+ cluster,
+ factory,
+ configuration,
+ membersConfiguration,
+ null,
+ executor,
+ commandsMarshaller
+ );
+ }
if (!getLeader) {
- return CompletableFuture.completedFuture(service);
+ return completedFuture(service);
}
return service.refreshLeader().handle((unused, throwable) -> {
@@ -816,6 +840,6 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
return CompletableFuture.failedFuture(new
PeerUnavailableException(peer.consistentId()));
}
- return CompletableFuture.completedFuture(node);
+ return completedFuture(node);
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index e444800edb..8df09366d3 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -52,7 +52,7 @@ public class NodeOptions extends RpcOptions implements
Copiable<NodeOptions> {
private static final int DEFAULT_STRIPES = Utils.cpus();
/** This value is used by default to determine the count of stripes for
log manager. */
- private static final int DEFAULT_LOG_STRIPES_COUNT = 4;
+ private static final int DEFAULT_LOG_STRIPES_COUNT = Math.min(4,
DEFAULT_STRIPES);
// A follower would become a candidate if it doesn't receive any message
// from the leader in |election_timeout_ms| milliseconds
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 8ad1bc82d8..fd622e9930 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -243,6 +243,16 @@ tasks.register("runnerPlatformBenchmark", JavaExec) {
enableAssertions = true
}
+tasks.register("runUpsertBenchmark", JavaExec) {
+ mainClass = "org.apache.ignite.internal.benchmark.UpsertKvBenchmark"
+
+ jvmArgs += defaultJvmArgs
+
+ classpath = sourceSets.integrationTest.runtimeClasspath
+
+ enableAssertions = true
+}
+
jar {
manifest {
attributes(
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
index b5cd6e1797..f4c746b673 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
@@ -78,7 +78,7 @@ public class AbstractMultiNodeBenchmark {
* Starts ignite node and creates table {@link #TABLE_NAME}.
*/
@Setup
- public final void nodeSetUp() throws Exception {
+ public void nodeSetUp() throws Exception {
System.setProperty("jraft.available_processors", "2");
startCluster();
@@ -191,6 +191,10 @@ public class AbstractMultiNodeBenchmark {
+ " \"netClusterNodes\": [ {} ]\n"
+ " }\n"
+ " },\n"
+ + " storage.profiles: {"
+ + " " + DEFAULT_STORAGE_PROFILE + ".engine: aipersist, "
+ + " " + DEFAULT_STORAGE_PROFILE + ".size: 2073741824 "
// Avoid page replacement.
+ + " },\n"
+ " clientConnector: { port:{} },\n"
+ " rest.port: {},\n"
+ " raft.fsync = " + fsync
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
new file mode 100644
index 0000000000..c5d2d18fc6
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Tuple;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmark for a single upsert operation via KV API with a possibility to disable updates via RAFT and to storage.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Threads(1)
+@Warmup(iterations = 10, time = 2)
+@Measurement(iterations = 20, time = 2)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+public class UpsertKvBenchmark extends AbstractMultiNodeBenchmark {
+ private final Tuple tuple = Tuple.create();
+
+ private int id = 0;
+
+ private static KeyValueView<Tuple, Tuple> kvView;
+
+ @Override
+ public void nodeSetUp() throws Exception {
+ System.setProperty(IgniteSystemProperties.IGNITE_SKIP_REPLICATION_IN_BENCHMARK, "true");
+ System.setProperty(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK, "true");
+ super.nodeSetUp();
+ }
+
+ /**
+ * Initializes the tuple.
+ */
+ @Setup
+ public void setUp() {
+ kvView = clusterNode.tables().table(TABLE_NAME).keyValueView();
+ for (int i = 1; i < 11; i++) {
+ tuple.set("field" + i, FIELD_VAL);
+ }
+ }
+
+ @TearDown
+ public void tearDown() {
+ System.out.println("Inserted " + id + " tuples");
+ }
+
+ /**
+ * Benchmark for KV upsert via embedded client.
+ */
+ @Benchmark
+ public void upsert() {
+ kvView.put(null, Tuple.create().set("ycsb_key", id++), tuple);
+ }
+
+ /**
+ * Benchmark's entry point.
+ */
+ public static void main(String[] args) throws RunnerException {
+ Options opt = new OptionsBuilder()
+ .include(".*" + UpsertKvBenchmark.class.getSimpleName() + ".*")
+ .build();
+
+ new Runner(opt).run();
+ }
+
+ @Override
+ protected int nodes() {
+ return 1;
+ }
+
+ @Override
+ protected int partitionCount() {
+ return 8;
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RowIdGenerator.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RowIdGenerator.java
new file mode 100644
index 0000000000..94bea7cd97
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RowIdGenerator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.internal.util.FastTimestamps;
+
+/**
+ * New row id allocator.
+ */
+public class RowIdGenerator {
+ /**
+ * Get next row id.
+ *
+ * @return Next row id.
+ */
+ public static UUID next() {
+ return new UUID(FastTimestamps.coarseCurrentTimeMillis(), ThreadLocalRandom.current().nextLong());
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index ccc49bf2f6..eb6ee1856b 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -49,6 +49,7 @@ import static
org.apache.ignite.internal.util.CompletableFutures.allOfToList;
import static
org.apache.ignite.internal.util.CompletableFutures.emptyCollectionCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
+import static
org.apache.ignite.internal.util.CompletableFutures.isCompletedSuccessfully;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.findAny;
@@ -79,6 +80,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
@@ -93,6 +95,7 @@ import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.internal.lang.IgniteTriFunction;
import org.apache.ignite.internal.lang.SafeTimeReorderException;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -129,6 +132,7 @@ import
org.apache.ignite.internal.partition.replicator.network.replication.Reque
import
org.apache.ignite.internal.partition.replicator.network.replication.ScanCloseReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.UpdateMinimumActiveTxBeginTimeReplicaRequest;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
import org.apache.ignite.internal.raft.service.RaftCommandRunner;
@@ -163,6 +167,7 @@ import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.table.RowIdGenerator;
import org.apache.ignite.internal.table.distributed.IndexLocker;
import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
@@ -417,7 +422,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
this.replicationGroupId = new TablePartitionId(tableId, partId);
- schemaCompatValidator = new
SchemaCompatibilityValidator(validationSchemasSource, catalogService,
schemaSyncService);
+ this.schemaCompatValidator = new
SchemaCompatibilityValidator(validationSchemasSource, catalogService,
schemaSyncService);
prepareIndexBuilderTxRwOperationTracker();
}
@@ -484,7 +489,9 @@ public class PartitionReplicaListener implements
ReplicaListener {
private CompletableFuture<?> processRequest(ReplicaRequest request,
@Nullable Boolean isPrimary, String senderId,
@Nullable Long leaseStartTime) {
- if (request instanceof SchemaVersionAwareReplicaRequest) {
+ boolean hasSchemaVersion = request instanceof
SchemaVersionAwareReplicaRequest;
+
+ if (hasSchemaVersion) {
assert ((SchemaVersionAwareReplicaRequest)
request).schemaVersion() > 0 : "No schema version passed?";
}
@@ -514,13 +521,38 @@ public class PartitionReplicaListener implements
ReplicaListener {
return processGetEstimatedSizeRequest();
}
- HybridTimestamp opTsIfDirectRo = (request instanceof
ReadOnlyDirectReplicaRequest) ? clockService.now() : null;
+ @Nullable HybridTimestamp opTs = getTxOpTimestamp(request);
+ @Nullable HybridTimestamp opTsIfDirectRo = (request instanceof
ReadOnlyDirectReplicaRequest) ? opTs : null;
+ @Nullable HybridTimestamp txTs = getTxStartTimestamp(request);
+ if (txTs == null) {
+ txTs = opTsIfDirectRo;
+ }
+
+ // Don't need to validate schema.
+ if (opTs == null) {
+ assert opTsIfDirectRo == null;
+ return processOperationRequestWithTxRwCounter(senderId, request,
isPrimary, null, leaseStartTime);
+ }
+
+ assert txTs != null && opTs.compareTo(txTs) >= 0 : "Invalid request
timestamps";
+
+ @Nullable HybridTimestamp finalTxTs = txTs;
+ Runnable validateClo = () -> {
+ schemaCompatValidator.failIfTableDoesNotExistAt(opTs, tableId());
+
+ if (hasSchemaVersion) {
+ SchemaVersionAwareReplicaRequest versionAwareRequest =
(SchemaVersionAwareReplicaRequest) request;
+
+ schemaCompatValidator.failIfRequestSchemaDiffersFromTxTs(
+ finalTxTs,
+ versionAwareRequest.schemaVersion(),
+ tableId()
+ );
+ }
+ };
- return validateTableExistence(request, opTsIfDirectRo)
- .thenCompose(unused -> validateSchemaMatch(request,
opTsIfDirectRo))
- .thenCompose(unused -> waitForSchemasBeforeReading(request,
opTsIfDirectRo))
- .thenCompose(unused ->
- processOperationRequestWithTxRwCounter(senderId,
request, isPrimary, opTsIfDirectRo, leaseStartTime));
+ return
schemaSyncService.waitForMetadataCompleteness(opTs).thenRun(validateClo).thenCompose(ignored
->
+ processOperationRequestWithTxRwCounter(senderId, request,
isPrimary, opTsIfDirectRo, leaseStartTime));
}
private CompletableFuture<Long> processGetEstimatedSizeRequest() {
@@ -592,7 +624,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
/**
- * Validates that the table exists at a timestamp corresponding to the
request operation.
+ * Returns the txn operation timestamp.
*
* <ul>
* <li>For a read/write in an RW transaction, it's 'now'</li>
@@ -600,88 +632,25 @@ public class PartitionReplicaListener implements
ReplicaListener {
* <li>For a direct read in an RO implicit transaction, it's the
timestamp chosen (as 'now') to process the request</li>
* </ul>
*
- * <p>For other requests, the validation is skipped.
+ * <p>For other requests, op timestamp is not applicable and the
validation is skipped.
*
- * @param request Replica request corresponding to the operation.
- * @param opTsIfDirectRo Operation timestamp for a direct RO, {@code null}
otherwise.
- * @return Future completed when the validation is finished.
+ * @param request The request.
+ * @return The timestamp or {@code null} if not a tx operation request.
*/
- private CompletableFuture<Void> validateTableExistence(ReplicaRequest
request, @Nullable HybridTimestamp opTsIfDirectRo) {
+ private @Nullable HybridTimestamp getTxOpTimestamp(ReplicaRequest request)
{
HybridTimestamp opStartTs;
- if (request instanceof ScanCloseReplicaRequest) {
- // We don't need to validate close request for table existence.
- opStartTs = null;
- } else if (request instanceof ReadWriteReplicaRequest) {
+ if (request instanceof ReadWriteReplicaRequest) {
opStartTs = clockService.now();
} else if (request instanceof ReadOnlyReplicaRequest) {
opStartTs = ((ReadOnlyReplicaRequest) request).readTimestamp();
} else if (request instanceof ReadOnlyDirectReplicaRequest) {
- assert opTsIfDirectRo != null;
-
- opStartTs = opTsIfDirectRo;
+ opStartTs = clockService.now();
} else {
opStartTs = null;
}
- if (opStartTs == null) {
- return nullCompletedFuture();
- }
-
- return schemaSyncService.waitForMetadataCompleteness(opStartTs)
- .thenRun(() ->
schemaCompatValidator.failIfTableDoesNotExistAt(opStartTs, tableId()));
- }
-
- /**
- * Makes sure that {@link
SchemaVersionAwareReplicaRequest#schemaVersion()} sent in a request matches
table schema version
- * corresponding to the operation.
- *
- * @param request Replica request corresponding to the operation.
- * @param opTsIfDirectRo Operation timestamp for a direct RO, {@code null}
otherwise.
- * @return Future completed when the validation is finished.
- */
- private CompletableFuture<Void> validateSchemaMatch(ReplicaRequest
request, @Nullable HybridTimestamp opTsIfDirectRo) {
- if (!(request instanceof SchemaVersionAwareReplicaRequest)) {
- return nullCompletedFuture();
- }
-
- HybridTimestamp tsToWaitForSchema = getTxStartTimestamp(request);
- if (tsToWaitForSchema == null) {
- tsToWaitForSchema = opTsIfDirectRo;
- }
-
- if (tsToWaitForSchema == null) {
- return nullCompletedFuture();
- }
-
- HybridTimestamp finalTsToWaitForSchema = tsToWaitForSchema;
- return
schemaSyncService.waitForMetadataCompleteness(finalTsToWaitForSchema)
- .thenRun(() -> {
- SchemaVersionAwareReplicaRequest versionAwareRequest =
(SchemaVersionAwareReplicaRequest) request;
-
- schemaCompatValidator.failIfRequestSchemaDiffersFromTxTs(
- finalTsToWaitForSchema,
- versionAwareRequest.schemaVersion(),
- tableId()
- );
- });
- }
-
- /**
- * Makes sure that we have schemas corresponding to the moment of tx
start; this makes PK extraction safe WRT
- * {@link SchemaRegistry#schema(int)}.
- *
- * @param request Replica request corresponding to the operation.
- * @param opTsIfDirectRo Operation timestamp for a direct RO, {@code null}
otherwise.
- * @return Future completed when the validation is finished.
- */
- private CompletableFuture<Void> waitForSchemasBeforeReading(ReplicaRequest
request, @Nullable HybridTimestamp opTsIfDirectRo) {
- HybridTimestamp tsToWaitForSchema = getTxStartTimestamp(request);
- if (tsToWaitForSchema == null) {
- tsToWaitForSchema = opTsIfDirectRo;
- }
-
- return tsToWaitForSchema == null ? nullCompletedFuture() :
schemaSyncService.waitForMetadataCompleteness(tsToWaitForSchema);
+ return opStartTs;
}
/**
@@ -1985,28 +1954,34 @@ public class PartitionReplicaListener implements
ReplicaListener {
assert pkLocker != null;
- return pkLocker.locksForLookupByKey(txId, pk)
- .thenCompose(ignored -> {
+ CompletableFuture<Void> lockFut = pkLocker.locksForLookupByKey(txId,
pk);
- boolean cursorClosureSetUp = false;
- Cursor<RowId> cursor = null;
+ Supplier<CompletableFuture<T>> sup = () -> {
+ boolean cursorClosureSetUp = false;
+ Cursor<RowId> cursor = null;
- try {
- cursor = getFromPkIndex(pk);
+ try {
+ cursor = getFromPkIndex(pk);
- Cursor<RowId> finalCursor = cursor;
- CompletableFuture<T> resolvingFuture =
continueResolvingByPk(cursor, txId, action)
- .whenComplete((res, ex) ->
finalCursor.close());
+ Cursor<RowId> finalCursor = cursor;
+ CompletableFuture<T> resolvingFuture =
continueResolvingByPk(cursor, txId, action)
+ .whenComplete((res, ex) -> finalCursor.close());
- cursorClosureSetUp = true;
+ cursorClosureSetUp = true;
- return resolvingFuture;
- } finally {
- if (!cursorClosureSetUp && cursor != null) {
- cursor.close();
- }
- }
- });
+ return resolvingFuture;
+ } finally {
+ if (!cursorClosureSetUp && cursor != null) {
+ cursor.close();
+ }
+ }
+ };
+
+ if (isCompletedSuccessfully(lockFut)) {
+ return sup.get();
+ } else {
+ return lockFut.thenCompose(ignored -> sup.get());
+ }
}
private <T> CompletableFuture<T> continueResolvingByPk(
@@ -2353,7 +2328,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
RowId lockedRow = pkReadLockFuts[i].join();
if (lockedRow == null &&
uniqueKeys.add(pks.get(i).byteBuffer())) {
- rowsToInsert.put(new RowId(partId(),
UUID.randomUUID()), row);
+ rowsToInsert.put(new RowId(partId(),
RowIdGenerator.next()), row);
result.add(new NullBinaryRow());
} else {
@@ -2457,7 +2432,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
boolean insert = rowId == null;
- RowId rowId0 = insert ? new RowId(partId(),
UUID.randomUUID()) : rowId;
+ RowId rowId0 = insert ? new RowId(partId(),
RowIdGenerator.next()) : rowId;
return insert
? takeLocksForInsert(searchRow, rowId0, txId)
@@ -2798,7 +2773,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return resultFuture.thenApply(res -> {
UpdateCommandResult updateCommandResult =
(UpdateCommandResult) res;
- if (full && !updateCommandResult.isPrimaryReplicaMatch()) {
+ if (full && updateCommandResult != null &&
!updateCommandResult.isPrimaryReplicaMatch()) {
throw new PrimaryReplicaMissException(txId,
cmd.leaseStartTime(), updateCommandResult.currentLeaseStartTime());
}
@@ -2806,18 +2781,20 @@ public class PartitionReplicaListener implements
ReplicaListener {
// Try to avoid double write if an entry is already
replicated.
synchronized (safeTime) {
if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
- // We don't need to take the partition snapshots
read lock, see #INTERNAL_DOC_PLACEHOLDER why.
- storageUpdateHandler.handleUpdate(
- cmd.txId(),
- cmd.rowUuid(),
-
cmd.tablePartitionId().asTablePartitionId(),
- cmd.rowToUpdate(),
- false,
- null,
- cmd.safeTime(),
- null,
- indexIdsAtRwTxBeginTs(txId)
- );
+ if
(!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK))
{
+ // We don't need to take the partition
snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
+ storageUpdateHandler.handleUpdate(
+ cmd.txId(),
+ cmd.rowUuid(),
+
cmd.tablePartitionId().asTablePartitionId(),
+ cmd.rowToUpdate(),
+ false,
+ null,
+ cmd.safeTime(),
+ null,
+ indexIdsAtRwTxBeginTs(txId)
+ );
+ }
updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
}
@@ -3071,7 +3048,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return completedFuture(new ReplicaResult(false, null));
}
- RowId rowId0 = new RowId(partId(), UUID.randomUUID());
+ RowId rowId0 = new RowId(partId(), RowIdGenerator.next());
return takeLocksForInsert(searchRow, rowId0, txId)
.thenCompose(rowIdLock ->
validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
@@ -3098,7 +3075,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return resolveRowByPk(extractPk(searchRow), txId, (rowId, row,
lastCommitTime) -> {
boolean insert = rowId == null;
- RowId rowId0 = insert ? new RowId(partId(),
UUID.randomUUID()) : rowId;
+ RowId rowId0 = insert ? new RowId(partId(),
RowIdGenerator.next()) : rowId;
CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>
lockFut = insert
? takeLocksForInsert(searchRow, rowId0, txId)
@@ -3130,7 +3107,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return resolveRowByPk(extractPk(searchRow), txId, (rowId, row,
lastCommitTime) -> {
boolean insert = rowId == null;
- RowId rowId0 = insert ? new RowId(partId(),
UUID.randomUUID()) : rowId;
+ RowId rowId0 = insert ? new RowId(partId(),
RowIdGenerator.next()) : rowId;
CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>
lockFut = insert
? takeLocksForInsert(searchRow, rowId0, txId)
@@ -3306,11 +3283,10 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
/**
- * Wait for the async cleanup of the provided row to finish.
+ * Wait for the async cleanup of the provided row to finish.
*
* @param rowId Row Ids of existing row that the transaction affects.
* @param result The value that the returned future will wrap.
- *
* @param <T> Type of the {@code result}.
*/
private <T> CompletableFuture<T> awaitCleanup(@Nullable RowId rowId, T
result) {
@@ -3323,7 +3299,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
*
* @param rowIds Row Ids of existing rows that the transaction affects.
* @param result The value that the returned future will wrap.
- *
* @param <T> Type of the {@code result}.
*/
private <T> CompletableFuture<T> awaitCleanup(Collection<RowId> rowIds, T
result) {
@@ -3588,42 +3563,49 @@ public class PartitionReplicaListener implements
ReplicaListener {
if (request instanceof PrimaryReplicaRequest) {
Long enlistmentConsistencyToken = ((PrimaryReplicaRequest)
request).enlistmentConsistencyToken();
- return placementDriver.getPrimaryReplica(replicationGroupId, now)
- .thenCompose(primaryReplicaMeta -> {
- if (primaryReplicaMeta == null) {
- return failedFuture(
- new PrimaryReplicaMissException(
- localNode.name(),
- null,
- localNode.id(),
- null,
- enlistmentConsistencyToken,
- null,
- null
- )
- );
- }
+ Function<ReplicaMeta, IgniteBiTuple<Boolean, Long>> validateClo =
primaryReplicaMeta -> {
+ if (primaryReplicaMeta == null) {
+ throw new PrimaryReplicaMissException(
+ localNode.name(),
+ null,
+ localNode.id(),
+ null,
+ enlistmentConsistencyToken,
+ null,
+ null
+ );
+ }
- long currentEnlistmentConsistencyToken =
primaryReplicaMeta.getStartTime().longValue();
-
- if (enlistmentConsistencyToken !=
currentEnlistmentConsistencyToken
- ||
clockService.before(primaryReplicaMeta.getExpirationTime(), now)
- ||
!isLocalPeer(primaryReplicaMeta.getLeaseholderId())
- ) {
- return failedFuture(
- new PrimaryReplicaMissException(
- localNode.name(),
-
primaryReplicaMeta.getLeaseholder(),
- localNode.id(),
-
primaryReplicaMeta.getLeaseholderId(),
- enlistmentConsistencyToken,
- currentEnlistmentConsistencyToken,
- null)
- );
- }
+ long currentEnlistmentConsistencyToken =
primaryReplicaMeta.getStartTime().longValue();
+
+ if (enlistmentConsistencyToken !=
currentEnlistmentConsistencyToken
+ ||
clockService.before(primaryReplicaMeta.getExpirationTime(), now)
+ || !isLocalPeer(primaryReplicaMeta.getLeaseholderId())
+ ) {
+ throw new PrimaryReplicaMissException(
+ localNode.name(),
+ primaryReplicaMeta.getLeaseholder(),
+ localNode.id(),
+ primaryReplicaMeta.getLeaseholderId(),
+ enlistmentConsistencyToken,
+ currentEnlistmentConsistencyToken,
+ null);
+ }
- return completedFuture(new IgniteBiTuple<>(null,
primaryReplicaMeta.getStartTime().longValue()));
- });
+ return new IgniteBiTuple<>(null,
primaryReplicaMeta.getStartTime().longValue());
+ };
+
+ ReplicaMeta meta =
placementDriver.getCurrentPrimaryReplica(replicationGroupId, now);
+
+ if (meta != null) {
+ try {
+ return completedFuture(validateClo.apply(meta));
+ } catch (Exception e) {
+ return failedFuture(e);
+ }
+ }
+
+ return placementDriver.getPrimaryReplica(replicationGroupId,
now).thenApply(validateClo);
} else if (request instanceof ReadOnlyReplicaRequest || request
instanceof ReplicaSafeTimeSyncRequest) {
return placementDriver.getPrimaryReplica(replicationGroupId, now)
.thenApply(primaryReplica -> new IgniteBiTuple<>(
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 296cf73133..39900f36e9 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -1923,33 +1923,49 @@ public class InternalTableImpl implements InternalTable
{
* @return The enlist future (then will a leader become known).
*/
protected CompletableFuture<IgniteBiTuple<ClusterNode, Long>> enlist(int
partId, InternalTransaction tx) {
+ HybridTimestamp now = clock.now();
+
TablePartitionId tablePartitionId = new TablePartitionId(tableId,
partId);
tx.assignCommitPartition(tablePartitionId);
- return partitionMeta(tablePartitionId).thenApply(meta -> {
+ ReplicaMeta meta =
placementDriver.getCurrentPrimaryReplica(tablePartitionId, now);
+
+ Function<ReplicaMeta, IgniteBiTuple<ClusterNode, Long>> enlistClo =
replicaMeta -> {
TablePartitionId partGroupId = new TablePartitionId(tableId,
partId);
- return tx.enlist(partGroupId, new IgniteBiTuple<>(
- getClusterNode(meta),
- enlistmentConsistencyToken(meta))
- );
- });
+ IgniteBiTuple<ClusterNode, Long> enlistState = new
IgniteBiTuple<>(getClusterNode(replicaMeta),
+ enlistmentConsistencyToken(replicaMeta));
+
+ tx.enlist(partGroupId, enlistState);
+
+ return enlistState;
+ };
+
+ if (meta != null) {
+ try {
+ return completedFuture(enlistClo.apply(meta));
+ } catch (IgniteException e) {
+ if (e.code() != REPLICA_UNAVAILABLE_ERR) {
+ return failedFuture(e);
+ }
+ }
+ }
+
+ return partitionMeta(tablePartitionId, now).thenApply(enlistClo);
}
@Override
public CompletableFuture<ClusterNode> partitionLocation(TablePartitionId
tablePartitionId) {
- return partitionMeta(tablePartitionId).thenApply(this::getClusterNode);
+ return partitionMeta(tablePartitionId,
clock.now()).thenApply(this::getClusterNode);
}
- private CompletableFuture<ReplicaMeta> partitionMeta(TablePartitionId
tablePartitionId) {
- HybridTimestamp now = clock.now();
-
- return awaitPrimaryReplica(tablePartitionId, now)
+ private CompletableFuture<ReplicaMeta> partitionMeta(TablePartitionId
tablePartitionId, HybridTimestamp at) {
+ return awaitPrimaryReplica(tablePartitionId, at)
.exceptionally(e -> {
throw withCause(
TransactionException::new,
REPLICA_UNAVAILABLE_ERR,
- "Failed to get the primary replica
[tablePartitionId=" + tablePartitionId + ", awaitTimestamp=" + now + ']',
+ "Failed to get the primary replica
[tablePartitionId=" + tablePartitionId + ", awaitTimestamp=" + at + ']',
e
);
});
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
index c5989ec45a..aba6e5430b 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.jetbrains.annotations.Nullable;
/**
* A base for a {@link PlacementDriver} that delegates some of its methods to
another {@link PlacementDriver}.
@@ -60,6 +61,11 @@ abstract class DelegatingPlacementDriver implements
PlacementDriver {
return delegate.getPrimaryReplica(replicationGroupId, timestamp);
}
+ @Override
+ public @Nullable ReplicaMeta getCurrentPrimaryReplica(ReplicationGroupId
replicationGroupId, HybridTimestamp timestamp) {
+ return delegate.getCurrentPrimaryReplica(replicationGroupId,
timestamp);
+ }
+
@Override
public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId
grpId) {
return delegate.previousPrimaryExpired(grpId);
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index b2ea186b42..84eed4c7e6 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -101,9 +101,11 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
TablePartitionId tablePartitionId,
IgniteBiTuple<ClusterNode, Long> nodeAndConsistencyToken
) {
- checkEnlistPossibility();
-
- enlistPartitionLock.readLock().lock();
+ // No need to wait for lock if commit is in progress.
+ if (!enlistPartitionLock.readLock().tryLock()) {
+ failEnlist();
+ assert false; // Not reachable.
+ }
try {
checkEnlistPossibility();
@@ -114,15 +116,22 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
}
}
+ /**
+ * Unconditionally throws a {@link TransactionException} with code {@code TX_ALREADY_FINISHED_ERR}; never returns normally.
+ */
+ private void failEnlist() {
+ throw new TransactionException(
+ TX_ALREADY_FINISHED_ERR,
+ format("Transaction is already finished [id={}, state={}].",
id(), state()));
+ }
+
/**
* Checks that this transaction was not finished and will be able to
enlist another partition.
*/
private void checkEnlistPossibility() {
if (finishFuture != null) {
// This means that the transaction is either in final or FINISHING
state.
- throw new TransactionException(
- TX_ALREADY_FINISHED_ERR,
- format("Transaction is already finished [id={},
state={}].", id(), state()));
+ failEnlist();
}
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 542952d0a6..03d3b3308a 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -56,8 +56,8 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
@@ -165,14 +165,13 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
* Total number of started transaction.
* TODO: IGNITE-21440 Implement transaction metrics.
*/
- private final AtomicInteger startedTxs = new AtomicInteger();
+ private final LongAdder startedTxs = new LongAdder();
/**
* Total number of finished transaction.
* TODO: IGNITE-21440 Implement transaction metrics.
*/
- private final AtomicInteger finishedTxs = new AtomicInteger();
-
+ private final LongAdder finishedTxs = new LongAdder();
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -393,7 +392,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
HybridTimestamp beginTimestamp = readOnly ? clockService.now() :
createBeginTimestampWithIncrementRwTxCounter();
UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp,
priority);
- startedTxs.incrementAndGet();
+ startedTxs.add(1);
if (!readOnly) {
txStateVolatileStorage.initialize(txId, localNodeId);
@@ -463,7 +462,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
public void finishFull(HybridTimestampTracker timestampTracker, UUID txId,
boolean commit) {
TxState finalState;
- finishedTxs.incrementAndGet();
+ finishedTxs.add(1);
if (commit) {
timestampTracker.update(clockService.now());
@@ -498,7 +497,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
) {
LOG.debug("Finish [commit={}, txId={}, groups={}].", commitIntent,
txId, enlistedGroups);
- finishedTxs.incrementAndGet();
+ finishedTxs.add(1);
assert enlistedGroups != null;
@@ -751,12 +750,12 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
@Override
public int finished() {
- return finishedTxs.get();
+ return finishedTxs.intValue();
}
@Override
public int pending() {
- return startedTxs.get() - finishedTxs.get();
+ return startedTxs.intValue() - finishedTxs.intValue();
}
@Override
@@ -867,7 +866,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
}
CompletableFuture<Void> completeReadOnlyTransactionFuture(TxIdAndTimestamp
txIdAndTimestamp) {
- finishedTxs.incrementAndGet();
+ finishedTxs.add(1);
CompletableFuture<Void> readOnlyTxFuture =
readOnlyTxFutureById.remove(txIdAndTimestamp);