This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 41e09209ef IGNITE-20482 Simplify write intent resolution in
PartitionReplicaListener (#2625)
41e09209ef is described below
commit 41e09209ef13bea448d183cb39586f4f88237385
Author: Cyrill <[email protected]>
AuthorDate: Mon Sep 25 14:45:18 2023 +0300
IGNITE-20482 Simplify write intent resolution in PartitionReplicaListener
(#2625)
---
.../apache/ignite/internal/util/IgniteUtils.java | 10 ++
.../ignite/internal/table/ItColocationTest.java | 2 +-
.../table/distributed/StorageUpdateHandler.java | 105 ++++++++--------
.../replicator/PartitionReplicaListener.java | 134 +++++++++++----------
.../org/apache/ignite/internal/tx/TxState.java | 6 +
.../ignite/internal/tx/impl/TxManagerImpl.java | 6 +-
6 files changed, 135 insertions(+), 128 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 72e9fba7d7..1ea9fed961 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -913,6 +913,16 @@ public class IgniteUtils {
return result;
}
+ /**
+ * Find the first element in the given list.
+ *
+ * @param list List.
+ * @return Optional containing element (if present).
+ */
+ public static <T> Optional<T> findFirst(List<T> list) {
+ return list.isEmpty() ? Optional.empty() : Optional.of(list.get(0));
+ }
+
/**
* Find any element in given collection.
*
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 020b1cf550..362ba76c8d 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -144,7 +144,7 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
) {
@Override
public CompletableFuture<Void> finish(
- HybridTimestampTracker timestampTracker,
+ HybridTimestampTracker observableTimestampTracker,
TablePartitionId commitPartition,
ClusterNode recipientNode,
Long term,
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index cd6e8518fb..f029e779b8 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -265,7 +265,7 @@ public class StorageUpdateHandler {
* @param commitTimestamp Commit timestamp. Not {@code null} if {@code
commit} is {@code true}.
*/
public void handleTransactionCleanup(UUID txId, boolean commit, @Nullable
HybridTimestamp commitTimestamp) {
- handleTransactionCleanup(txId, commit, commitTimestamp, () -> {});
+ handleTransactionCleanup(txId, commit, commitTimestamp, null);
}
/**
@@ -276,86 +276,75 @@ public class StorageUpdateHandler {
* @param commitTimestamp Commit timestamp. Not {@code null} if {@code
commit} is {@code true}.
* @param onApplication On application callback.
*/
- public void handleTransactionCleanup(UUID txId, boolean commit,
- @Nullable HybridTimestamp commitTimestamp, Runnable onApplication)
{
+ public void handleTransactionCleanup(
+ UUID txId,
+ boolean commit,
+ @Nullable HybridTimestamp commitTimestamp,
+ @Nullable Runnable onApplication) {
Set<RowId> pendingRowIds = pendingRows.removePendingRowIds(txId);
- handleTransactionCleanup(pendingRowIds, commit, commitTimestamp,
onApplication);
- }
+ // `pendingRowIds` might be empty when we have already cleaned up the
storage for this transaction,
+ // for example, when primary (PartitionReplicaListener) is collocated
with the raft node (PartitionListener)
+ // and one of them has already processed the cleanup request, since
they share the instance of this class.
+ // However, we still need to run `onApplication` if it is not null,
e.g. when called from the TxCleanupCommand handler in PartitionListener
+ // to update indexes. In this case it should still be executed under
`runConsistently`.
+ if (!pendingRowIds.isEmpty() || onApplication != null) {
+ storage.runConsistently(locker -> {
+ pendingRowIds.forEach(locker::lock);
+
+ if (commit) {
+ performCommitWrite(pendingRowIds, commitTimestamp);
+ } else {
+ performAbortWrite(pendingRowIds);
+ }
- /**
- * Handles the cleanup of a transaction. The transaction is either
committed or rolled back.
- *
- * @param pendingRowIds Row ids of write-intents to be finalized, either
committed or rolled back.
- * @param commit Commit flag. {@code true} if transaction is committed,
{@code false} otherwise.
- * @param commitTimestamp Commit timestamp. Not {@code null} if {@code
commit} is {@code true}.
- * @param onApplication On application callback.
- */
- private void handleTransactionCleanup(Set<RowId> pendingRowIds, boolean
commit,
- @Nullable HybridTimestamp commitTimestamp, Runnable onApplication)
{
- if (commit) {
- handleTransactionCommit(pendingRowIds, commitTimestamp,
onApplication);
- } else {
- handleTransactionAbortion(pendingRowIds, onApplication);
+ if (onApplication != null) {
+ onApplication.run();
+ }
+
+ return null;
+ });
}
}
/**
- * Handles the commit of a transaction.
+ * Commit the write intents stored at the given row ids (created by the transaction being cleaned up).
*
* @param pendingRowIds Row ids of write-intents to be committed.
* @param commitTimestamp Commit timestamp.
- * @param onApplication On application callback.
*/
- private void handleTransactionCommit(Set<RowId> pendingRowIds,
HybridTimestamp commitTimestamp, Runnable onApplication) {
- storage.runConsistently(locker -> {
- pendingRowIds.forEach(locker::lock);
-
- pendingRowIds.forEach(rowId -> storage.commitWrite(rowId,
commitTimestamp));
-
- onApplication.run();
-
- return null;
- });
+ private void performCommitWrite(Set<RowId> pendingRowIds, HybridTimestamp
commitTimestamp) {
+ pendingRowIds.forEach(rowId -> storage.commitWrite(rowId,
commitTimestamp));
}
/**
- * Handles the abortion of a transaction.
+ * Abort the write intents stored at the given row ids (created by the transaction being cleaned up).
*
- * @param pendingRowIds Row ids of write-intents to be rolled back.
- * @param onApplication On application callback.
+ * @param pendingRowIds Row ids of write-intents to be aborted.
*/
- private void handleTransactionAbortion(Set<RowId> pendingRowIds, Runnable
onApplication) {
- storage.runConsistently(locker -> {
- for (RowId rowId : pendingRowIds) {
- locker.lock(rowId);
-
- try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
- if (!cursor.hasNext()) {
- continue;
- }
-
- ReadResult item = cursor.next();
+ private void performAbortWrite(Set<RowId> pendingRowIds) {
+ for (RowId rowId : pendingRowIds) {
+ try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
+ if (!cursor.hasNext()) {
+ continue;
+ }
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-20124 Prevent double storage
updates within primary
- if (item.isWriteIntent()) {
- BinaryRow rowToRemove = item.binaryRow();
+ ReadResult item = cursor.next();
- if (rowToRemove == null) {
- continue;
- }
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-20124
Prevent double storage updates within primary
+ if (item.isWriteIntent()) {
+ BinaryRow rowToRemove = item.binaryRow();
- indexUpdateHandler.tryRemoveFromIndexes(rowToRemove,
rowId, cursor);
+ if (rowToRemove == null) {
+ continue;
}
+
+ indexUpdateHandler.tryRemoveFromIndexes(rowToRemove,
rowId, cursor);
}
}
+ }
- pendingRowIds.forEach(storage::abortWrite);
-
- onApplication.run();
-
- return null;
- });
+ pendingRowIds.forEach(storage::abortWrite);
}
/**
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 0197aa61cf..1f6e8fc3f6 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -29,9 +29,10 @@ import static
org.apache.ignite.internal.tx.TxState.ABANDONED;
import static org.apache.ignite.internal.tx.TxState.ABORTED;
import static org.apache.ignite.internal.tx.TxState.COMMITED;
import static org.apache.ignite.internal.tx.TxState.PENDING;
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
-import static org.apache.ignite.internal.util.IgniteUtils.filter;
import static org.apache.ignite.internal.util.IgniteUtils.findAny;
+import static org.apache.ignite.internal.util.IgniteUtils.findFirst;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR;
import static org.apache.ignite.lang.IgniteStringFormatter.format;
@@ -1434,7 +1435,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
txOps = new TxCleanupReadyFutureList();
}
- if (txOps.state == ABORTED || txOps.state == COMMITED) {
+ if (isFinalState(txOps.state)) {
fut.completeExceptionally(
new
TransactionException(TX_FAILED_READ_WRITE_OPERATION_ERR, "Transaction is
already finished."));
} else {
@@ -1470,37 +1471,47 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return Result of the given action.
*/
private CompletableFuture<BinaryRow> resolveRowByPkForReadOnly(BinaryTuple
pk, HybridTimestamp ts) {
+ // Indexes store values associated with different versions of one
entry.
+ // It's possible to have multiple entries for a particular search key
+ // only if we insert, delete and again insert an entry with the same
indexed fields.
+ // It means that there exists one and only one non-empty readResult
for any read timestamp for the given key.
+ // Which in turn means that once we have found a non-empty readResult
during PK index iteration
+ // we could proceed with readResult resolution and stop the iteration
(the loop below still scans all rows, see IGNITE-18767).
try (Cursor<RowId> cursor = getFromPkIndex(pk)) {
- List<ReadResult> candidates = new ArrayList<>();
+ // TODO https://issues.apache.org/jira/browse/IGNITE-18767 scan of
multiple write intents should not be needed
+ List<ReadResult> writeIntents = new ArrayList<>();
+ List<ReadResult> regularEntries = new ArrayList<>();
for (RowId rowId : cursor) {
ReadResult readResult = mvDataStorage.read(rowId, ts);
- if (!readResult.isEmpty() || readResult.isWriteIntent()) {
- candidates.add(readResult);
+ if (readResult.isWriteIntent()) {
+ writeIntents.add(readResult);
+ } else if (!readResult.isEmpty()) {
+ regularEntries.add(readResult);
}
}
- if (candidates.isEmpty()) {
+ // Nothing found in the storage, return null.
+ if (writeIntents.isEmpty() && regularEntries.isEmpty()) {
return completedFuture(null);
}
- // TODO https://issues.apache.org/jira/browse/IGNITE-18767 scan of
multiple write intents should not be needed
- List<ReadResult> writeIntents = filter(candidates,
ReadResult::isWriteIntent);
-
- if (!writeIntents.isEmpty()) {
+ if (writeIntents.isEmpty()) {
+ // No write intents, then return the committed value. We
already know that regularEntries is not empty.
+ return completedFuture(regularEntries.get(0).binaryRow());
+ } else {
ReadResult writeIntent = writeIntents.get(0);
// Assume that all write intents for the same key belong to
the same transaction, as the key should be exclusively locked.
// This means that we can just resolve the state of this
transaction.
checkWriteIntentsBelongSameTx(writeIntents);
- return resolveTxState(
- writeIntent.transactionId(),
- new TablePartitionId(writeIntent.commitTableId(),
writeIntent.commitPartitionId()),
- ts)
- .thenApply(readLastCommitted -> {
- if (readLastCommitted) {
+ return resolveWriteIntentReadability(writeIntent, ts)
+ .thenApply(writeIntentReadable -> {
+ if (writeIntentReadable) {
+ return findAny(writeIntents, wi ->
!wi.isEmpty()).map(ReadResult::binaryRow).orElse(null);
+ } else {
for (ReadResult wi : writeIntents) {
HybridTimestamp newestCommitTimestamp =
wi.newestCommitTimestamp();
@@ -1517,18 +1528,10 @@ public class PartitionReplicaListener implements
ReplicaListener {
return committedReadResult.binaryRow();
}
- return findAny(candidates, c ->
!c.isWriteIntent() && !c.isEmpty()).map(ReadResult::binaryRow)
- .orElse(null);
- } else {
- return findAny(writeIntents, wi ->
!wi.isEmpty()).map(ReadResult::binaryRow)
- .orElse(null);
+ // No suitable value found in write intents,
read the committed value (if exists)
+ return
findFirst(regularEntries).map(ReadResult::binaryRow).orElse(null);
}
});
- } else {
- BinaryRow result = findAny(candidates, r ->
!r.isEmpty()).map(ReadResult::binaryRow)
- .orElse(null);
-
- return completedFuture(result);
}
} catch (Exception e) {
throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
@@ -2466,49 +2469,49 @@ public class PartitionReplicaListener implements
ReplicaListener {
HybridTimestamp timestamp,
Supplier<BinaryRow> lastCommitted
) {
- return resolveTxState(
- readResult.transactionId(),
- new TablePartitionId(readResult.commitTableId(),
readResult.commitPartitionId()),
- timestamp
- ).thenApply(readLastCommitted -> {
- if (readLastCommitted) {
- return lastCommitted.get();
- } else {
- return readResult.binaryRow();
- }
- });
+ return resolveWriteIntentReadability(readResult, timestamp)
+ .thenApply(writeIntentReadable -> writeIntentReadable ?
readResult.binaryRow() : lastCommitted.get());
}
/**
- * Resolve the actual tx state.
+ * Check whether we can read from the provided write intent.
*
- * @param txId Transaction id.
- * @param commitGrpId Commit partition id.
+ * @param writeIntent Write intent to resolve.
* @param timestamp Timestamp.
- * @return The future completes with true when the transaction is not
completed yet and false otherwise.
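+ * @return The future completes with {@code true} when the transaction is
committed and either no read timestamp is given or commit time <= read time,
+ * {@code false} otherwise (when the transaction is either in progress,
or aborted, or committed and commit time > read time).
+ * The future completes exceptionally with {@code TransactionAbandonedException}
if the transaction meta is unknown or the transaction is abandoned.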
*/
- private CompletableFuture<Boolean> resolveTxState(
- UUID txId,
- TablePartitionId commitGrpId,
- HybridTimestamp timestamp
- ) {
- return transactionStateResolver.resolveTxState(txId, commitGrpId,
timestamp).thenApply(txMeta -> {
- if (txMeta == null || txMeta.txState() == ABANDONED) {
- // TODO https://issues.apache.org/jira/browse/IGNITE-20427
make the null value returned from commit partition
- // TODO more determined
- throw new TransactionAbandonedException(txId, txMeta);
- } else if (txMeta.txState() == COMMITED) {
- boolean readLatest = timestamp == null;
-
- return !readLatest &&
txMeta.commitTimestamp().compareTo(timestamp) > 0;
- } else if (txMeta.txState() == ABORTED) {
- return true;
- } else {
- assert txMeta.txState() == PENDING : "Unexpected transaction
state [state=" + txMeta.txState() + ']';
+ private CompletableFuture<Boolean>
resolveWriteIntentReadability(ReadResult writeIntent, HybridTimestamp
timestamp) {
+ UUID txId = writeIntent.transactionId();
- return true;
- }
- });
+ return transactionStateResolver.resolveTxState(
+ txId,
+ new TablePartitionId(writeIntent.commitTableId(),
writeIntent.commitPartitionId()),
+ timestamp)
+ .thenApply(txMeta -> canReadFromWriteIntent(txId, txMeta,
timestamp));
+ }
+
+ /**
+ * Check whether we can read write intents created by this transaction.
+ *
+ * @param txId Transaction id.
+ * @param txMeta Transaction metainfo.
+ * @param timestamp Read timestamp.
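+ * @return {@code true} if we can read from entries created in this
transaction
+ * (when the transaction was committed and either no read timestamp is given or commit time <= read time).
+ * @throws TransactionAbandonedException If the transaction meta is {@code null} or the transaction is abandoned.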
+ */
+ private static Boolean canReadFromWriteIntent(UUID txId, @Nullable
TransactionMeta txMeta, @Nullable HybridTimestamp timestamp) {
+ if (txMeta == null || txMeta.txState() == ABANDONED) {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-20427 make
the null value returned from commit partition
+ // TODO more determined
+ throw new TransactionAbandonedException(txId, txMeta);
+ }
+ if (txMeta.txState() == COMMITED) {
+ boolean readLatest = timestamp == null;
+
+ return readLatest || txMeta.commitTimestamp().compareTo(timestamp)
<= 0;
+ }
+ return false;
}
private CompletableFuture<Void> validateAtTimestamp(UUID txId) {
@@ -2744,10 +2747,9 @@ public class PartitionReplicaListener implements
ReplicaListener {
final Map<RequestType, List<CompletableFuture<?>>> futures = new
EnumMap<>(RequestType.class);
/**
- * Transaction state. {@code TxState#ABORTED} and {@code
TxState#COMMITED} match the final transaction states. If the property is
- * {@code null} the transaction is in pending state.
+ * Transaction state, initially {@code TxState#PENDING}. {@code TxState#ABORTED} and {@code
TxState#COMMITED} match the final transaction states.
*/
- TxState state;
+ TxState state = PENDING;
}
@Override
@@ -2892,7 +2894,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @param commitTimestamp Commit timestamp.
*/
private void markFinished(UUID txId, TxState txState, @Nullable
HybridTimestamp commitTimestamp) {
- assert txState == COMMITED || txState == ABORTED : "Unexpected state,
txId=" + txId + ", txState=" + txState;
+ assert isFinalState(txState) : "Unexpected state [txId=" + txId + ",
txState=" + txState + ']';
txManager.updateTxMeta(txId, old -> old == null
? null
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
index c5a897b647..4a14f16be4 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
@@ -38,6 +38,12 @@ public enum TxState {
{ true, true, true, true, true, true }
};
+ /**
+ * Checks whether the state is final, i.e. no transition from this state
is allowed.
+ *
+ * @param state Transaction state.
+ * @return {@code true} if the state is either {@link #COMMITED} or {@link
#ABORTED}, {@code false} otherwise.
+ */
public static boolean isFinalState(TxState state) {
return state == COMMITED || state == ABORTED;
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index c0a6979335..776daa566f 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -221,7 +221,7 @@ public class TxManagerImpl implements TxManager {
@Override
public CompletableFuture<Void> finish(
- HybridTimestampTracker timestampTracker,
+ HybridTimestampTracker observableTimestampTracker,
TablePartitionId commitPartition,
ClusterNode recipientNode,
Long term,
@@ -256,7 +256,7 @@ public class TxManagerImpl implements TxManager {
return completedFuture(null);
}
- timestampTracker.update(commitTimestamp);
+ observableTimestampTracker.update(commitTimestamp);
TxFinishReplicaRequest req = FACTORY.txFinishReplicaRequest()
.txId(txId)
@@ -319,7 +319,7 @@ public class TxManagerImpl implements TxManager {
@Override
public int finished() {
return (int) txStateMap.entrySet().stream()
- .filter(e -> e.getValue().txState() == COMMITED ||
e.getValue().txState() == ABORTED)
+ .filter(e -> isFinalState(e.getValue().txState()))
.count();
}