This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7873c18a68 IGNITE-23303 Improve transaction performance.
7873c18a68 is described below
commit 7873c18a6820a1136b0ca4e6a32aa2134d3e7f02
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Wed Nov 20 19:00:24 2024 +0300
IGNITE-23303 Improve transaction performance.
---
.../handler/ClientPrimaryReplicaTrackerTest.java | 12 +-
.../apache/ignite/client/fakes/FakeTxManager.java | 9 +-
.../apache/ignite/internal/hlc/HybridClock.java | 4 +-
.../ignite/internal/hlc/HybridClockImpl.java | 61 ++--
.../internal/util/IgniteStripedReadWriteLock.java | 10 +
.../util/PendingComparableValuesTracker.java | 23 +-
.../ignite/internal/hlc/HybridClockTest.java | 41 +--
.../util/PendingComparableValuesTrackerTest.java | 4 +-
.../apache/ignite/internal/TestHybridClock.java | 116 +------
.../apache/ignite/raft/jraft/core/ItNodeTest.java | 80 -----
.../apache/ignite/raft/jraft/core/NodeImpl.java | 19 --
.../apache/ignite/raft/jraft/core/Replicator.java | 6 -
...caResult.java => CommandApplicationResult.java} | 37 +--
.../ignite/internal/replicator/ReplicaManager.java | 42 +--
.../ignite/internal/replicator/ReplicaResult.java | 16 +-
.../ignite/internal/replicator/ReplicaService.java | 32 +-
.../benchmark/AbstractMultiNodeBenchmark.java | 2 +-
.../internal/benchmark/UpsertKvBenchmark.java | 13 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 8 +-
.../ItTxObservableTimePropagationTest.java | 89 +++++
.../ignite/internal/table/ItColocationTest.java | 43 ++-
.../table/distributed/raft/PartitionListener.java | 4 +-
.../replicator/PartitionReplicaListener.java | 119 ++++---
.../distributed/storage/InternalTableImpl.java | 34 +-
.../apache/ignite/distributed/ItTxTestCluster.java | 17 +-
.../ignite/internal/table/TxAbstractTest.java | 297 +----------------
.../internal/table/TxInfrastructureTest.java | 363 +++++++++++++++++++++
.../table/impl/DummyInternalTableImpl.java | 43 ++-
.../org/apache/ignite/internal/tx/TxManager.java | 7 +-
.../internal/tx/impl/ReadWriteTransactionImpl.java | 6 +-
.../ignite/internal/tx/impl/TxManagerImpl.java | 13 +-
31 files changed, 805 insertions(+), 765 deletions(-)
diff --git
a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTrackerTest.java
b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTrackerTest.java
index 01e1f6b0b2..8f9ccef1bc 100644
---
a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTrackerTest.java
+++
b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTrackerTest.java
@@ -89,10 +89,10 @@ class ClientPrimaryReplicaTrackerTest extends
BaseIgniteAbstractTest {
public void testUpdateByEvent() {
tracker.start();
- assertEquals(1, tracker.maxStartTime());
- driver.updateReplica("s3", TABLE_ID, 0, 2);
-
assertEquals(2, tracker.maxStartTime());
+ driver.updateReplica("s3", TABLE_ID, 0, 3);
+
+ assertEquals(3, tracker.maxStartTime());
PrimaryReplicasResult replicas =
tracker.primaryReplicasAsync(TABLE_ID, null).join();
assertEquals(PARTITIONS, replicas.nodeNames().size());
@@ -105,10 +105,10 @@ class ClientPrimaryReplicaTrackerTest extends
BaseIgniteAbstractTest {
driver.updateReplica(null, TABLE_ID, 0, 2);
tracker.start();
- assertEquals(1, tracker.maxStartTime());
- driver.updateReplica(null, TABLE_ID, 1, 2);
-
assertEquals(2, tracker.maxStartTime());
+ driver.updateReplica(null, TABLE_ID, 1, 3);
+
+ assertEquals(3, tracker.maxStartTime());
PrimaryReplicasResult replicas =
tracker.primaryReplicasAsync(TABLE_ID, null).join();
assertEquals(PARTITIONS, replicas.nodeNames().size());
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 7c08564e70..4d8c00d9cc 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -183,10 +183,6 @@ public class FakeTxManager implements TxManager {
return CompletableFuture.runAsync(runnable);
}
- @Override
- public void finishFull(HybridTimestampTracker timestampTracker, UUID txId,
boolean commit) {
- }
-
@Override
public CompletableFuture<Void> finish(
HybridTimestampTracker timestampTracker,
@@ -239,4 +235,9 @@ public class FakeTxManager implements TxManager {
public int pending() {
return 0;
}
+
+ @Override
+ public void finishFull(HybridTimestampTracker timestampTracker, UUID txId,
HybridTimestamp ts, boolean commit) {
+ // No-op.
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
index 6c3460d99e..d8a16f595f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
@@ -22,7 +22,7 @@ package org.apache.ignite.internal.hlc;
*/
public interface HybridClock {
/**
- * Creates a timestamp for new event.
+ * Creates a timestamp for new event. A timestamp is guaranteed to be
unique and monotonically increasing.
*
* @return The hybrid timestamp.
*/
@@ -37,7 +37,7 @@ public interface HybridClock {
long currentLong();
/**
- * Creates a timestamp for new event.
+ * Creates a timestamp for new event. A timestamp is guaranteed to be
unique and monotonically increasing.
*
* @return The hybrid timestamp.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
index 47e4d2b956..4960824454 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
@@ -21,10 +21,9 @@ import static java.lang.Math.max;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.VarHandle;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.tostring.S;
@@ -38,55 +37,42 @@ public class HybridClockImpl implements HybridClock {
/**
* Var handle for {@link #latestTime}.
*/
- private static final VarHandle LATEST_TIME;
-
- static {
- try {
- LATEST_TIME =
MethodHandles.lookup().findVarHandle(HybridClockImpl.class, "latestTime",
long.class);
- } catch (NoSuchFieldException | IllegalAccessException e) {
- throw new ExceptionInInitializerError(e);
- }
- }
+ private static final AtomicLongFieldUpdater<HybridClockImpl> LATEST_TIME =
AtomicLongFieldUpdater.newUpdater(HybridClockImpl.class,
+ "latestTime");
private volatile long latestTime;
private final List<ClockUpdateListener> updateListeners = new
CopyOnWriteArrayList<>();
/**
- * The constructor which initializes the latest time to current time by
system clock.
- */
- public HybridClockImpl() {
- this.latestTime = currentTime();
- }
-
- /**
- * System current time in milliseconds shifting left to free insignificant
bytes.
- * This method is marked with a public modifier to mock in tests because
there is no way to mock currentTimeMillis.
+ * Returns current physical time in milliseconds.
*
- * @return Current time in milliseconds shifted right on two bytes.
+ * @return Current time.
*/
- public static long currentTime() {
- return System.currentTimeMillis() << LOGICAL_TIME_BITS_SIZE;
+ protected long physicalTime() {
+ return System.currentTimeMillis();
}
@Override
- public long nowLong() {
+ public final long nowLong() {
while (true) {
long now = currentTime();
// Read the latest time after accessing UTC time to reduce
contention.
long oldLatestTime = latestTime;
- long newLatestTime = max(oldLatestTime + 1, now);
+ if (oldLatestTime >= now) {
+ return LATEST_TIME.incrementAndGet(this);
+ }
- if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime))
{
- return newLatestTime;
+ if (LATEST_TIME.compareAndSet(this, oldLatestTime, now)) {
+ return now;
}
}
}
@Override
- public long currentLong() {
+ public final long currentLong() {
long current = currentTime();
return max(latestTime, current);
@@ -107,33 +93,34 @@ public class HybridClockImpl implements HybridClock {
}
@Override
- public HybridTimestamp now() {
+ public final HybridTimestamp now() {
return hybridTimestamp(nowLong());
}
@Override
- public HybridTimestamp current() {
+ public final HybridTimestamp current() {
return hybridTimestamp(currentLong());
}
/**
- * Updates the clock in accordance with an external event timestamp. If
the supplied timestamp is ahead of the
- * current clock timestamp, the clock gets adjusted to make sure it never
returns any timestamp before (or equal to)
- * the supplied external timestamp.
+ * Updates the clock in accordance with an external event timestamp. If
the supplied timestamp is ahead of the current clock timestamp,
+ * the clock gets adjusted to make sure it never returns any timestamp
before (or equal to) the supplied external timestamp.
*
* @param requestTime Timestamp from request.
* @return The resulting timestamp (guaranteed to exceed both previous
clock 'currentTs' and the supplied external ts).
*/
@Override
- public HybridTimestamp update(HybridTimestamp requestTime) {
+ public final HybridTimestamp update(HybridTimestamp requestTime) {
while (true) {
long now = currentTime();
// Read the latest time after accessing UTC time to reduce
contention.
- long oldLatestTime = this.latestTime;
+ long oldLatestTime = latestTime;
long newLatestTime = max(requestTime.longValue() + 1, max(now,
oldLatestTime + 1));
+ // TODO https://issues.apache.org/jira/browse/IGNITE-23707 avoid
CAS on logical part update.
+
if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime))
{
notifyUpdateListeners(newLatestTime);
@@ -142,6 +129,10 @@ public class HybridClockImpl implements HybridClock {
}
}
+ private long currentTime() {
+ return physicalTime() << LOGICAL_TIME_BITS_SIZE;
+ }
+
@Override
public void addUpdateListener(ClockUpdateListener listener) {
updateListeners.add(listener);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStripedReadWriteLock.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStripedReadWriteLock.java
index 796b557e1d..b799cc59f8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStripedReadWriteLock.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStripedReadWriteLock.java
@@ -30,6 +30,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* supports reentrancy semantics like {@link ReentrantReadWriteLock}.
*/
public class IgniteStripedReadWriteLock implements ReadWriteLock {
+ /** Default concurrency. */
+ private static final int CONCURRENCY = Math.max(1,
Runtime.getRuntime().availableProcessors() / 2);
+
/** Index generator. */
private static final AtomicInteger IDX_GEN = new AtomicInteger();
@@ -42,6 +45,13 @@ public class IgniteStripedReadWriteLock implements
ReadWriteLock {
/** Composite write lock. */
private final WriteLock writeLock;
+ /**
+ * Creates a new instance with default concurrency level.
+ */
+ public IgniteStripedReadWriteLock() {
+ this(CONCURRENCY);
+ }
+
/**
* Creates a new instance with given concurrency level.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
index 794641d3ce..4563e3b791 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
@@ -24,6 +24,7 @@ import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.Comparator;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -61,7 +62,7 @@ public class PendingComparableValuesTracker<T extends
Comparable<T>, R> implemen
private volatile boolean closeGuard;
/** Busy lock to close synchronously. */
- private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+ private final IgniteStripedReadWriteLock busyLock = new
IgniteStripedReadWriteLock();
private final Comparator<Map.Entry<T, @Nullable R>> comparator;
@@ -86,7 +87,7 @@ public class PendingComparableValuesTracker<T extends
Comparable<T>, R> implemen
*/
public void update(T newValue, @Nullable R futureResult) {
while (true) {
- if (!busyLock.enterBusy()) {
+ if (!busyLock.readLock().tryLock()) {
throw new TrackerClosedException();
}
@@ -104,7 +105,7 @@ public class PendingComparableValuesTracker<T extends
Comparable<T>, R> implemen
break;
}
} finally {
- busyLock.leaveBusy();
+ busyLock.readLock().unlock();
}
}
}
@@ -118,18 +119,20 @@ public class PendingComparableValuesTracker<T extends
Comparable<T>, R> implemen
* @param valueToWait Value to wait.
*/
public CompletableFuture<R> waitFor(T valueToWait) {
- if (!busyLock.enterBusy()) {
+ if (!busyLock.readLock().tryLock()) {
return failedFuture(new TrackerClosedException());
}
try {
- if (current.getKey().compareTo(valueToWait) >= 0) {
- return completedFuture(current.getValue());
+ Entry<T, @Nullable R> currentKeyValue = current;
+
+ if (currentKeyValue.getKey().compareTo(valueToWait) >= 0) {
+ return completedFuture(currentKeyValue.getValue());
}
return addNewWaiter(valueToWait);
} finally {
- busyLock.leaveBusy();
+ busyLock.readLock().unlock();
}
}
@@ -139,14 +142,14 @@ public class PendingComparableValuesTracker<T extends
Comparable<T>, R> implemen
* @throws TrackerClosedException if the tracker is closed.
*/
public T current() {
- if (!busyLock.enterBusy()) {
+ if (!busyLock.readLock().tryLock()) {
throw new TrackerClosedException();
}
try {
return current.getKey();
} finally {
- busyLock.leaveBusy();
+ busyLock.readLock().unlock();
}
}
@@ -156,7 +159,7 @@ public class PendingComparableValuesTracker<T extends
Comparable<T>, R> implemen
return;
}
- busyLock.block();
+ busyLock.writeLock().lock();
TrackerClosedException trackerClosedException = new
TrackerClosedException();
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridClockTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridClockTest.java
index f4954882d0..f7d3b68e3a 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridClockTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/hlc/HybridClockTest.java
@@ -17,21 +17,18 @@
package org.apache.ignite.internal.hlc;
-import static
org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import org.apache.ignite.internal.TestHybridClock;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
-import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
/**
@@ -40,27 +37,19 @@ import org.mockito.junit.jupiter.MockitoExtension;
*/
@ExtendWith(MockitoExtension.class)
class HybridClockTest extends BaseIgniteAbstractTest {
- /**
- * Mock of a system clock.
- */
- private static MockedStatic<HybridClockImpl> clockMock;
-
@Mock
private ClockUpdateListener updateListener;
- @AfterEach
- public void afterEach() {
- closeClockMock();
- }
+ private long mockedTime;
/**
* Tests a {@link HybridClock#now()}.
*/
@Test
public void testNow() {
- clockMock = mockToEpochMilli(100);
+ mockedTime = 100;
- HybridClock clock = new HybridClockImpl();
+ HybridClock clock = new TestHybridClock(() -> mockedTime);
assertTimestampEquals(100, new HybridTimestamp(100, 1), clock::now);
@@ -76,9 +65,9 @@ class HybridClockTest extends BaseIgniteAbstractTest {
*/
@Test
public void testTick() {
- clockMock = mockToEpochMilli(100);
+ mockedTime = 100;
- HybridClock clock = new HybridClockImpl();
+ HybridClock clock = new TestHybridClock(() -> mockedTime);
assertTimestampEquals(100, new HybridTimestamp(100, 1),
() -> clock.update(new HybridTimestamp(50, 1)));
@@ -103,19 +92,11 @@ class HybridClockTest extends BaseIgniteAbstractTest {
}
private void assertTimestampEquals(long sysTime, HybridTimestamp expTs,
Supplier<HybridTimestamp> clo) {
- closeClockMock();
-
- clockMock = mockToEpochMilli(sysTime);
+ mockedTime = sysTime;
assertEquals(expTs, clo.get());
}
- private void closeClockMock() {
- if (clockMock != null && !clockMock.isClosed()) {
- clockMock.close();
- }
- }
-
@Test
void updateListenerIsNotNotifiedOnNowCall() {
HybridClock clock = new HybridClockImpl();
@@ -163,12 +144,4 @@ class HybridClockTest extends BaseIgniteAbstractTest {
verify(updateListener, never()).onUpdate(anyLong());
}
-
- private static MockedStatic<HybridClockImpl> mockToEpochMilli(long
expected) {
- MockedStatic<HybridClockImpl> clockMock =
mockStatic(HybridClockImpl.class);
-
- clockMock.when(HybridClockImpl::currentTime).thenReturn(expected <<
LOGICAL_TIME_BITS_SIZE);
-
- return clockMock;
- }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
index 2fcf257814..03530ef753 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
@@ -40,6 +40,7 @@ import java.util.stream.Stream;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@@ -256,7 +257,8 @@ public class PendingComparableValuesTrackerTest {
CompletableFuture<Void> future0 = tracker.waitFor(2);
- tracker.close();
+ // Close is called from a dedicated stop worker.
+ IgniteTestUtils.runAsync(tracker::close).join();
assertThrows(TrackerClosedException.class, tracker::current);
assertThrows(TrackerClosedException.class, () -> tracker.update(2,
null));
diff --git
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
index eeab11539f..65c830280b 100644
---
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
+++
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
@@ -17,127 +17,23 @@
package org.apache.ignite.internal;
-import static java.lang.Math.max;
-import static
org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE;
-import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.VarHandle;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.LongSupplier;
-import org.apache.ignite.internal.hlc.ClockUpdateListener;
-import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
/**
- * Test hybrid clock with custom supplier of current time.
+ * Test hybrid clock with custom supplier of current time. TODO delete
*/
-public class TestHybridClock implements HybridClock {
+public class TestHybridClock extends HybridClockImpl {
/** Supplier of current time in milliseconds. */
private final LongSupplier currentTimeMillisSupplier;
- /** Latest time. */
- private volatile long latestTime;
-
- private final List<ClockUpdateListener> updateListeners = new
CopyOnWriteArrayList<>();
-
- /**
- * Var handle for {@link #latestTime}.
- */
- private static final VarHandle LATEST_TIME;
-
- static {
- try {
- LATEST_TIME =
MethodHandles.lookup().findVarHandle(TestHybridClock.class, "latestTime",
long.class);
- } catch (NoSuchFieldException | IllegalAccessException e) {
- throw new ExceptionInInitializerError(e);
- }
- }
-
public TestHybridClock(LongSupplier currentTimeMillisSupplier) {
this.currentTimeMillisSupplier = currentTimeMillisSupplier;
- this.latestTime = currentTime();
- }
-
- private long currentTime() {
- return currentTimeMillisSupplier.getAsLong() << LOGICAL_TIME_BITS_SIZE;
- }
-
- @Override
- public long nowLong() {
- while (true) {
- long now = currentTime();
-
- // Read the latest time after accessing UTC time to reduce
contention.
- long oldLatestTime = latestTime;
-
- long newLatestTime = max(oldLatestTime + 1, now);
-
- if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime))
{
- return newLatestTime;
- }
- }
- }
-
- @Override
- public long currentLong() {
- long current = currentTime();
-
- return max(latestTime, current);
- }
-
- private void notifyUpdateListeners(long newLatestTime) {
- updateListeners.forEach(listener -> listener.onUpdate(newLatestTime));
- }
-
- @Override
- public HybridTimestamp now() {
- return hybridTimestamp(nowLong());
- }
-
- @Override
- public HybridTimestamp current() {
- return hybridTimestamp(currentLong());
- }
-
- /**
- * Creates a timestamp for a received event.
- *
- * @param requestTime Timestamp from request.
- * @return The hybrid timestamp.
- */
- @Override
- public HybridTimestamp update(HybridTimestamp requestTime) {
- while (true) {
- long now = currentTime();
-
- // Read the latest time after accessing UTC time to reduce
contention.
- long oldLatestTime = this.latestTime;
-
- long newLatestTime = max(requestTime.longValue() + 1, max(now,
oldLatestTime + 1));
-
- if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime))
{
- notifyUpdateListeners(newLatestTime);
-
- return hybridTimestamp(newLatestTime);
- }
- }
- }
-
- @Override
- public void addUpdateListener(ClockUpdateListener listener) {
- updateListeners.add(listener);
- }
-
- @Override
- public void removeUpdateListener(ClockUpdateListener listener) {
- updateListeners.remove(listener);
+ now();
}
@Override
- public String toString() {
- return S.toString(HybridClock.class, this);
+ protected long physicalTime() {
+ return currentTimeMillisSupplier.getAsLong();
}
}
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index 892e79603d..18d0512ba4 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -76,8 +76,6 @@ import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.IntStream;
-import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
@@ -119,8 +117,6 @@ import org.apache.ignite.raft.jraft.option.BootstrapOptions;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.option.ReadOnlyOption;
-import org.apache.ignite.raft.jraft.rpc.AppendEntriesRequestImpl;
-import org.apache.ignite.raft.jraft.rpc.AppendEntriesResponseImpl;
import org.apache.ignite.raft.jraft.rpc.RpcClientEx;
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.apache.ignite.raft.jraft.rpc.RpcServer;
@@ -3815,82 +3811,6 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
assertTrue(res.get().isOk());
}
- /**
- * Tests propagation of HLC on heartbeat request and response.
- */
- @Test
- public void testHlcPropagation() throws Exception {
- List<TestPeer> peers = TestUtils.generatePeers(testInfo, 2);
-
- cluster = new TestCluster("unitest", dataPath, peers, 3_000, testInfo);
-
- for (TestPeer peer : peers) {
- RaftOptions opts = new RaftOptions();
- opts.setElectionHeartbeatFactor(4); // Election timeout divisor.
- HybridClock clock = new HybridClockImpl();
- assertTrue(cluster.start(peer, false, 300, false, null, opts,
clock));
- }
-
- List<NodeImpl> nodes = cluster.getNodes();
-
- for (NodeImpl node : nodes) {
- RpcClientEx rpcClientEx = sender(node);
- rpcClientEx.recordMessages((msg, nodeId) -> {
- if (msg instanceof AppendEntriesRequestImpl ||
- msg instanceof AppendEntriesResponseImpl) {
- return true;
- }
-
- return false;
-
- });
- }
-
- Node leader = cluster.waitAndGetLeader();
- cluster.ensureLeader(leader);
-
- RpcClientEx client = sender(leader);
-
- AtomicBoolean heartbeatRequest = new AtomicBoolean(false);
- AtomicBoolean appendEntriesRequest = new AtomicBoolean(false);
- AtomicBoolean heartbeatResponse = new AtomicBoolean(false);
- AtomicBoolean appendEntriesResponse = new AtomicBoolean(false);
-
- waitForCondition(() -> {
- client.recordedMessages().forEach(msgs -> {
- if (msgs[0] instanceof AppendEntriesRequestImpl) {
- AppendEntriesRequestImpl msg = (AppendEntriesRequestImpl)
msgs[0];
-
- if (msg.entriesList() == null && msg.data() == null) {
- heartbeatRequest.set(true);
- } else {
- appendEntriesRequest.set(true);
- }
-
- assertTrue(msg.timestamp() != null);
- } else if (msgs[0] instanceof AppendEntriesResponseImpl) {
- AppendEntriesResponseImpl msg =
(AppendEntriesResponseImpl) msgs[0];
- if (msg.timestamp() == null) {
- appendEntriesResponse.set(true);
- } else {
- heartbeatResponse.set(true);
- }
- }
- });
-
- return heartbeatRequest.get() &&
- appendEntriesRequest.get() &&
- heartbeatResponse.get() &&
- appendEntriesResponse.get();
- },
- 5000);
-
- assertTrue(heartbeatRequest.get());
- assertTrue(appendEntriesRequest.get());
- assertTrue(heartbeatResponse.get());
- assertTrue(appendEntriesResponse.get());
- }
-
@Test
public void
exLeaderDoesntBecomeLeaderIfExternallyEnforcedConfigDoesNotContainIt() throws
Exception {
final long configFromResetIndex = 2L;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index c187b8bcf4..1d7bc59143 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -2243,10 +2243,6 @@ public class NodeImpl implements Node, RaftServerService
{
.success(false)
.term(this.currTerm);
- if (request.timestamp() != null) {
- rb.timestamp(clock.update(request.timestamp()));
- }
-
return rb.build();
}
@@ -2264,10 +2260,6 @@ public class NodeImpl implements Node, RaftServerService
{
.success(false) //
.term(request.term() + 1);
- if (request.timestamp() != null) {
- rb.timestamp(clock.update(request.timestamp()));
- }
-
return rb.build();
}
@@ -2297,10 +2289,6 @@ public class NodeImpl implements Node, RaftServerService
{
.term(this.currTerm)
.lastLogIndex(lastLogIndex);
- if (request.timestamp() != null) {
- rb.timestamp(clock.update(request.timestamp()));
- }
-
return rb.build();
}
@@ -2311,9 +2299,6 @@ public class NodeImpl implements Node, RaftServerService {
.success(true)
.term(this.currTerm)
.lastLogIndex(this.logManager.getLastLogIndex());
- if (request.timestamp() != null) {
- respBuilder.timestamp(clock.update(request.timestamp()));
- }
doUnlock = false;
this.writeLock.unlock();
// see the comments at FollowerStableClosure#run()
@@ -2332,10 +2317,6 @@ public class NodeImpl implements Node, RaftServerService
{
.errorMsg(String.format("Node %s:%s log manager is
busy.", this.groupId, this.serverId))
.term(this.currTerm);
- if (request.timestamp() != null) {
- rb.timestamp(clock.update(request.timestamp()));
- }
-
return rb.build();
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
index 0028b06253..e088207357 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
@@ -743,7 +743,6 @@ public class Replicator implements ThreadId.OnError {
private void sendEmptyEntries(final boolean isHeartbeat,
final RpcResponseClosure<AppendEntriesResponse> heartBeatClosure) {
final AppendEntriesRequestBuilder rb =
raftOptions.getRaftMessagesFactory().appendEntriesRequest();
- rb.timestamp(options.getNode().clockNow());
if (!fillCommonFields(rb, this.nextIndex - 1, isHeartbeat)) {
// id is unlock in installSnapshot
installSnapshot();
@@ -1163,9 +1162,6 @@ public class Replicator implements ThreadId.OnError {
if ((r = (Replicator) id.lock()) == null) {
return;
}
- if (response != null && response.timestamp() != null) {
- r.options.getNode().clockUpdate(response.timestamp());
- }
boolean doUnlock = true;
try {
final boolean isLogDebugEnabled = LOG.isDebugEnabled();
@@ -1671,8 +1667,6 @@ public class Replicator implements ThreadId.OnError {
RecycleUtil.recycle(byteBufList);
}
- rb.timestamp(this.options.getNode().clockNow());
-
final AppendEntriesRequest request = rb.build();
if (LOG.isDebugEnabled()) {
LOG.debug(
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaResult.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/CommandApplicationResult.java
similarity index 58%
copy from
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaResult.java
copy to
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/CommandApplicationResult.java
index 6a4d850c01..a5e253bf7b 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaResult.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/CommandApplicationResult.java
@@ -18,44 +18,25 @@
package org.apache.ignite.internal.replicator;
import java.util.concurrent.CompletableFuture;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
/**
- * Represents replica execution result.
+ * Replication command application result.
*/
-public class ReplicaResult {
- /** The result. */
- private final Object res;
-
- /** The replication future. */
+public final class CommandApplicationResult {
+ private final HybridTimestamp commitTs;
private final CompletableFuture<?> repFut;
- /**
- * Construct a replica result.
- *
- * @param res The result.
- * @param repFut The replication future.
- */
- public ReplicaResult(@Nullable Object res, @Nullable CompletableFuture<?>
repFut) {
- this.res = res;
+ public CommandApplicationResult(HybridTimestamp commitTs,
CompletableFuture<?> repFut) {
+ this.commitTs = commitTs;
this.repFut = repFut;
}
- /**
- * Get the result.
- *
- * @return The result.
- */
- public @Nullable Object result() {
- return res;
+ public HybridTimestamp getCommitTimestamp() {
+ return commitTs;
}
- /**
- * Get the replication future.
- *
- * @return The replication future.
- */
- public @Nullable CompletableFuture<?> replicationFuture() {
+ public CompletableFuture<?> replicationFuture() {
return repFut;
}
}
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index b213e36f65..50904d3c13 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -123,6 +123,7 @@ import org.apache.ignite.internal.thread.ExecutorChooser;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteStripedReadWriteLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.IgniteException;
@@ -150,7 +151,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
private static final PlacementDriverMessagesFactory
PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
/** Busy lock to stop synchronously. */
- private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+ private final IgniteStripedReadWriteLock busyLock = new
IgniteStripedReadWriteLock();
/** Prevents double stopping of the component. */
private final AtomicBoolean stopGuard = new AtomicBoolean();
@@ -384,7 +385,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
}
private void handleReplicaRequest(ReplicaRequest request, ClusterNode
sender, @Nullable Long correlationId) {
- if (!busyLock.enterBusy()) {
+ if (!busyLock.readLock().tryLock()) {
if (LOG.isInfoEnabled()) {
LOG.info("Failed to process replica request (the node is
stopping) [request={}].", request);
}
@@ -452,7 +453,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
NetworkMessage msg;
if (ex == null) {
- msg = prepareReplicaResponse(sendTimestamp, res.result());
+ msg = prepareReplicaResponse(sendTimestamp, res);
} else {
if (indicatesUnexpectedProblem(ex)) {
LOG.warn("Failed to process replica request
[request={}].", ex, request);
@@ -471,14 +472,14 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
stopLeaseProlongation(groupId, null);
}
- if (ex == null && res.replicationFuture() != null) {
- res.replicationFuture().whenComplete((res0, ex0) -> {
+ if (ex == null && res.applyResult().replicationFuture() !=
null) {
+ res.applyResult().replicationFuture().whenComplete((res0,
ex0) -> {
NetworkMessage msg0;
LOG.debug("Sending delayed response for replica
request [request={}]", request);
if (ex0 == null) {
- msg0 = prepareReplicaResponse(sendTimestamp, res0);
+ msg0 = prepareReplicaResponse(sendTimestamp, new
ReplicaResult(res0, null));
} else {
LOG.warn("Failed to process delayed response
[request={}]", ex0, request);
@@ -491,7 +492,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
}
});
} finally {
- busyLock.leaveBusy();
+ busyLock.readLock().unlock();
}
}
@@ -524,7 +525,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
var msg = (PlacementDriverReplicaMessage) msg0;
- if (!busyLock.enterBusy()) {
+ if (!busyLock.readLock().tryLock()) {
if (LOG.isInfoEnabled()) {
LOG.info("Failed to process placement driver message (the node
is stopping) [msg={}].", msg);
}
@@ -545,7 +546,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
}
});
} finally {
- busyLock.leaveBusy();
+ busyLock.readLock().unlock();
}
}
@@ -658,7 +659,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
ReplicationGroupId replicaGrpId,
PeersAndLearners newConfiguration
) throws NodeStoppingException {
- if (!busyLock.enterBusy()) {
+ if (!busyLock.readLock().tryLock()) {
throw new NodeStoppingException();
}
@@ -682,7 +683,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
)
);
} finally {
- busyLock.leaveBusy();
+ busyLock.readLock().unlock();
}
}
@@ -835,14 +836,14 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
* @throws NodeStoppingException If the node is stopping.
*/
public CompletableFuture<Boolean> stopReplica(ReplicationGroupId
replicaGrpId) throws NodeStoppingException {
- if (!busyLock.enterBusy()) {
+ if (!busyLock.readLock().tryLock()) {
throw new NodeStoppingException();
}
try {
return stopReplicaInternal(replicaGrpId);
} finally {
- busyLock.leaveBusy();
+ busyLock.readLock().unlock();
}
}
@@ -862,7 +863,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
LOG.error("Error when notifying about BEFORE_REPLICA_STOPPED
event.", e);
}
- if (!busyLock.enterBusy()) {
+ if (!busyLock.readLock().tryLock()) {
isRemovedFuture.completeExceptionally(new
NodeStoppingException());
return;
@@ -895,7 +896,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
return null;
});
} finally {
- busyLock.leaveBusy();
+ busyLock.readLock().unlock();
}
});
@@ -953,7 +954,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
return nullCompletedFuture();
}
- busyLock.block();
+ busyLock.writeLock().lock();
int shutdownTimeoutSeconds = 10;
@@ -1040,17 +1041,18 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
/**
* Prepares replica response.
*/
- private NetworkMessage prepareReplicaResponse(boolean sendTimestamp,
Object result) {
+ private NetworkMessage prepareReplicaResponse(boolean sendTimestamp,
ReplicaResult result) {
if (sendTimestamp) {
+ HybridTimestamp commitTs =
result.applyResult().getCommitTimestamp();
return REPLICA_MESSAGES_FACTORY
.timestampAwareReplicaResponse()
- .result(result)
- .timestamp(clockService.now())
+ .result(result.result())
+ .timestamp(commitTs == null ? clockService.current() :
commitTs)
.build();
} else {
return REPLICA_MESSAGES_FACTORY
.replicaResponse()
- .result(result)
+ .result(result.result())
.build();
}
}
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaResult.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaResult.java
index 6a4d850c01..3f9a8d8187 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaResult.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaResult.java
@@ -17,28 +17,30 @@
package org.apache.ignite.internal.replicator;
-import java.util.concurrent.CompletableFuture;
import org.jetbrains.annotations.Nullable;
/**
* Represents replica execution result.
*/
public class ReplicaResult {
+ /** Default command application result (no commit timestamp, no replication future), used when none is supplied. */
+ private static final CommandApplicationResult DEFAULT_RESULT = new
CommandApplicationResult(null, null);
+
/** The result. */
private final Object res;
/** The replication future. */
- private final CompletableFuture<?> repFut;
+ private final CommandApplicationResult commandApplicationResult;
/**
* Construct a replica result.
*
* @param res The result.
- * @param repFut The replication future.
+ * @param commandApplicationResult The command application result, or {@code null} to use the default one.
*/
- public ReplicaResult(@Nullable Object res, @Nullable CompletableFuture<?>
repFut) {
+ public ReplicaResult(@Nullable Object res, @Nullable
CommandApplicationResult commandApplicationResult) {
this.res = res;
- this.repFut = repFut;
+ this.commandApplicationResult = commandApplicationResult == null ?
DEFAULT_RESULT : commandApplicationResult;
}
/**
@@ -55,7 +57,7 @@ public class ReplicaResult {
*
* @return The replication future.
*/
- public @Nullable CompletableFuture<?> replicationFuture() {
- return repFut;
+ public CommandApplicationResult applyResult() {
+ return commandApplicationResult;
}
}
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
index 729f88f50d..0a2d2da796 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
@@ -118,18 +118,22 @@ public class ReplicaService {
this.retryExecutor = retryExecutor;
}
+ private <R> CompletableFuture<R> sendToReplica(String
targetNodeConsistentId, ReplicaRequest req) {
+ return (CompletableFuture<R>) sendToReplicaRaw(targetNodeConsistentId,
req).thenApply(res -> res.result());
+ }
+
/**
- * Sends request to the replica node.
+ * Sends request to the replica node and provides raw response.
*
- * @param targetNodeConsistentId A consistent id of the replica node..
+ * @param targetNodeConsistentId A consistent id of the replica node.
* @param req Replica request.
* @return Response future with either evaluation result or completed
exceptionally.
* @see NodeStoppingException If either supplier or demander node is
stopping.
* @see ReplicaUnavailableException If replica with given replication
group id doesn't exist or not started yet.
* @see ReplicationTimeoutException If the response could not be received
due to a timeout.
*/
- private <R> CompletableFuture<R> sendToReplica(String
targetNodeConsistentId, ReplicaRequest req) {
- CompletableFuture<R> res = new CompletableFuture<>();
+ private CompletableFuture<ReplicaResponse> sendToReplicaRaw(String
targetNodeConsistentId, ReplicaRequest req) {
+ CompletableFuture<ReplicaResponse> res = new CompletableFuture<>();
messagingService.invoke(
targetNodeConsistentId,
@@ -224,11 +228,11 @@ public class ReplicaService {
assert response0 instanceof
AwaitReplicaResponse :
"Incorrect response type [type=" +
response0.getClass().getSimpleName() + ']';
- sendToReplica(targetNodeConsistentId,
req).whenComplete((r, e) -> {
+ sendToReplicaRaw(targetNodeConsistentId,
req).whenComplete((r, e) -> {
if (e != null) {
res.completeExceptionally(e);
} else {
- res.complete((R) r);
+ res.complete(r);
}
});
}
@@ -249,7 +253,7 @@ public class ReplicaService {
}
}
} else {
- res.complete((R) ((ReplicaResponse) response).result());
+ res.complete((ReplicaResponse) response);
}
}
});
@@ -268,7 +272,7 @@ public class ReplicaService {
* @see ReplicationTimeoutException If the response could not be received
due to a timeout.
*/
public <R> CompletableFuture<R> invoke(ClusterNode node, ReplicaRequest
request) {
- return sendToReplica(node.name(), request);
+ return invokeRaw(node, request).thenApply(r -> (R) r.result());
}
/**
@@ -300,6 +304,18 @@ public class ReplicaService {
return sendToReplica(node.name(), request);
}
+ /**
+ * Sends a request to the given replica {@code node} and returns a future
that will be completed with a raw response.
+ * This can be used to carry additional metadata to the caller.
+ *
+ * @param node Cluster node.
+ * @param request The request.
+ * @return Future that completes with the raw replica response or is
completed exceptionally.
+ */
+ public CompletableFuture<ReplicaResponse> invokeRaw(ClusterNode node,
ReplicaRequest request) {
+ return sendToReplicaRaw(node.name(), request);
+ }
+
public MessagingService messagingService() {
return messagingService;
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
index cce6f4386f..cb85f9d8af 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
@@ -77,7 +77,7 @@ public class AbstractMultiNodeBenchmark {
@Nullable
protected String clusterConfiguration() {
- return null;
+ return "";
}
/**
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
index 507f4d8387..2e1ce6d074 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
@@ -20,8 +20,8 @@ package org.apache.ignite.internal.benchmark;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.Tuple;
@@ -66,6 +66,13 @@ public class UpsertKvBenchmark extends
AbstractMultiNodeBenchmark {
@Param({"8"})
private int partitionCount;
+ private static final AtomicInteger counter = new AtomicInteger();
+
+ private static final ThreadLocal<Integer> gen = ThreadLocal.withInitial(()
-> {
+ int id = counter.getAndIncrement();
+ return id * 20_000_000;
+ });
+
@Override
public void nodeSetUp() throws Exception {
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_REPLICATION_IN_BENCHMARK,
"true");
@@ -104,7 +111,9 @@ public class UpsertKvBenchmark extends
AbstractMultiNodeBenchmark {
}
private int nextId() {
- return ThreadLocalRandom.current().nextInt();
+ int cur = gen.get() + 1;
+ gen.set(cur);
+ return cur;
}
/**
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 0a5b14c206..43b3f5a338 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -746,7 +746,7 @@ public class IgniteImpl implements Ignite {
SchemaSynchronizationConfiguration schemaSyncConfig =
clusterConfigRegistry
.getConfiguration(SchemaSynchronizationExtensionConfiguration.KEY).schemaSync();
- clockService = new ClockServiceImpl(clock, clockWaiter, new
SameValueLongSupplier(() -> schemaSyncConfig.maxClockSkew().value()));
+ clockService = new ClockServiceImpl(clock, clockWaiter, () ->
schemaSyncConfig.maxClockSkew().value());
idempotentCacheVacuumizer = new IdempotentCacheVacuumizer(
name,
@@ -1179,12 +1179,12 @@ public class IgniteImpl implements Ignite {
return Map.copyOf(decoratedEngines);
}
- private static SameValueLongSupplier
delayDurationMsSupplier(SchemaSynchronizationConfiguration schemaSyncConfig) {
- return new SameValueLongSupplier(() ->
schemaSyncConfig.delayDuration().value());
+ private static LongSupplier
delayDurationMsSupplier(SchemaSynchronizationConfiguration schemaSyncConfig) {
+ return () -> schemaSyncConfig.delayDuration().value();
}
private static LongSupplier
partitionIdleSafeTimePropagationPeriodMsSupplier(ReplicationConfiguration
replicationConfig) {
- return new SameValueLongSupplier(() ->
replicationConfig.idleSafeTimePropagationDuration().value());
+ return () ->
replicationConfig.idleSafeTimePropagationDuration().value();
}
private AuthenticationManager createAuthenticationManager() {
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxObservableTimePropagationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxObservableTimePropagationTest.java
new file mode 100644
index 0000000000..e84d6cb4e6
--- /dev/null
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxObservableTimePropagationTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import static org.apache.ignite.distributed.ItTxTestCluster.NODE_PORT_BASE;
+import static
org.apache.ignite.internal.tx.impl.ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Collection;
+import org.apache.ignite.internal.TestHybridClock;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.table.TxInfrastructureTest;
+import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.impl.TxManagerImpl;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests if commit timestamp is propagated to observable time correctly.
+ */
+@ExtendWith(SystemPropertiesExtension.class)
+@WithSystemProperty(key = RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY,
value = "1000000")
+public class ItTxObservableTimePropagationTest extends TxInfrastructureTest {
+ private static final long CLIENT_FROZEN_PHYSICAL_TIME = 3000;
+
+ private static final int CLIENT_PORT = NODE_PORT_BASE - 1;
+
+ /**
+ * The constructor.
+ *
+ * @param testInfo Test info.
+ */
+ public ItTxObservableTimePropagationTest(TestInfo testInfo) {
+ super(testInfo);
+ }
+
+ @Override
+ protected int nodes() {
+ return 3;
+ }
+
+ @Override
+ protected int replicas() {
+ return 3;
+ }
+
+ @Override
+ protected HybridClock createClock(ClusterNode node) {
+ // Client physical time is frozen in the past, server time advances
normally.
+ return new TestHybridClock(() -> node.address().port() == CLIENT_PORT
? CLIENT_FROZEN_PHYSICAL_TIME : System.currentTimeMillis());
+ }
+
+ @Test
+ public void testImplicitObservableTimePropagation() {
+ RecordView<Tuple> view = accounts.recordView();
+ view.upsert(null, makeValue(1, 100.0));
+ TxManagerImpl clientTxManager = (TxManagerImpl)
txTestCluster.clientTxManager;
+ Collection<TxStateMeta> states = clientTxManager.states();
+ assertEquals(1, states.size());
+ HybridTimestamp commitTs = states.iterator().next().commitTimestamp();
+ assertNotNull(commitTs);
+ assertEquals(commitTs, timestampTracker.get());
+ assertTrue(commitTs.getPhysical() != CLIENT_FROZEN_PHYSICAL_TIME,
"Client time should be advanced to server time");
+ }
+}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index de0e2e1a4b..8a733b6aeb 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -57,13 +57,16 @@ import
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.SingleClusterNodeResolver;
+import org.apache.ignite.internal.network.serialization.MessageSerializer;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import
org.apache.ignite.internal.partition.replicator.network.command.TimedBinaryRowMessage;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
@@ -81,6 +84,7 @@ import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import
org.apache.ignite.internal.replicator.message.SchemaVersionAwareReplicaRequest;
+import
org.apache.ignite.internal.replicator.message.TimestampAwareReplicaResponse;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NullBinaryRow;
@@ -114,6 +118,7 @@ import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -121,6 +126,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.stubbing.Answer;
/**
* Tests for data colocation.
@@ -239,7 +245,7 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
groupRafts.put(new TablePartitionId(tblId, i), r);
}
- when(replicaService.invoke(any(ClusterNode.class),
any())).thenAnswer(invocation -> {
+ Answer<CompletableFuture<?>> clo = invocation -> {
ClusterNode node = invocation.getArgument(0);
ReplicaRequest request = invocation.getArgument(1);
var commitPartId = new TablePartitionId(2, 0);
@@ -278,7 +284,40 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
.txCoordinatorId(node.id())
.build());
}
- });
+ };
+ when(replicaService.invoke(any(ClusterNode.class),
any())).thenAnswer(clo);
+ when(replicaService.invokeRaw(any(ClusterNode.class),
any())).thenAnswer(
+ invocation -> clo.answer(invocation).thenApply(res -> new
TimestampAwareReplicaResponse() {
+ @Override
+ public @Nullable Object result() {
+ return res;
+ }
+
+ @Override
+ public @Nullable HybridTimestamp timestamp() {
+ return clock.now();
+ }
+
+ @Override
+ public MessageSerializer<NetworkMessage> serializer() {
+ return null;
+ }
+
+ @Override
+ public short messageType() {
+ return 0;
+ }
+
+ @Override
+ public short groupType() {
+ return 0;
+ }
+
+ @Override
+ public NetworkMessage clone() {
+ return null;
+ }
+ }));
intTable = new InternalTableImpl(
"PUBLIC.TEST",
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 3788f5475d..acf4dd65cf 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -588,7 +588,7 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
if (maxObservableSafeTime == -1) {
maxObservableSafeTime =
clockService.now().addPhysicalTime(clockService.maxClockSkewMillis()).longValue();
- LOG.info("maxObservableSafeTime is initialized with [" +
maxObservableSafeTime + "].");
+ LOG.info("maxObservableSafeTime has been initialized with
[{}].", HybridTimestamp.hybridTimestamp(maxObservableSafeTime));
}
// Because of clock.tick it's guaranteed that two different
commands will have different safe timestamps.
@@ -607,7 +607,7 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
@Override
public void onLeaderStop() {
maxObservableSafeTime = -1;
- LOG.info("maxObservableSafeTime is set to [" + maxObservableSafeTime +
"] on leader stop.");
+ LOG.info("maxObservableSafeTime has been reset on leader stop.");
}
/**
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 61bee26e96..0e63c388e5 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -136,6 +136,7 @@ import
org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
import org.apache.ignite.internal.raft.service.RaftCommandRunner;
+import org.apache.ignite.internal.replicator.CommandApplicationResult;
import org.apache.ignite.internal.replicator.ReplicaResult;
import org.apache.ignite.internal.replicator.TablePartitionId;
import
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
@@ -642,11 +643,11 @@ public class PartitionReplicaListener implements
ReplicaListener {
HybridTimestamp opStartTs;
if (request instanceof ReadWriteReplicaRequest) {
- opStartTs = clockService.now();
+ opStartTs = clockService.current();
} else if (request instanceof ReadOnlyReplicaRequest) {
opStartTs = ((ReadOnlyReplicaRequest) request).readTimestamp();
} else if (request instanceof ReadOnlyDirectReplicaRequest) {
- opStartTs = clockService.now();
+ opStartTs = clockService.current();
} else {
opStartTs = null;
}
@@ -1737,7 +1738,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
throw new TransactionException(commit ? TX_COMMIT_ERR
: TX_ROLLBACK_ERR, ex);
}
- TransactionResult result = (TransactionResult)
((ApplyCommandResult) txOutcome).result;
+ TransactionResult result = (TransactionResult)
((ResultWrapper) txOutcome).result;
markFinished(txId, result.transactionState(),
result.commitTimestamp());
@@ -1812,7 +1813,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
catalogVersion
);
- return new ReplicaResult(null,
commandReplicatedFuture);
+ return new ReplicaResult(null, new
CommandApplicationResult(null, commandReplicatedFuture));
});
} else {
return completedFuture(
@@ -2052,8 +2053,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
if (v instanceof ReplicaResult) {
ReplicaResult res = (ReplicaResult) v;
- if (res.replicationFuture() != null) {
- res.replicationFuture().whenComplete((v0, th0) -> {
+ if (res.applyResult().replicationFuture() != null) {
+
res.applyResult().replicationFuture().whenComplete((v0, th0) -> {
if (th0 != null) {
cleanupReadyFut.completeExceptionally(th0);
} else {
@@ -2619,7 +2620,9 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @param cmd Raft command.
* @return Raft future or raft decorated future with command that was
processed.
*/
- private <T> CompletableFuture<T> applyCmdWithExceptionHandling(Command
cmd, CompletableFuture<T> resultFuture) {
+ private CompletableFuture<ResultWrapper<Object>>
applyCmdWithExceptionHandling(Command cmd) {
+ CompletableFuture<ResultWrapper<Object>> resultFuture = new
CompletableFuture<>();
+
applyCmdWithRetryOnSafeTimeReorderException(cmd, resultFuture);
return resultFuture.exceptionally(throwable -> {
@@ -2674,7 +2677,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
resultFuture.completeExceptionally(ex);
}
} else {
- resultFuture.complete((T) new ApplyCommandResult<>(cmd, res));
+ resultFuture.complete((T) new ResultWrapper<>(cmd, res));
}
});
}
@@ -2692,7 +2695,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @param catalogVersion Validated catalog version associated with given
operation.
* @return A local update ready future, possibly having a nested
replication future as a result for delayed ack purpose.
*/
- private CompletableFuture<CompletableFuture<?>> applyUpdateCommand(
+ private CompletableFuture<CommandApplicationResult> applyUpdateCommand(
TablePartitionId tablePartId,
UUID rowUuid,
@Nullable BinaryRow row,
@@ -2735,16 +2738,11 @@ public class PartitionReplicaListener implements
ReplicaListener {
);
}
- CompletableFuture<UUID> fut =
applyCmdWithExceptionHandling(cmd, new CompletableFuture<>())
- .thenApply(res -> cmd.txId());
+ CompletableFuture<UUID> repFut =
applyCmdWithExceptionHandling(cmd).thenApply(res -> cmd.txId());
- return completedFuture(fut);
+ return completedFuture(new CommandApplicationResult(null,
repFut));
} else {
- CompletableFuture<ApplyCommandResult<Object>> resultFuture =
new CompletableFuture<>();
-
- applyCmdWithExceptionHandling(cmd, resultFuture);
-
- return resultFuture.thenCompose(res -> {
+ return applyCmdWithExceptionHandling(cmd).thenCompose(res -> {
UpdateCommandResult updateCommandResult =
(UpdateCommandResult) res.getResult();
if (updateCommandResult != null &&
!updateCommandResult.isPrimaryReplicaMatch()) {
@@ -2752,7 +2750,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
if (updateCommandResult != null &&
updateCommandResult.isPrimaryInPeersAndLearners()) {
- return safeTime.waitFor(((UpdateCommand)
res.getCommand()).safeTime()).thenApply(ignored -> null);
+ return safeTime.waitFor(((UpdateCommand)
res.getCommand()).safeTime())
+ .thenApply(ignored -> new
CommandApplicationResult(((UpdateCommand) res.getCommand()).safeTime(), null));
} else {
if (!SKIP_UPDATES) {
// We don't need to take the partition snapshots
read lock, see #INTERNAL_DOC_PLACEHOLDER why.
@@ -2769,7 +2768,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
);
}
- return nullCompletedFuture();
+ // getCommand provides actual assigned safeTime (may
be reassigned due to reorder)
+ return completedFuture(new
CommandApplicationResult(((UpdateCommand) res.getCommand()).safeTime(), null));
}
});
}
@@ -2787,7 +2787,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @param leaseStartTime Lease start time.
* @return A local update ready future, possibly having a nested
replication future as a result for delayed ack purpose.
*/
- private CompletableFuture<CompletableFuture<?>> applyUpdateCommand(
+ private CompletableFuture<CommandApplicationResult> applyUpdateCommand(
ReadWriteSingleRowReplicaRequest request,
UUID rowUuid,
@Nullable BinaryRow row,
@@ -2818,9 +2818,9 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @param txCoordinatorId Transaction coordinator id.
* @param catalogVersion Validated catalog version associated with given
operation.
* @param skipDelayedAck {@code true} to disable the delayed ack
optimization.
- * @return Raft future, see {@link #applyCmdWithExceptionHandling(Command,
CompletableFuture)}.
+ * @return Raft future, see {@link
#applyCmdWithExceptionHandling(Command)}.
*/
- private CompletableFuture<CompletableFuture<?>> applyUpdateAllCommand(
+ private CompletableFuture<CommandApplicationResult> applyUpdateAllCommand(
Map<UUID, TimedBinaryRowMessage> rowsToUpdate,
TablePartitionIdMessage commitPartitionId,
UUID txId,
@@ -2857,7 +2857,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
indexIdsAtRwTxBeginTs(txId)
);
- return applyCmdWithExceptionHandling(cmd, new
CompletableFuture<>()).thenApply(res -> null);
+ return applyCmdWithExceptionHandling(cmd).thenApply(res ->
null);
} else {
// We don't need to take the partition snapshots read
lock, see #INTERNAL_DOC_PLACEHOLDER why.
storageUpdateHandler.handleUpdateAll(
@@ -2869,41 +2869,40 @@ public class PartitionReplicaListener implements
ReplicaListener {
null,
indexIdsAtRwTxBeginTs(txId)
);
+ }
- CompletableFuture<Object> fut =
applyCmdWithExceptionHandling(cmd, new CompletableFuture<>())
- .thenApply(res -> cmd.txId());
+ CompletableFuture<UUID> repFut =
applyCmdWithExceptionHandling(cmd).thenApply(res -> cmd.txId());
- return completedFuture(fut);
- }
+ return completedFuture(new CommandApplicationResult(null,
repFut));
} else {
- return applyCmdWithExceptionHandling(cmd, new
CompletableFuture<ApplyCommandResult<Object>>())
- .thenCompose(res -> {
- UpdateCommandResult updateCommandResult =
(UpdateCommandResult) res.getResult();
-
- if (!updateCommandResult.isPrimaryReplicaMatch()) {
- throw new PrimaryReplicaMissException(
- cmd.txId(),
- cmd.leaseStartTime(),
-
updateCommandResult.currentLeaseStartTime()
- );
- }
- if
(updateCommandResult.isPrimaryInPeersAndLearners()) {
- return safeTime.waitFor(((UpdateAllCommand)
res.getCommand()).safeTime()).thenApply(ignored -> null);
- } else {
- // We don't need to take the partition
snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
- storageUpdateHandler.handleUpdateAll(
- cmd.txId(),
- cmd.rowsToUpdate(),
-
cmd.tablePartitionId().asTablePartitionId(),
- false,
- null,
- cmd.safeTime(),
- indexIdsAtRwTxBeginTs(txId)
- );
+ return applyCmdWithExceptionHandling(cmd).thenCompose(res -> {
+ UpdateCommandResult updateCommandResult =
(UpdateCommandResult) res.getResult();
- return null;
- }
- });
+ if (!updateCommandResult.isPrimaryReplicaMatch()) {
+ throw new PrimaryReplicaMissException(
+ cmd.txId(),
+ cmd.leaseStartTime(),
+ updateCommandResult.currentLeaseStartTime()
+ );
+ }
+ if (updateCommandResult.isPrimaryInPeersAndLearners()) {
+ return safeTime.waitFor(((UpdateAllCommand)
res.getCommand()).safeTime())
+ .thenApply(ignored -> new
CommandApplicationResult(((UpdateAllCommand) res.getCommand()).safeTime(),
null));
+ } else {
+ // We don't need to take the partition snapshots read
lock, see #INTERNAL_DOC_PLACEHOLDER why.
+ storageUpdateHandler.handleUpdateAll(
+ cmd.txId(),
+ cmd.rowsToUpdate(),
+ cmd.tablePartitionId().asTablePartitionId(),
+ false,
+ null,
+ cmd.safeTime(),
+ indexIdsAtRwTxBeginTs(txId)
+ );
+
+ return completedFuture(new
CommandApplicationResult(((UpdateAllCommand) res.getCommand()).safeTime(),
null));
+ }
+ });
}
}
}
@@ -2917,7 +2916,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @param leaseStartTime Lease start time.
* @return Raft future, see {@link
#applyCmdWithExceptionHandling(Command)}.
*/
- private CompletableFuture<CompletableFuture<?>> applyUpdateAllCommand(
+ private CompletableFuture<CommandApplicationResult> applyUpdateAllCommand(
ReadWriteMultiRowReplicaRequest request,
Map<UUID, TimedBinaryRowMessage> rowsToUpdate,
int catalogVersion,
@@ -2981,7 +2980,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return takeLocksForDeleteExact(searchRow, rowId, row, txId)
.thenCompose(validatedRowId -> {
if (validatedRowId == null) {
- return completedFuture(new
ReplicaResult(false, request.full() ? null : nullCompletedFuture()));
+ return completedFuture(new
ReplicaResult(false, null));
}
return
validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
@@ -3771,7 +3770,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return Future that will complete with catalog version associated with
given operation though the operation timestamp.
*/
private CompletableFuture<Integer>
validateWriteAgainstSchemaAfterTakingLocks(UUID txId) {
- HybridTimestamp operationTimestamp = clockService.now();
+ HybridTimestamp operationTimestamp = clockService.current();
return reliableCatalogVersionFor(operationTimestamp)
.thenApply(catalogVersion -> {
@@ -3781,7 +3780,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
});
}
- private UpdateCommand updateCommand(
+ private static UpdateCommand updateCommand(
TablePartitionId tablePartId,
UUID rowUuid,
@Nullable BinaryRow row,
@@ -4150,11 +4149,11 @@ public class PartitionReplicaListener implements
ReplicaListener {
* Wrapper for the update(All)Command processing result that, besides the
result itself, stores the actual command that was processed. It helps to
* manage command substitutions on SafeTimeReorderException, where a cloned
command with an adjusted safeTime is sent.
*/
- private static class ApplyCommandResult<T> {
+ private static class ResultWrapper<T> {
private final Command command;
private final T result;
- ApplyCommandResult(Command command, T result) {
+ ResultWrapper(Command command, T result) {
this.command = command;
this.result = result;
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index fbe5a3d5de..6452594758 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -109,6 +109,7 @@ import
org.apache.ignite.internal.replicator.exception.ReplicationTimeoutExcepti
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+import org.apache.ignite.internal.replicator.message.TimestampAware;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.BinaryTuple;
@@ -633,7 +634,19 @@ public class InternalTableImpl implements InternalTable {
|| request instanceof SwapRowReplicaRequest;
if (full) { // Full transaction retries are handled in postEnlist.
- return replicaSvc.invoke(primaryReplicaAndConsistencyToken.get1(),
request);
+ return
replicaSvc.invokeRaw(primaryReplicaAndConsistencyToken.get1(),
request).handle((r, e) -> {
+ boolean hasError = e != null;
+ assert hasError || r instanceof TimestampAware;
+
+ // Timestamp is set to commit timestamp for full transactions.
+ tx.finish(!hasError, hasError ? null : ((TimestampAware)
r).timestamp(), true);
+
+ if (e != null) {
+ sneakyThrow(e);
+ }
+
+ return (R) r.result();
+ });
} else {
if (write) { // Track only write requests from explicit
transactions.
if (!transactionInflights.addInflight(tx.id(), false)) {
@@ -714,11 +727,7 @@ public class InternalTableImpl implements InternalTable {
assert !(autoCommit && full) : "Invalid combination of flags";
return fut.handle((BiFunction<T, Throwable, CompletableFuture<T>>) (r,
e) -> {
- if (full) { // Full txn is already finished remotely. Just update
local state.
- CompletableFuture<Void> finishFullTxFut = tx0.finish(e ==
null, null, true);
-
- assert finishFullTxFut.isDone() : "A full-state transaction
must finish synchronously.";
-
+ if (full) {
return e != null ? failedFuture(e) : completedFuture(r);
}
@@ -877,7 +886,7 @@ public class InternalTableImpl implements InternalTable {
}
if (tx.isReadOnly()) {
- return evaluateReadOnlyRecipientNode(partitionId(keyRow))
+ return evaluateReadOnlyRecipientNode(partitionId(keyRow),
tx.readTimestamp())
.thenCompose(recipientNode -> get(keyRow,
tx.readTimestamp(), recipientNode));
}
@@ -968,7 +977,7 @@ public class InternalTableImpl implements InternalTable {
if (tx != null && tx.isReadOnly()) {
BinaryRowEx firstRow = keyRows.iterator().next();
- return evaluateReadOnlyRecipientNode(partitionId(firstRow))
+ return evaluateReadOnlyRecipientNode(partitionId(firstRow),
tx.readTimestamp())
.thenCompose(recipientNode -> getAll(keyRows,
tx.readTimestamp(), recipientNode));
}
@@ -1107,7 +1116,7 @@ public class InternalTableImpl implements InternalTable {
.transactionId(txo.id())
.enlistmentConsistencyToken(enlistmentConsistencyToken)
.requestType(RW_UPSERT)
- .timestamp(clockService.now())
+ .timestamp(txo.startTimestamp()) // TODO
https://issues.apache.org/jira/browse/IGNITE-23712
.full(txo.implicit())
.coordinatorId(txo.coordinatorId())
.build(),
@@ -1947,7 +1956,7 @@ public class InternalTableImpl implements InternalTable {
* @return The enlist future (completes once a leader becomes known).
*/
protected CompletableFuture<IgniteBiTuple<ClusterNode, Long>> enlist(int
partId, InternalTransaction tx) {
- HybridTimestamp now = clockService.now();
+ HybridTimestamp now = tx.startTimestamp();
TablePartitionId tablePartitionId = new TablePartitionId(tableId,
partId);
tx.assignCommitPartition(tablePartitionId);
@@ -2063,12 +2072,13 @@ public class InternalTableImpl implements InternalTable
{
* Evaluates the cluster node for read-only request processing.
*
* @param partId Partition id.
+ * @param readTimestamp Read timestamp.
* @return Cluster node to evaluate read-only request.
*/
- protected CompletableFuture<ClusterNode> evaluateReadOnlyRecipientNode(int
partId) {
+ protected CompletableFuture<ClusterNode> evaluateReadOnlyRecipientNode(int
partId, @Nullable HybridTimestamp readTimestamp) {
TablePartitionId tablePartitionId = new TablePartitionId(tableId,
partId);
- return awaitPrimaryReplica(tablePartitionId, clockService.now())
+ return awaitPrimaryReplica(tablePartitionId, readTimestamp)
.handle((res, e) -> {
if (e != null) {
throw withCause(TransactionException::new,
REPLICA_UNAVAILABLE_ERR, e);
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 23428c0d60..7f80bc15f3 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -173,7 +173,6 @@ import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
-import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.tx.IgniteTransactions;
@@ -210,8 +209,6 @@ public class ItTxTestCluster {
public static final int NODE_PORT_BASE = 20_000;
- private static final RaftMessagesFactory FACTORY = new
RaftMessagesFactory();
-
private ClusterService client;
private HybridClock clientClock;
@@ -403,7 +400,7 @@ public class ItTxTestCluster {
ClusterNode node = clusterService.topologyService().localMember();
- HybridClock clock = new HybridClockImpl();
+ HybridClock clock = createClock(node);
ClockWaiter clockWaiter = new ClockWaiter("test-node" + i, clock);
assertThat(clockWaiter.startAsync(new ComponentContext()),
willCompleteSuccessfully());
TestClockService clockService = new TestClockService(clock,
clockWaiter);
@@ -465,7 +462,7 @@ public class ItTxTestCluster {
Set.of(PartitionReplicationMessageGroup.class,
TxMessageGroup.class),
placementDriver,
partitionOperationsExecutor,
- () ->
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
+ this::getSafeTimePropagationTimeout,
new NoOpFailureManager(),
commandMarshaller,
raftClientFactory,
@@ -549,6 +546,14 @@ public class ItTxTestCluster {
assertNotNull(clientTxManager);
}
+ protected HybridClock createClock(ClusterNode node) {
+ return new HybridClockImpl();
+ }
+
+ protected long getSafeTimePropagationTimeout() {
+ return DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
+ }
+
protected TxManagerImpl newTxManager(
ClusterService clusterService,
ReplicaService replicaSvc,
@@ -1027,7 +1032,7 @@ public class ItTxTestCluster {
assertTrue(waitForTopology(client, nodes + 1, 1000));
- clientClock = new HybridClockImpl();
+ clientClock = createClock(client.topologyService().localMember());
clientClockWaiter = new ClockWaiter("client-node", clientClock);
assertThat(clientClockWaiter.startAsync(new ComponentContext()),
willCompleteSuccessfully());
clientClockService = new TestClockService(clientClock,
clientClockWaiter);
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index b05c878d06..ea8bad1208 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -23,7 +23,6 @@ import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCode;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
-import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
@@ -36,7 +35,6 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
import java.util.ArrayList;
import java.util.Collection;
@@ -61,47 +59,25 @@ import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
-import org.apache.ignite.distributed.ItTxTestCluster;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.manager.ComponentContext;
-import org.apache.ignite.internal.network.ClusterService;
-import org.apache.ignite.internal.network.NodeFinder;
-import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
-import org.apache.ignite.internal.placementdriver.ReplicaMeta;
-import org.apache.ignite.internal.raft.Loza;
-import org.apache.ignite.internal.raft.Peer;
-import org.apache.ignite.internal.raft.RaftNodeId;
-import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
-import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.replicator.Replica;
import org.apache.ignite.internal.replicator.ReplicaImpl;
import org.apache.ignite.internal.replicator.ReplicaManager;
-import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicaTestUtils;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.replicator.TablePartitionId;
-import
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.Column;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
-import
org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
import org.apache.ignite.internal.schema.row.Row;
-import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
-import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
-import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.Lock;
import org.apache.ignite.internal.tx.LockException;
@@ -111,261 +87,46 @@ import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxPriority;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
-import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
-import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.Pair;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.network.ClusterNode;
-import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Tuple;
-import org.apache.ignite.tx.IgniteTransactions;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionException;
import org.apache.ignite.tx.TransactionOptions;
import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
-import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
/**
- * TODO asch IGNITE-15928 validate zero locks after test commit.
+ * Common tx test scenarios set. TODO asch IGNITE-15928 validate zero locks
after test commit.
*/
@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
@MockitoSettings(strictness = Strictness.LENIENT)
-public abstract class TxAbstractTest extends IgniteAbstractTest {
- protected static final double BALANCE_1 = 500;
-
- protected static final double BALANCE_2 = 500;
-
- protected static final double DELTA = 100;
-
- protected static final String ACC_TABLE_NAME = "accounts";
-
- protected static final String CUST_TABLE_NAME = "customers";
-
- protected static SchemaDescriptor ACCOUNTS_SCHEMA = new SchemaDescriptor(
- 1,
- new Column[]{new Column("accountNumber".toUpperCase(),
NativeTypes.INT64, false)},
- new Column[]{new Column("balance".toUpperCase(),
NativeTypes.DOUBLE, false)}
- );
-
- protected static SchemaDescriptor CUSTOMERS_SCHEMA = new SchemaDescriptor(
- 1,
- new Column[]{new Column("accountNumber".toUpperCase(),
NativeTypes.INT64, false)},
- new Column[]{new Column("name".toUpperCase(), NativeTypes.STRING,
false)}
- );
-
- /** Accounts table id -> balance. */
- protected TableViewInternal accounts;
-
- /** Customers table id -> name. */
- protected TableViewInternal customers;
-
- protected HybridTimestampTracker timestampTracker = new
HybridTimestampTracker();
-
- protected IgniteTransactions igniteTransactions;
-
- // TODO fsync can be turned on again after
https://issues.apache.org/jira/browse/IGNITE-20195
- @InjectConfiguration("mock: { fsync: false }")
- protected RaftConfiguration raftConfiguration;
-
- @InjectConfiguration
- protected TransactionConfiguration txConfiguration;
-
- @InjectConfiguration
- protected StorageUpdateConfiguration storageUpdateConfiguration;
-
- @InjectConfiguration
- protected ReplicationConfiguration replicationConfiguration;
-
- protected final TestInfo testInfo;
-
- protected ItTxTestCluster txTestCluster;
-
- /**
- * Returns a count of nodes.
- *
- * @return Nodes.
- */
- protected abstract int nodes();
-
- /**
- * Returns a count of replicas.
- *
- * @return Replicas.
- */
- protected int replicas() {
- return 1;
- }
-
- /**
- * Returns {@code true} to disable collocation by using dedicated client
node.
- *
- * @return {@code true} to disable collocation.
- */
- protected boolean startClient() {
- return true;
- }
-
+public abstract class TxAbstractTest extends TxInfrastructureTest {
/**
* The constructor.
*
* @param testInfo Test info.
*/
public TxAbstractTest(TestInfo testInfo) {
- this.testInfo = testInfo;
- }
-
- /**
- * Initialize the test state.
- */
- @BeforeEach
- public void before() throws Exception {
- txTestCluster = new ItTxTestCluster(
- testInfo,
- raftConfiguration,
- txConfiguration,
- storageUpdateConfiguration,
- workDir,
- nodes(),
- replicas(),
- startClient(),
- timestampTracker,
- replicationConfiguration
- );
- txTestCluster.prepareCluster();
-
- this.igniteTransactions = txTestCluster.igniteTransactions();
-
- accounts = txTestCluster.startTable(ACC_TABLE_NAME, ACCOUNTS_SCHEMA);
- customers = txTestCluster.startTable(CUST_TABLE_NAME,
CUSTOMERS_SCHEMA);
-
- log.info("Tables have been started");
- }
-
- /**
- * Shutdowns all cluster nodes after each test.
- *
- * @throws Exception If failed.
- */
- @AfterEach
- public void after() throws Exception {
- txTestCluster.shutdownCluster();
- Mockito.framework().clearInlineMocks();
- }
-
- /**
- * Starts a node.
- *
- * @param name Node name.
- * @param port Local port.
- * @param nodeFinder Node finder.
- * @return The client cluster view.
- */
- public static ClusterService startNode(TestInfo testInfo, String name, int
port,
- NodeFinder nodeFinder) {
- var network = ClusterServiceTestUtils.clusterService(testInfo, port,
nodeFinder);
-
- assertThat(network.startAsync(new ComponentContext()),
willCompleteSuccessfully());
-
- return network;
- }
-
- /** {@inheritDoc} */
- protected TxManager clientTxManager() {
- return txTestCluster.clientTxManager();
- }
-
- /** {@inheritDoc} */
- protected TxManager txManager(TableViewInternal t) {
- String leaseHolder = primaryNode(t);
-
- assertNotNull(leaseHolder, "Table primary node should not be null");
-
- TxManager manager = txTestCluster.txManagers().get(leaseHolder);
-
- assertNotNull(manager);
-
- return manager;
- }
-
- private @Nullable String primaryNode(TableViewInternal t) {
- CompletableFuture<ReplicaMeta> primaryReplicaFuture =
txTestCluster.placementDriver().getPrimaryReplica(
- new TablePartitionId(t.tableId(), 0),
-
txTestCluster.clocks().get(txTestCluster.localNodeName()).now());
-
- assertThat(primaryReplicaFuture, willCompleteSuccessfully());
-
- return primaryReplicaFuture.join().getLeaseholder();
- }
-
- /**
- * Check the storage of partition is the same across all nodes. The
checking is based on {@link MvPartitionStorage#lastAppliedIndex()}
- * that is increased on all update storage operation.
- * TODO: IGNITE-18869 The method must be updated when a proper way to
compare storages will be implemented.
- *
- * @param table The table.
- * @param partId Partition id.
- * @return True if {@link MvPartitionStorage#lastAppliedIndex()} is
equivalent across all nodes, false otherwise.
- */
- protected boolean assertPartitionsSame(TableViewInternal table, int
partId) {
- long storageIdx = 0;
-
- for (Map.Entry<String, Loza> entry :
txTestCluster.raftServers().entrySet()) {
- Loza svc = entry.getValue();
-
- var server = (JraftServerImpl) svc.server();
-
- var groupId = new TablePartitionId(table.tableId(), partId);
-
- Peer serverPeer = server.localPeers(groupId).get(0);
-
- RaftGroupService grp = server.raftGroupService(new
RaftNodeId(groupId, serverPeer));
-
- var fsm = (JraftServerImpl.DelegatingStateMachine)
grp.getRaftNode().getOptions().getFsm();
-
- PartitionListener listener = (PartitionListener) fsm.getListener();
-
- MvPartitionStorage storage = listener.getMvStorage();
-
- if (storageIdx == 0) {
- storageIdx = storage.lastAppliedIndex();
- } else if (storageIdx != storage.lastAppliedIndex()) {
- return false;
- }
- }
-
- return true;
- }
-
- protected void injectFailureOnNextOperation(TableViewInternal accounts) {
- InternalTable internalTable = accounts.internalTable();
- ReplicaService replicaService =
IgniteTestUtils.getFieldValue(internalTable, "replicaSvc");
- Mockito.doReturn(CompletableFuture.failedFuture(new
Exception())).when(replicaService).invoke((String) any(), any());
- Mockito.doReturn(CompletableFuture.failedFuture(new
Exception())).when(replicaService).invoke((ClusterNode) any(), any());
- }
-
- protected Collection<TxManager> txManagers() {
- return txTestCluster.txManagers().values();
+ super(testInfo);
}
@Test
@@ -2056,58 +1817,6 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
assertEquals(total, total0, "Total amount invariant is not preserved");
}
- /**
- * Makes a key.
- *
- * @param id The id.
- * @return The key tuple.
- */
- protected Tuple makeKey(long id) {
- return Tuple.create().set("accountNumber", id);
- }
-
- /**
- * Makes a tuple containing key and value.
- *
- * @param id The id.
- * @param balance The balance.
- * @return The value tuple.
- */
- protected Tuple makeValue(long id, double balance) {
- return Tuple.create().set("accountNumber", id).set("balance", balance);
- }
-
- /**
- * Makes a tuple containing key and value.
- *
- * @param id The id.
- * @param name The name.
- * @return The value tuple.
- */
- private Tuple makeValue(long id, String name) {
- return Tuple.create().set("accountNumber", id).set("name", name);
- }
-
- /**
- * Makes a value.
- *
- * @param balance The balance.
- * @return The value tuple.
- */
- private Tuple makeValue(double balance) {
- return Tuple.create().set("balance", balance);
- }
-
- /**
- * Makes a value.
- *
- * @param name The name.
- * @return The value tuple.
- */
- private Tuple makeValue(String name) {
- return Tuple.create().set("name", name);
- }
-
/**
* Get a lock manager on a partition leader.
*
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxInfrastructureTest.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxInfrastructureTest.java
new file mode 100644
index 0000000000..5d7fafddb2
--- /dev/null
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxInfrastructureTest.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.distributed.ItTxTestCluster;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.NodeFinder;
+import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicatorConstants;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import
org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tx.HybridTimestampTracker;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.IgniteTransactions;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+/**
+ * Setup infrastructure for tx related test scenarios.
+ */
+@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
+@MockitoSettings(strictness = Strictness.LENIENT)
+public abstract class TxInfrastructureTest extends IgniteAbstractTest {
+ protected static final double BALANCE_1 = 500;
+
+ protected static final double BALANCE_2 = 500;
+
+ protected static final double DELTA = 100;
+
+ protected static final String ACC_TABLE_NAME = "accounts";
+
+ protected static final String CUST_TABLE_NAME = "customers";
+
+ protected static SchemaDescriptor ACCOUNTS_SCHEMA = new SchemaDescriptor(
+ 1,
+ new Column[]{new Column("accountNumber".toUpperCase(),
NativeTypes.INT64, false)},
+ new Column[]{new Column("balance".toUpperCase(),
NativeTypes.DOUBLE, false)}
+ );
+
+ protected static SchemaDescriptor CUSTOMERS_SCHEMA = new SchemaDescriptor(
+ 1,
+ new Column[]{new Column("accountNumber".toUpperCase(),
NativeTypes.INT64, false)},
+ new Column[]{new Column("name".toUpperCase(), NativeTypes.STRING,
false)}
+ );
+
+ /** Accounts table id -> balance. */
+ protected TableViewInternal accounts;
+
+ /** Customers table id -> name. */
+ protected TableViewInternal customers;
+
+ protected HybridTimestampTracker timestampTracker = new
HybridTimestampTracker();
+
+ protected IgniteTransactions igniteTransactions;
+
+ // TODO fsync can be turned on again after
https://issues.apache.org/jira/browse/IGNITE-20195
+ @InjectConfiguration("mock: { fsync: false }")
+ protected RaftConfiguration raftConfiguration;
+
+ @InjectConfiguration
+ protected TransactionConfiguration txConfiguration;
+
+ @InjectConfiguration
+ protected StorageUpdateConfiguration storageUpdateConfiguration;
+
+ @InjectConfiguration
+ protected ReplicationConfiguration replicationConfiguration;
+
+ protected final TestInfo testInfo;
+
+ protected ItTxTestCluster txTestCluster;
+
+ /**
+ * Returns a count of nodes.
+ *
+ * @return Nodes.
+ */
+ protected abstract int nodes();
+
+ /**
+ * Returns a count of replicas.
+ *
+ * @return Replicas.
+ */
+ protected int replicas() {
+ return 1;
+ }
+
+ /**
+ * Returns {@code true} to disable collocation by using a dedicated client
node.
+ *
+ * @return {@code true} to disable collocation.
+ */
+ protected boolean startClient() {
+ return true;
+ }
+
+ /**
+ * The constructor.
+ *
+ * @param testInfo Test info.
+ */
+ public TxInfrastructureTest(TestInfo testInfo) {
+ this.testInfo = testInfo;
+ }
+
+ /**
+ * Initialize the test state.
+ */
+ @BeforeEach
+ public void before() throws Exception {
+ txTestCluster = new ItTxTestCluster(
+ testInfo,
+ raftConfiguration,
+ txConfiguration,
+ storageUpdateConfiguration,
+ workDir,
+ nodes(),
+ replicas(),
+ startClient(),
+ timestampTracker,
+ replicationConfiguration
+ ) {
+ @Override
+ protected HybridClock createClock(ClusterNode node) {
+ return TxInfrastructureTest.this.createClock(node);
+ }
+
+ @Override
+ protected long getSafeTimePropagationTimeout() {
+ return
TxInfrastructureTest.this.getSafeTimePropagationTimeout();
+ }
+ };
+ txTestCluster.prepareCluster();
+
+ this.igniteTransactions = txTestCluster.igniteTransactions();
+
+ accounts = txTestCluster.startTable(ACC_TABLE_NAME, ACCOUNTS_SCHEMA);
+ customers = txTestCluster.startTable(CUST_TABLE_NAME,
CUSTOMERS_SCHEMA);
+
+ log.info("Tables have been started");
+ }
+
+ protected HybridClock createClock(ClusterNode node) {
+ return new HybridClockImpl();
+ }
+
+ protected long getSafeTimePropagationTimeout() {
+ return
ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
+ }
+
+ /**
+ * Shuts down all cluster nodes after each test.
+ *
+ * @throws Exception If failed.
+ */
+ @AfterEach
+ public void after() throws Exception {
+ txTestCluster.shutdownCluster();
+ Mockito.framework().clearInlineMocks();
+ }
+
+ /**
+ * Starts a node.
+ *
+ * @param name Node name.
+ * @param port Local port.
+ * @param nodeFinder Node finder.
+ * @return The started cluster service.
+ */
+ public static ClusterService startNode(TestInfo testInfo, String name, int
port,
+ NodeFinder nodeFinder) {
+ var network = ClusterServiceTestUtils.clusterService(testInfo, port,
nodeFinder);
+
+ assertThat(network.startAsync(new ComponentContext()),
willCompleteSuccessfully());
+
+ return network;
+ }
+
+ /** Returns the client transaction manager of the test cluster. */
+ protected TxManager clientTxManager() {
+ return txTestCluster.clientTxManager();
+ }
+
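+ /** Returns the transaction manager of the node holding the primary replica lease for partition 0 of the given table. */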
+ protected TxManager txManager(TableViewInternal t) {
+ String leaseHolder = primaryNode(t);
+
+ assertNotNull(leaseHolder, "Table primary node should not be null");
+
+ TxManager manager = txTestCluster.txManagers().get(leaseHolder);
+
+ assertNotNull(manager);
+
+ return manager;
+ }
+
+ protected @Nullable String primaryNode(TableViewInternal t) {
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture =
txTestCluster.placementDriver().getPrimaryReplica(
+ new TablePartitionId(t.tableId(), 0),
+
txTestCluster.clocks().get(txTestCluster.localNodeName()).now());
+
+ assertThat(primaryReplicaFuture, willCompleteSuccessfully());
+
+ return primaryReplicaFuture.join().getLeaseholder();
+ }
+
+ /**
+ * Checks that the partition storage is the same across all nodes. The
check is based on {@link MvPartitionStorage#lastAppliedIndex()},
+ * which is increased on every storage update operation.
+ * TODO: IGNITE-18869 The method must be updated when a proper way to
compare storages is implemented.
+ *
+ * @param table The table.
+ * @param partId Partition id.
+ * @return True if {@link MvPartitionStorage#lastAppliedIndex()} is
equivalent across all nodes, false otherwise.
+ */
+ protected boolean assertPartitionsSame(TableViewInternal table, int
partId) {
+ long storageIdx = 0;
+
+ for (Map.Entry<String, Loza> entry :
txTestCluster.raftServers().entrySet()) {
+ Loza svc = entry.getValue();
+
+ var server = (JraftServerImpl) svc.server();
+
+ var groupId = new TablePartitionId(table.tableId(), partId);
+
+ Peer serverPeer = server.localPeers(groupId).get(0);
+
+ RaftGroupService grp = server.raftGroupService(new
RaftNodeId(groupId, serverPeer));
+
+ var fsm = (JraftServerImpl.DelegatingStateMachine)
grp.getRaftNode().getOptions().getFsm();
+
+ PartitionListener listener = (PartitionListener) fsm.getListener();
+
+ MvPartitionStorage storage = listener.getMvStorage();
+
+ if (storageIdx == 0) {
+ storageIdx = storage.lastAppliedIndex();
+ } else if (storageIdx != storage.lastAppliedIndex()) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ protected void injectFailureOnNextOperation(TableViewInternal accounts) {
+ InternalTable internalTable = accounts.internalTable();
+ ReplicaService replicaService =
IgniteTestUtils.getFieldValue(internalTable, "replicaSvc");
+ Mockito.doReturn(CompletableFuture.failedFuture(new
Exception())).when(replicaService).invoke((String) any(), any());
+ Mockito.doReturn(CompletableFuture.failedFuture(new
Exception())).when(replicaService).invoke((ClusterNode) any(), any());
+ }
+
+ protected Collection<TxManager> txManagers() {
+ return txTestCluster.txManagers().values();
+ }
+
+ /**
+ * Makes a key.
+ *
+ * @param id The id.
+ * @return The key tuple.
+ */
+ protected Tuple makeKey(long id) {
+ return Tuple.create().set("accountNumber", id);
+ }
+
+ /**
+ * Makes a tuple containing key and value.
+ *
+ * @param id The id.
+ * @param balance The balance.
+ * @return The value tuple.
+ */
+ protected Tuple makeValue(long id, double balance) {
+ return Tuple.create().set("accountNumber", id).set("balance", balance);
+ }
+
+ /**
+ * Makes a tuple containing key and value.
+ *
+ * @param id The id.
+ * @param name The name.
+ * @return The value tuple.
+ */
+ protected Tuple makeValue(long id, String name) {
+ return Tuple.create().set("accountNumber", id).set("name", name);
+ }
+
+ /**
+ * Makes a value.
+ *
+ * @param balance The balance.
+ * @return The value tuple.
+ */
+ protected Tuple makeValue(double balance) {
+ return Tuple.create().set("balance", balance);
+ }
+
+ /**
+ * Makes a value.
+ *
+ * @param name The name.
+ * @return The value tuple.
+ */
+ protected Tuple makeValue(String name) {
+ return Tuple.create().set("name", name);
+ }
+}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 5eaa8c927c..25d0318ce7 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.SingleClusterNodeResolver;
import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.internal.network.serialization.MessageSerializer;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
@@ -77,6 +78,7 @@ import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import
org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import
org.apache.ignite.internal.replicator.message.TimestampAwareReplicaResponse;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowConverter;
import org.apache.ignite.internal.schema.BinaryRowEx;
@@ -292,6 +294,45 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
return
replicaListener.invoke(invocationOnMock.getArgument(1),
nodeId).thenApply(ReplicaResult::result);
})
.when(replicaSvc).invoke(anyString(), any());
+
+ lenient()
+ .doAnswer(invocationOnMock -> {
+ ClusterNode node = invocationOnMock.getArgument(0);
+
+ return
replicaListener.invoke(invocationOnMock.getArgument(1), node.id())
+ .thenApply(r -> new
TimestampAwareReplicaResponse() {
+ @Override
+ public @Nullable Object result() {
+ return r.result();
+ }
+
+ @Override
+ public @Nullable HybridTimestamp
timestamp() {
+ return CLOCK.now();
+ }
+
+ @Override
+ public MessageSerializer<NetworkMessage>
serializer() {
+ return null;
+ }
+
+ @Override
+ public short messageType() {
+ return 0;
+ }
+
+ @Override
+ public short groupType() {
+ return 0;
+ }
+
+ @Override
+ public NetworkMessage clone() {
+ return null;
+ }
+ });
+ })
+ .when(replicaSvc).invokeRaw(any(ClusterNode.class), any());
}
AtomicLong raftIndex = new AtomicLong(1);
@@ -544,7 +585,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
/** {@inheritDoc} */
@Override
- public CompletableFuture<ClusterNode> evaluateReadOnlyRecipientNode(int
partId) {
+ public CompletableFuture<ClusterNode> evaluateReadOnlyRecipientNode(int
partId, @Nullable HybridTimestamp readTimestamp) {
return completedFuture(LOCAL_NODE);
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index e9fd707342..c9b0ead47a 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -116,17 +116,16 @@ public interface TxManager extends IgniteComponent {
* Finishes a one-phase committed transaction. This method doesn't contain
any distributed communication.
*
* @param timestampTracker Observable timestamp tracker. This tracker is
used to track an observable timestamp and should be
- * updated with commit timestamp of every committed transaction.
+ * updated with commit timestamp of every committed transaction.
Not null on commit.
* @param txId Transaction id.
* @param commit {@code True} if a commit requested.
*/
- void finishFull(HybridTimestampTracker timestampTracker, UUID txId,
boolean commit);
+ void finishFull(HybridTimestampTracker timestampTracker, UUID txId,
@Nullable HybridTimestamp ts, boolean commit);
/**
* Finishes a dependant transactions.
*
- * @param timestampTracker Observable timestamp tracker is used to track a
timestamp for either read-write or read-only
- * transaction execution. The tracker is also used to determine
the read timestamp for read-only transactions. Each client
+ * @param timestampTracker Observable timestamp tracker is used to
determine the read timestamp for read-only transactions. Each client
* should pass its own tracker to provide linearizability between
read-write and read-only transactions started by this client.
* @param commitPartition Partition to store a transaction state.
* @param commit {@code true} if a commit requested.
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index 1bc88d1bdc..3949196f61 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.tx.TransactionIds;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.Nullable;
/**
* The read-write implementation of an internal transaction.
@@ -157,17 +158,16 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
}
@Override
- public CompletableFuture<Void> finish(boolean commit, HybridTimestamp
executionTimestamp, boolean full) {
+ public CompletableFuture<Void> finish(boolean commit, @Nullable
HybridTimestamp executionTimestamp, boolean full) {
if (finishFuture != null) {
return finishFuture;
}
-
enlistPartitionLock.writeLock().lock();
try {
if (finishFuture == null) {
if (full) {
- txManager.finishFull(observableTsTracker, id(), commit);
+ txManager.finishFull(observableTsTracker, id(),
executionTimestamp, commit);
finishFuture = nullCompletedFuture();
} else {
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index fdd139022e..281b3f6ae0 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -433,7 +433,12 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
@Override
public @Nullable TxStateMeta stateMeta(UUID txId) {
- return inBusyLock(busyLock, () -> txStateVolatileStorage.state(txId));
+ return txStateVolatileStorage.state(txId);
+ }
+
+ @TestOnly
+ public Collection<TxStateMeta> states() {
+ return txStateVolatileStorage.states();
}
@Override
@@ -442,13 +447,13 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
}
@Override
- public void finishFull(HybridTimestampTracker timestampTracker, UUID txId,
boolean commit) {
+ public void finishFull(HybridTimestampTracker timestampTracker, UUID txId,
@Nullable HybridTimestamp ts, boolean commit) {
TxState finalState;
finishedTxs.add(1);
if (commit) {
- timestampTracker.update(clockService.current());
+ timestampTracker.update(ts);
finalState = COMMITTED;
} else {
@@ -460,7 +465,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
finalState,
old == null ? null : old.txCoordinatorId(),
old == null ? null : old.commitPartitionId(),
- old == null ? null : old.commitTimestamp()
+ ts
));
decrementRwTxCount(txId);