This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f9bfb52907 IGNITE-19570 Write intent resolution for RW transactions
(#2475)
f9bfb52907 is described below
commit f9bfb52907ca7074c67661e6de7183645add1341
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Thu Aug 24 11:34:54 2023 +0300
IGNITE-19570 Write intent resolution for RW transactions (#2475)
---
.../ItRaftCommandLeftInLogUntilRestartTest.java | 49 +++
.../table/distributed/raft/PartitionListener.java | 2 +-
.../replicator/PartitionReplicaListener.java | 343 ++++++++++-----------
3 files changed, 208 insertions(+), 186 deletions(-)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
index 8d1d9594d1..e222628537 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.runner.app;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -32,6 +34,8 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
@@ -183,6 +187,10 @@ public class ItRaftCommandLeftInLogUntilRestartTest
extends ClusterPerClassInteg
assertTrue(IgniteTestUtils.waitForCondition(() ->
appliedIndexNode0.get() == appliedIndexNode1.get(), 10_000));
+ RaftGroupService raftGroupService =
table.internalTable().partitionRaftGroupService(0);
+
+ raftGroupService.peers().forEach(peer ->
assertThat(raftGroupService.snapshot(peer), willCompleteSuccessfully()));
+
leaderAndGroupRef.set(new IgniteBiTuple<>(leader, table.tableId()
+ "_part_0"));
afterBlock.accept(tx);
@@ -193,6 +201,9 @@ public class ItRaftCommandLeftInLogUntilRestartTest extends
ClusterPerClassInteg
}
stopNodes();
+
+ log.info("Restart the cluster");
+
startCluster();
var node0Started = (IgniteImpl) CLUSTER_NODES.get(0);
@@ -294,6 +305,44 @@ public class ItRaftCommandLeftInLogUntilRestartTest
extends ClusterPerClassInteg
new RuntimeException(IgniteStringFormatter.format("Cannot
check a row {}", row), e);
}
}
+
+ transferLeadershipToLocalNode(ignite);
+
+ for (Object[] row : dataSet) {
+ try {
+ Tuple txTuple = table.keyValueView().get(null,
Tuple.create().set("ID", row[0]));
+
+ assertNotNull(txTuple);
+
+ assertEquals(row[1], txTuple.value("NAME"));
+ assertEquals(row[2], txTuple.value("SALARY"));
+ } catch (Exception e) {
+ new RuntimeException(IgniteStringFormatter.format("Cannot
check a row {} when the local node leader", row), e);
+ }
+ }
+ }
+
+ /**
+ * Transfers the leader to the local node related to the Ignite instance.
+ *
+ * @param ignite Ignite instance.
+ */
+ private static void transferLeadershipToLocalNode(IgniteImpl ignite) {
+ TableImpl table = (TableImpl)
ignite.tables().table(DEFAULT_TABLE_NAME);
+
+ RaftGroupService raftGroupService =
table.internalTable().partitionRaftGroupService(0);
+
+ List<Peer> peers = raftGroupService.peers();
+ assertNotNull(peers);
+
+ Peer leader = raftGroupService.leader();
+ assertNotNull(leader);
+
+ Peer localPeer = peers.stream().filter(peer ->
peer.consistentId().equals(ignite.name())).findFirst().get();
+
+ log.info("Leader is transferring [from={}, to={}]", leader, localPeer);
+
+ assertThat(raftGroupService.transferLeadership(localPeer),
willCompleteSuccessfully());
}
/**
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index d2cdede161..9d98c69831 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -123,7 +123,7 @@ public class PartitionListener implements RaftGroupListener
{
this.safeTime = safeTime;
this.storageIndexTracker = storageIndexTracker;
- // TODO: IGNITE-18502 Implement a pending update storage
+ // TODO: IGNITE-18502 Excessive full partition scan on node start
try (PartitionTimestampCursor cursor =
partitionDataStorage.scan(HybridTimestamp.MAX_VALUE)) {
while (cursor.hasNext()) {
ReadResult readResult = cursor.next();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 3d805731ed..69ab2d9cfb 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -135,6 +135,7 @@ import
org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.CursorUtils;
import org.apache.ignite.internal.util.ExceptionUtils;
@@ -463,24 +464,26 @@ public class PartitionReplicaListener implements
ReplicaListener {
return safeReadFuture.thenCompose(unused ->
scanSortedIndex(request, indexStorage));
}
- return safeReadFuture.thenCompose(unused ->
retrieveExactEntriesUntilCursorEmpty(readTimestamp, cursorId, batchCount));
+ return safeReadFuture.thenCompose(unused ->
retrieveExactEntriesUntilCursorEmpty(txId, readTimestamp, cursorId,
batchCount));
}
/**
* Extracts the exact amount of entries, or fewer if the cursor becomes empty,
from a cursor on the specific time.
*
+ * @param txId Transaction id is used for RW only.
* @param readTimestamp Timestamp of the moment at which the
data will be extracted.
* @param cursorId Cursor id.
* @param count Amount of entries which will be extracted.
* @return Result future.
*/
private CompletableFuture<List<BinaryRow>>
retrieveExactEntriesUntilCursorEmpty(
- HybridTimestamp readTimestamp,
+ @Nullable UUID txId,
+ @Nullable HybridTimestamp readTimestamp,
IgniteUuid cursorId,
int count
) {
@SuppressWarnings("resource") PartitionTimestampCursor cursor =
(PartitionTimestampCursor) cursors.computeIfAbsent(cursorId,
- id -> mvDataStorage.scan(readTimestamp));
+ id -> mvDataStorage.scan(readTimestamp == null ?
HybridTimestamp.MAX_VALUE : readTimestamp));
var resolutionFuts = new
ArrayList<CompletableFuture<BinaryRow>>(count);
@@ -491,7 +494,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
BinaryRow candidate =
newestCommitTimestamp == null ||
!readResult.isWriteIntent() ? null : cursor.committed(newestCommitTimestamp);
- resolutionFuts.add(resolveRoReadResult(readResult, readTimestamp,
() -> candidate));
+ resolutionFuts.add(resolveReadResult(readResult, txId,
readTimestamp, () -> candidate));
}
return allOf(resolutionFuts.toArray(new
CompletableFuture[0])).thenCompose(unused -> {
@@ -506,7 +509,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
if (rows.size() < count && cursor.hasNext()) {
- return retrieveExactEntriesUntilCursorEmpty(readTimestamp,
cursorId, count - rows.size()).thenApply(binaryRows -> {
+ return retrieveExactEntriesUntilCursorEmpty(txId,
readTimestamp, cursorId, count - rows.size()).thenApply(binaryRows -> {
rows.addAll(binaryRows);
return rows;
@@ -517,6 +520,38 @@ public class PartitionReplicaListener implements
ReplicaListener {
});
}
+ /**
+ * Extracts the exact amount of entries, or fewer if the cursor becomes empty,
from a cursor on the specific time. Use it for RW.
+ *
+ * @param txId Transaction id.
+ * @param cursorId Cursor id.
+ * @return Future that completes with the list of resolved binary rows.
+ */
+ private CompletableFuture<List<BinaryRow>>
retrieveExactEntriesUntilCursorEmpty(UUID txId, IgniteUuid cursorId, int count)
{
+ return retrieveExactEntriesUntilCursorEmpty(txId, null, cursorId,
count).thenCompose(rows -> {
+ if (CollectionUtils.nullOrEmpty(rows)) {
+ return completedFuture(Collections.emptyList());
+ }
+
+ CompletableFuture[] futs = new CompletableFuture[rows.size()];
+
+ for (BinaryRow row : rows) {
+ futs[rows.indexOf(row)] =
schemaCompatValidator.validateBackwards(row.schemaVersion(), tableId(), txId)
+ .thenCompose(validationResult -> {
+ if (validationResult.isSuccessful()) {
+ return completedFuture(row);
+ } else {
+ throw new
IncompatibleSchemaException("Operation failed because schema "
+ + validationResult.fromSchemaVersion()
+ " is not backward-compatible with "
+ + validationResult.toSchemaVersion() +
" for table " + validationResult.failedTableId());
+ }
+ });
+ }
+
+ return CompletableFuture.allOf(futs).thenApply((unused) -> rows);
+ });
+ }
+
/**
* Processes single entry request for read only transaction.
*
@@ -700,34 +735,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
- return lockManager.acquire(txId, new LockKey(tableId()),
LockMode.S).thenCompose(tblLock -> {
- var batchRows = new ArrayList<BinaryRow>(batchCount);
-
- @SuppressWarnings("resource") PartitionTimestampCursor cursor =
(PartitionTimestampCursor) cursors.computeIfAbsent(cursorId,
- id -> mvDataStorage.scan(HybridTimestamp.MAX_VALUE));
-
- return continueScanBatchRetrieval(cursor, batchCount, txId,
batchRows);
- });
- }
-
- private CompletableFuture<List<BinaryRow>> continueScanBatchRetrieval(
- PartitionTimestampCursor cursor,
- int batchCount,
- UUID txId,
- List<BinaryRow> batchRows
- ) {
- if (batchRows.size() < batchCount && cursor.hasNext()) {
- return resolveAndCheckReadCompatibility(cursor.next(), txId)
- .thenCompose(resolvedReadResult -> {
- if (resolvedReadResult != null) {
- batchRows.add(resolvedReadResult);
- }
-
- return continueScanBatchRetrieval(cursor, batchCount,
txId, batchRows);
- });
- }
-
- return completedFuture(batchRows);
+ return lockManager.acquire(txId, new LockKey(tableId()),
LockMode.S).thenCompose(tblLock ->
+ retrieveExactEntriesUntilCursorEmpty(txId, cursorId,
batchCount));
}
/**
@@ -915,28 +924,13 @@ public class PartitionReplicaListener implements
ReplicaListener {
RowId rowId = indexRow.rowId();
- ReadResult readResult = mvDataStorage.read(rowId, timestamp);
-
- return resolveRoReadResult(readResult, timestamp, () -> {
- if (readResult.newestCommitTimestamp() == null) {
- return null;
+ return resolvePlainReadResult(rowId, null,
timestamp).thenComposeAsync(resolvedReadResult -> {
+ if (resolvedReadResult != null && indexRowMatches(indexRow,
resolvedReadResult, schemaAwareIndexStorage)) {
+ result.add(resolvedReadResult);
}
- ReadResult committedReadResult = mvDataStorage.read(rowId,
readResult.newestCommitTimestamp());
-
- assert !committedReadResult.isWriteIntent() :
- "The result is not committed [rowId=" + rowId + ",
timestamp="
- + readResult.newestCommitTimestamp() + ']';
-
- return committedReadResult.binaryRow();
- })
- .thenComposeAsync(resolvedReadResult -> {
- if (resolvedReadResult != null &&
indexRowMatches(indexRow, resolvedReadResult, schemaAwareIndexStorage)) {
- result.add(resolvedReadResult);
- }
-
- return continueReadOnlyIndexScan(schemaAwareIndexStorage,
cursor, timestamp, batchSize, result);
- }, scanRequestExecutor);
+ return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor,
timestamp, batchSize, result);
+ }, scanRequestExecutor);
}
/**
@@ -970,28 +964,28 @@ public class PartitionReplicaListener implements
ReplicaListener {
return completedFuture(null); // End of range reached.
Exit loop.
}
- return lockManager.acquire(txId, new LockKey(tableId(),
currentRow.rowId()), LockMode.S)
+ RowId rowId = currentRow.rowId();
+
+ return lockManager.acquire(txId, new LockKey(tableId(),
rowId), LockMode.S)
.thenComposeAsync(rowLock -> { // Table row S lock
- ReadResult readResult =
mvDataStorage.read(currentRow.rowId(), HybridTimestamp.MAX_VALUE);
- return
resolveAndCheckReadCompatibility(readResult, txId)
- .thenCompose(resolvedReadResult -> {
- if (resolvedReadResult != null) {
- if
(indexRowMatches(currentRow, resolvedReadResult, schemaAwareIndexStorage)) {
-
result.add(resolvedReadResult);
- }
- }
-
- // Proceed scan.
- return continueIndexScan(
- txId,
- schemaAwareIndexStorage,
- indexLocker,
- indexCursor,
- batchSize,
- result,
- isUpperBoundAchieved
- );
- });
+ return resolvePlainReadResult(rowId,
txId).thenCompose(resolvedReadResult -> {
+ if (resolvedReadResult != null) {
+ if (indexRowMatches(currentRow,
resolvedReadResult, schemaAwareIndexStorage)) {
+ result.add(resolvedReadResult);
+ }
+ }
+
+ // Proceed scan.
+ return continueIndexScan(
+ txId,
+ schemaAwareIndexStorage,
+ indexLocker,
+ indexCursor,
+ batchSize,
+ result,
+ isUpperBoundAchieved
+ );
+ });
}, scanRequestExecutor);
});
}
@@ -1024,34 +1018,29 @@ public class PartitionReplicaListener implements
ReplicaListener {
return lockManager.acquire(txId, new LockKey(tableId(), rowId),
LockMode.S)
.thenComposeAsync(rowLock -> { // Table row S lock
- ReadResult readResult = mvDataStorage.read(rowId,
HybridTimestamp.MAX_VALUE);
- return resolveAndCheckReadCompatibility(readResult, txId)
- .thenCompose(resolvedReadResult -> {
- if (resolvedReadResult != null) {
- result.add(resolvedReadResult);
- }
+ return resolvePlainReadResult(rowId,
txId).thenCompose(resolvedReadResult -> {
+ if (resolvedReadResult != null) {
+ result.add(resolvedReadResult);
+ }
- // Proceed lookup.
- return continueIndexLookup(txId, indexCursor,
batchSize, result);
- });
+ // Proceed lookup.
+ return continueIndexLookup(txId, indexCursor,
batchSize, result);
+ });
}, scanRequestExecutor);
}
- private CompletableFuture<Void> continueReadOnlyIndexLookup(
- Cursor<RowId> indexCursor,
- HybridTimestamp timestamp,
- int batchSize,
- List<BinaryRow> result
- ) {
- if (result.size() >= batchSize || !indexCursor.hasNext()) {
- return completedFuture(null);
- }
-
- RowId rowId = indexCursor.next();
-
- ReadResult readResult = mvDataStorage.read(rowId, timestamp);
+ /**
+ * Resolves a result received from a direct storage read.
+ *
+ * @param rowId Row id to resolve.
+ * @param txId Transaction id is used for RW only.
+ * @param timestamp Read timestamp.
+ * @return Future finishes with the resolved binary row.
+ */
+ private CompletableFuture<BinaryRow> resolvePlainReadResult(RowId rowId,
@Nullable UUID txId, @Nullable HybridTimestamp timestamp) {
+ ReadResult readResult = mvDataStorage.read(rowId, timestamp == null ?
HybridTimestamp.MAX_VALUE : timestamp);
- return resolveRoReadResult(readResult, timestamp, () -> {
+ return resolveReadResult(readResult, txId, timestamp, () -> {
if (readResult.newestCommitTimestamp() == null) {
return null;
}
@@ -1063,7 +1052,48 @@ public class PartitionReplicaListener implements
ReplicaListener {
+ readResult.newestCommitTimestamp() + ']';
return committedReadResult.binaryRow();
- }).thenComposeAsync(resolvedReadResult -> {
+ });
+ }
+
+ /**
+ * Resolves a result received from a direct storage read. Use it for RW.
+ *
+ * @param rowId Row id.
+ * @param txId Transaction id.
+ * @return Future finishes with the resolved binary row.
+ */
+ private CompletableFuture<BinaryRow> resolvePlainReadResult(RowId rowId,
UUID txId) {
+ return resolvePlainReadResult(rowId, txId, null).thenCompose(row -> {
+ if (row == null) {
+ return completedFuture(null);
+ }
+
+ return
schemaCompatValidator.validateBackwards(row.schemaVersion(), tableId(), txId)
+ .thenApply(validationResult -> {
+ if (validationResult.isSuccessful()) {
+ return row;
+ } else {
+ throw new IncompatibleSchemaException("Operation
failed because schema "
+ + validationResult.fromSchemaVersion() + "
is not backward-compatible with "
+ + validationResult.toSchemaVersion() + "
for table " + validationResult.failedTableId());
+ }
+ });
+ });
+ }
+
+ private CompletableFuture<Void> continueReadOnlyIndexLookup(
+ Cursor<RowId> indexCursor,
+ HybridTimestamp timestamp,
+ int batchSize,
+ List<BinaryRow> result
+ ) {
+ if (result.size() >= batchSize || !indexCursor.hasNext()) {
+ return completedFuture(null);
+ }
+
+ RowId rowId = indexCursor.next();
+
+ return resolvePlainReadResult(rowId, null,
timestamp).thenComposeAsync(resolvedReadResult -> {
if (resolvedReadResult != null) {
result.add(resolvedReadResult);
}
@@ -1326,14 +1356,13 @@ public class PartitionReplicaListener implements
ReplicaListener {
RowId rowId = cursor.next();
- return resolveAndCheckReadCompatibility(mvDataStorage.read(rowId,
HybridTimestamp.MAX_VALUE), txId)
- .thenCompose(row -> {
- if (row != null) {
- return action.apply(rowId, row);
- } else {
- return continueResolvingByPk(cursor, txId, action);
- }
- });
+ return resolvePlainReadResult(rowId, txId).thenCompose(row -> {
+ if (row != null) {
+ return action.apply(rowId, row);
+ } else {
+ return continueResolvingByPk(cursor, txId, action);
+ }
+ });
}
@@ -2246,100 +2275,44 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
}
- private CompletableFuture<BinaryRow>
resolveAndCheckReadCompatibility(ReadResult readResult, UUID txId) {
- BinaryRow row = resolveRwReadResult(readResult, txId);
-
- if (row == null) {
- return completedFuture(row);
- }
-
- return schemaCompatValidator.validateBackwards(row.schemaVersion(),
tableId(), txId)
- .thenCompose(validationResult -> {
- if (validationResult.isSuccessful()) {
- return completedFuture(row);
- } else {
- throw new IncompatibleSchemaException("Operation
failed because schema "
- + validationResult.fromSchemaVersion() + " is
not backward-compatible with "
- + validationResult.toSchemaVersion() + " for
table " + validationResult.failedTableId());
- }
- });
-
- }
-
- /**
- * Resolves a read result for RW transaction.
- *
- * @param readResult Read result to resolve.
- * @param txId Transaction id.
- * @return Resolved binary row.
- */
- @Nullable
- private BinaryRow resolveRwReadResult(ReadResult readResult, UUID txId) {
- // This is a safe join (waiting of the future result), because the
resolution for RW transaction cannot lead to a network request.
- return resolveReadResult(readResult, txId, null, null).join();
- }
-
- /**
- * Resolves a read result for RO transaction.
- *
- * @param readResult Read result to resolve.
- * @param timestamp Timestamp.
- * @param lastCommitted Action to get the latest committed row.
- * @return Future to resolved binary row.
- */
- private CompletableFuture<BinaryRow> resolveRoReadResult(
- ReadResult readResult,
- HybridTimestamp timestamp,
- Supplier<BinaryRow> lastCommitted
- ) {
- return resolveReadResult(readResult, null, timestamp, lastCommitted);
- }
-
/**
* Resolves read result to the corresponding binary row. Following rules
are used for read result resolution:
* <ol>
- * <li>If txId is not null (RW request), assert that retrieved tx id
matches proposed one or that retrieved tx id is null
+ * <li>If timestamp is null (RW request), assert that retrieved tx id
matches proposed one or that retrieved tx id is null
* and return binary row. Currently it's only possible to retrieve
write intents if they belong to the same transaction,
* locks prevent reading write intents created by others.</li>
- * <li>If txId is null (RO request), perform write intent resolution
if given readResult is a write intent itself
+ * <li>If timestamp is not null (RO request), perform write intent
resolution if given readResult is a write intent itself
* or return binary row otherwise.</li>
* </ol>
*
* @param readResult Read result to resolve.
* @param txId Nullable transaction id, should be provided if resolution
is performed within the context of RW transaction.
* @param timestamp Timestamp is used in RO transaction only.
- * @param lastCommitted Action to get the latest committed row, it is used
in RO transaction only.
+ * @param lastCommitted Action to get the latest committed row.
* @return Future to resolved binary row.
*/
private CompletableFuture<BinaryRow> resolveReadResult(
ReadResult readResult,
@Nullable UUID txId,
@Nullable HybridTimestamp timestamp,
- @Nullable Supplier<BinaryRow> lastCommitted
+ Supplier<BinaryRow> lastCommitted
) {
if (readResult == null) {
return completedFuture(null);
+ } else if (!readResult.isWriteIntent()) {
+ return completedFuture(readResult.binaryRow());
} else {
- if (txId != null) {
- // RW request.
+ // RW resolution.
+ if (timestamp == null) {
UUID retrievedResultTxId = readResult.transactionId();
- if (retrievedResultTxId == null ||
txId.equals(retrievedResultTxId)) {
+ if (txId.equals(retrievedResultTxId)) {
// Same transaction - return retrieved value. It may be
both writeIntent or regular value.
return completedFuture(readResult.binaryRow());
- } else {
- // Should never happen, currently, locks prevent reading
another transaction intents during RW requests.
- throw new AssertionError("Mismatched transaction id,
expectedTxId={" + txId + "},"
- + " actualTxId={" + retrievedResultTxId + '}');
- }
- } else {
- if (!readResult.isWriteIntent()) {
- return completedFuture(readResult.binaryRow());
}
-
- // RO request.
- return resolveWriteIntentAsync(readResult, timestamp,
lastCommitted);
}
+
+ return resolveWriteIntentAsync(readResult, timestamp,
lastCommitted);
}
}
@@ -2359,14 +2332,14 @@ public class PartitionReplicaListener implements
ReplicaListener {
return resolveTxState(
new TablePartitionId(readResult.commitTableId(),
readResult.commitPartitionId()),
readResult.transactionId(),
- timestamp)
- .thenApply(readLastCommitted -> {
- if (readLastCommitted) {
- return lastCommitted.get();
- } else {
- return readResult.binaryRow();
- }
- });
+ timestamp
+ ).thenApply(readLastCommitted -> {
+ if (readLastCommitted) {
+ return lastCommitted.get();
+ } else {
+ return readResult.binaryRow();
+ }
+ });
}
/**
@@ -2375,25 +2348,25 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @param commitGrpId Commit partition id.
* @param txId Transaction id.
* @param timestamp Timestamp.
- * @return Future with boolean value, indicating whether the transaction
was committed before timestamp.
+ * @return The future completes with true when the transaction is not
completed yet and false otherwise.
*/
private CompletableFuture<Boolean> resolveTxState(
TablePartitionId commitGrpId,
UUID txId,
HybridTimestamp timestamp
) {
- requireNonNull(timestamp, "timestamp");
+ boolean readLatest = timestamp == null;
return placementDriver.sendMetaRequest(commitGrpId,
FACTORY.txStateReplicaRequest()
.groupId(commitGrpId)
- .readTimestampLong(timestamp.longValue())
+ .readTimestampLong((readLatest ?
HybridTimestamp.MIN_VALUE : timestamp).longValue())
.txId(txId)
.build())
.thenApply(txMeta -> {
if (txMeta == null) {
return true;
} else if (txMeta.txState() == TxState.COMMITED) {
- return txMeta.commitTimestamp().compareTo(timestamp) >
0;
+ return !readLatest &&
txMeta.commitTimestamp().compareTo(timestamp) > 0;
} else {
assert txMeta.txState() == TxState.ABORTED :
"Unexpected transaction state [state=" + txMeta.txState() + ']';