This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 41e09209ef IGNITE-20482 Simplify write intent resolution in PartitionReplicaListener (#2625) 41e09209ef is described below commit 41e09209ef13bea448d183cb39586f4f88237385 Author: Cyrill <cyrill.si...@gmail.com> AuthorDate: Mon Sep 25 14:45:18 2023 +0300 IGNITE-20482 Simplify write intent resolution in PartitionReplicaListener (#2625) --- .../apache/ignite/internal/util/IgniteUtils.java | 10 ++ .../ignite/internal/table/ItColocationTest.java | 2 +- .../table/distributed/StorageUpdateHandler.java | 105 ++++++++-------- .../replicator/PartitionReplicaListener.java | 134 +++++++++++---------- .../org/apache/ignite/internal/tx/TxState.java | 6 + .../ignite/internal/tx/impl/TxManagerImpl.java | 6 +- 6 files changed, 135 insertions(+), 128 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 72e9fba7d7..1ea9fed961 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -913,6 +913,16 @@ public class IgniteUtils { return result; } + /** + * Find the first element in the given list. + * + * @param list List. + * @return Optional containing element (if present). + */ + public static <T> Optional<T> findFirst(List<T> list) { + return list.isEmpty() ? Optional.empty() : Optional.of(list.get(0)); + } + /** * Find any element in given collection. 
* diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java index 020b1cf550..362ba76c8d 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java @@ -144,7 +144,7 @@ public class ItColocationTest extends BaseIgniteAbstractTest { ) { @Override public CompletableFuture<Void> finish( - HybridTimestampTracker timestampTracker, + HybridTimestampTracker observableTimestampTracker, TablePartitionId commitPartition, ClusterNode recipientNode, Long term, diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java index cd6e8518fb..f029e779b8 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java @@ -265,7 +265,7 @@ public class StorageUpdateHandler { * @param commitTimestamp Commit timestamp. Not {@code null} if {@code commit} is {@code true}. */ public void handleTransactionCleanup(UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp) { - handleTransactionCleanup(txId, commit, commitTimestamp, () -> {}); + handleTransactionCleanup(txId, commit, commitTimestamp, null); } /** @@ -276,86 +276,75 @@ public class StorageUpdateHandler { * @param commitTimestamp Commit timestamp. Not {@code null} if {@code commit} is {@code true}. * @param onApplication On application callback. 
*/ - public void handleTransactionCleanup(UUID txId, boolean commit, - @Nullable HybridTimestamp commitTimestamp, Runnable onApplication) { + public void handleTransactionCleanup( + UUID txId, + boolean commit, + @Nullable HybridTimestamp commitTimestamp, + @Nullable Runnable onApplication) { Set<RowId> pendingRowIds = pendingRows.removePendingRowIds(txId); - handleTransactionCleanup(pendingRowIds, commit, commitTimestamp, onApplication); - } + // `pendingRowIds` might be empty when we have already cleaned up the storage for this transaction, + // for example, when primary (PartitionReplicaListener) is collocated with the raft node (PartitionListener) + // and one of them has already processed the cleanup request, since they share the instance of this class. + // However, we still need to run `onApplication` if it is not null, e.g. called in TxCleanupCommand handler in PartitionListener + // to update indexes. In this case it should be executed under `runConsistently`. + if (!pendingRowIds.isEmpty() || onApplication != null) { + storage.runConsistently(locker -> { + pendingRowIds.forEach(locker::lock); + + if (commit) { + performCommitWrite(pendingRowIds, commitTimestamp); + } else { + performAbortWrite(pendingRowIds); + } - /** - * Handles the cleanup of a transaction. The transaction is either committed or rolled back. - * - * @param pendingRowIds Row ids of write-intents to be finalized, either committed or rolled back. - * @param commit Commit flag. {@code true} if transaction is committed, {@code false} otherwise. - * @param commitTimestamp Commit timestamp. Not {@code null} if {@code commit} is {@code true}. - * @param onApplication On application callback. 
- */ - private void handleTransactionCleanup(Set<RowId> pendingRowIds, boolean commit, - @Nullable HybridTimestamp commitTimestamp, Runnable onApplication) { - if (commit) { - handleTransactionCommit(pendingRowIds, commitTimestamp, onApplication); - } else { - handleTransactionAbortion(pendingRowIds, onApplication); + if (onApplication != null) { + onApplication.run(); + } + + return null; + }); } } /** - * Handles the commit of a transaction. + * Commit write intents created by the provided transaction. * * @param pendingRowIds Row ids of write-intents to be committed. * @param commitTimestamp Commit timestamp. - * @param onApplication On application callback. */ - private void handleTransactionCommit(Set<RowId> pendingRowIds, HybridTimestamp commitTimestamp, Runnable onApplication) { - storage.runConsistently(locker -> { - pendingRowIds.forEach(locker::lock); - - pendingRowIds.forEach(rowId -> storage.commitWrite(rowId, commitTimestamp)); - - onApplication.run(); - - return null; - }); + private void performCommitWrite(Set<RowId> pendingRowIds, HybridTimestamp commitTimestamp) { + pendingRowIds.forEach(rowId -> storage.commitWrite(rowId, commitTimestamp)); } /** - * Handles the abortion of a transaction. + * Abort write intents created by the provided transaction. * - * @param pendingRowIds Row ids of write-intents to be rolled back. - * @param onApplication On application callback. + * @param pendingRowIds Row ids of write-intents to be aborted. 
*/ - private void handleTransactionAbortion(Set<RowId> pendingRowIds, Runnable onApplication) { - storage.runConsistently(locker -> { - for (RowId rowId : pendingRowIds) { - locker.lock(rowId); - - try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) { - if (!cursor.hasNext()) { - continue; - } - - ReadResult item = cursor.next(); + private void performAbortWrite(Set<RowId> pendingRowIds) { + for (RowId rowId : pendingRowIds) { + try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) { + if (!cursor.hasNext()) { + continue; + } - // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Prevent double storage updates within primary - if (item.isWriteIntent()) { - BinaryRow rowToRemove = item.binaryRow(); + ReadResult item = cursor.next(); - if (rowToRemove == null) { - continue; - } + // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Prevent double storage updates within primary + if (item.isWriteIntent()) { + BinaryRow rowToRemove = item.binaryRow(); - indexUpdateHandler.tryRemoveFromIndexes(rowToRemove, rowId, cursor); + if (rowToRemove == null) { + continue; } + + indexUpdateHandler.tryRemoveFromIndexes(rowToRemove, rowId, cursor); } } + } - pendingRowIds.forEach(storage::abortWrite); - - onApplication.run(); - - return null; - }); + pendingRowIds.forEach(storage::abortWrite); } /** diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 0197aa61cf..1f6e8fc3f6 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -29,9 +29,10 @@ import static org.apache.ignite.internal.tx.TxState.ABANDONED; import static org.apache.ignite.internal.tx.TxState.ABORTED; 
import static org.apache.ignite.internal.tx.TxState.COMMITED; import static org.apache.ignite.internal.tx.TxState.PENDING; +import static org.apache.ignite.internal.tx.TxState.isFinalState; import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; -import static org.apache.ignite.internal.util.IgniteUtils.filter; import static org.apache.ignite.internal.util.IgniteUtils.findAny; +import static org.apache.ignite.internal.util.IgniteUtils.findFirst; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync; import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR; import static org.apache.ignite.lang.IgniteStringFormatter.format; @@ -1434,7 +1435,7 @@ public class PartitionReplicaListener implements ReplicaListener { txOps = new TxCleanupReadyFutureList(); } - if (txOps.state == ABORTED || txOps.state == COMMITED) { + if (isFinalState(txOps.state)) { fut.completeExceptionally( new TransactionException(TX_FAILED_READ_WRITE_OPERATION_ERR, "Transaction is already finished.")); } else { @@ -1470,37 +1471,47 @@ public class PartitionReplicaListener implements ReplicaListener { * @return Result of the given action. */ private CompletableFuture<BinaryRow> resolveRowByPkForReadOnly(BinaryTuple pk, HybridTimestamp ts) { + // Indexes store values associated with different versions of one entry. + // It's possible to have multiple entries for a particular search key + // only if we insert, delete and again insert an entry with the same indexed fields. + // It means that there exists one and only one non-empty readResult for any read timestamp for the given key. + // Which in turn means that if we have found non empty readResult during PK index iteration + // we can proceed with readResult resolution and stop the iteration. 
try (Cursor<RowId> cursor = getFromPkIndex(pk)) { - List<ReadResult> candidates = new ArrayList<>(); + // TODO https://issues.apache.org/jira/browse/IGNITE-18767 scan of multiple write intents should not be needed + List<ReadResult> writeIntents = new ArrayList<>(); + List<ReadResult> regularEntries = new ArrayList<>(); for (RowId rowId : cursor) { ReadResult readResult = mvDataStorage.read(rowId, ts); - if (!readResult.isEmpty() || readResult.isWriteIntent()) { - candidates.add(readResult); + if (readResult.isWriteIntent()) { + writeIntents.add(readResult); + } else if (!readResult.isEmpty()) { + regularEntries.add(readResult); } } - if (candidates.isEmpty()) { + // Nothing found in the storage, return null. + if (writeIntents.isEmpty() && regularEntries.isEmpty()) { return completedFuture(null); } - // TODO https://issues.apache.org/jira/browse/IGNITE-18767 scan of multiple write intents should not be needed - List<ReadResult> writeIntents = filter(candidates, ReadResult::isWriteIntent); - - if (!writeIntents.isEmpty()) { + if (writeIntents.isEmpty()) { + // No write intents, then return the committed value. We already know that regularEntries is not empty. + return completedFuture(regularEntries.get(0).binaryRow()); + } else { ReadResult writeIntent = writeIntents.get(0); // Assume that all write intents for the same key belong to the same transaction, as the key should be exclusively locked. // This means that we can just resolve the state of this transaction. 
checkWriteIntentsBelongSameTx(writeIntents); - return resolveTxState( - writeIntent.transactionId(), - new TablePartitionId(writeIntent.commitTableId(), writeIntent.commitPartitionId()), - ts) - .thenApply(readLastCommitted -> { - if (readLastCommitted) { + return resolveWriteIntentReadability(writeIntent, ts) + .thenApply(writeIntentReadable -> { + if (writeIntentReadable) { + return findAny(writeIntents, wi -> !wi.isEmpty()).map(ReadResult::binaryRow).orElse(null); + } else { for (ReadResult wi : writeIntents) { HybridTimestamp newestCommitTimestamp = wi.newestCommitTimestamp(); @@ -1517,18 +1528,10 @@ public class PartitionReplicaListener implements ReplicaListener { return committedReadResult.binaryRow(); } - return findAny(candidates, c -> !c.isWriteIntent() && !c.isEmpty()).map(ReadResult::binaryRow) - .orElse(null); - } else { - return findAny(writeIntents, wi -> !wi.isEmpty()).map(ReadResult::binaryRow) - .orElse(null); + // No suitable value found in write intents, read the committed value (if exists) + return findFirst(regularEntries).map(ReadResult::binaryRow).orElse(null); } }); - } else { - BinaryRow result = findAny(candidates, r -> !r.isEmpty()).map(ReadResult::binaryRow) - .orElse(null); - - return completedFuture(result); } } catch (Exception e) { throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR, @@ -2466,49 +2469,49 @@ public class PartitionReplicaListener implements ReplicaListener { HybridTimestamp timestamp, Supplier<BinaryRow> lastCommitted ) { - return resolveTxState( - readResult.transactionId(), - new TablePartitionId(readResult.commitTableId(), readResult.commitPartitionId()), - timestamp - ).thenApply(readLastCommitted -> { - if (readLastCommitted) { - return lastCommitted.get(); - } else { - return readResult.binaryRow(); - } - }); + return resolveWriteIntentReadability(readResult, timestamp) + .thenApply(writeIntentReadable -> writeIntentReadable ? 
readResult.binaryRow() : lastCommitted.get()); } /** - * Resolve the actual tx state. + * Check whether we can read from the provided write intent. * - * @param txId Transaction id. - * @param commitGrpId Commit partition id. + * @param writeIntent Write intent to resolve. * @param timestamp Timestamp. - * @return The future completes with true when the transaction is not completed yet and false otherwise. + * @return The future completes with {@code true} when the transaction is committed and commit time <= read time, + * {@code false} otherwise (when the transaction is either in progress, or aborted, or committed and commit time > read time). */ - private CompletableFuture<Boolean> resolveTxState( - UUID txId, - TablePartitionId commitGrpId, - HybridTimestamp timestamp - ) { - return transactionStateResolver.resolveTxState(txId, commitGrpId, timestamp).thenApply(txMeta -> { - if (txMeta == null || txMeta.txState() == ABANDONED) { - // TODO https://issues.apache.org/jira/browse/IGNITE-20427 make the null value returned from commit partition - // TODO more determined - throw new TransactionAbandonedException(txId, txMeta); - } else if (txMeta.txState() == COMMITED) { - boolean readLatest = timestamp == null; - - return !readLatest && txMeta.commitTimestamp().compareTo(timestamp) > 0; - } else if (txMeta.txState() == ABORTED) { - return true; - } else { - assert txMeta.txState() == PENDING : "Unexpected transaction state [state=" + txMeta.txState() + ']'; + private CompletableFuture<Boolean> resolveWriteIntentReadability(ReadResult writeIntent, HybridTimestamp timestamp) { + UUID txId = writeIntent.transactionId(); - return true; - } - }); + return transactionStateResolver.resolveTxState( + txId, + new TablePartitionId(writeIntent.commitTableId(), writeIntent.commitPartitionId()), + timestamp) + .thenApply(txMeta -> canReadFromWriteIntent(txId, txMeta, timestamp)); + } + + /** + * Check whether we can read write intents created by this transaction.
+ * + * @param txId Transaction id. + * @param txMeta Transaction metainfo. + * @param timestamp Read timestamp. + * @return {@code true} if we can read from entries created in this transaction + * (when the transaction was committed and commit time <= read time). + */ + private static Boolean canReadFromWriteIntent(UUID txId, @Nullable TransactionMeta txMeta, @Nullable HybridTimestamp timestamp) { + if (txMeta == null || txMeta.txState() == ABANDONED) { + // TODO https://issues.apache.org/jira/browse/IGNITE-20427 make the null value returned from commit partition + // TODO more determined + throw new TransactionAbandonedException(txId, txMeta); + } + if (txMeta.txState() == COMMITED) { + boolean readLatest = timestamp == null; + + return readLatest || txMeta.commitTimestamp().compareTo(timestamp) <= 0; + } + return false; } private CompletableFuture<Void> validateAtTimestamp(UUID txId) { @@ -2744,10 +2747,9 @@ public class PartitionReplicaListener implements ReplicaListener { final Map<RequestType, List<CompletableFuture<?>>> futures = new EnumMap<>(RequestType.class); /** - * Transaction state. {@code TxState#ABORTED} and {@code TxState#COMMITED} match the final transaction states. If the property is - * {@code null} the transaction is in pending state. + * Transaction state. {@code TxState#ABORTED} and {@code TxState#COMMITED} match the final transaction states. */ - TxState state; + TxState state = PENDING; } @Override @@ -2892,7 +2894,7 @@ public class PartitionReplicaListener implements ReplicaListener { * @param commitTimestamp Commit timestamp. */ private void markFinished(UUID txId, TxState txState, @Nullable HybridTimestamp commitTimestamp) { - assert txState == COMMITED || txState == ABORTED : "Unexpected state, txId=" + txId + ", txState=" + txState; + assert isFinalState(txState) : "Unexpected state [txId=" + txId + ", txState=" + txState + ']'; txManager.updateTxMeta(txId, old -> old == null ? 
null diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java index c5a897b647..4a14f16be4 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java @@ -38,6 +38,12 @@ public enum TxState { { true, true, true, true, true, true } }; + /** + * Checks whether the state is final, i.e. no transition from this state is allowed. + * + * @param state Transaction state. + * @return {@code true} if the state is either {@link #COMMITED} or {@link #ABORTED} + */ public static boolean isFinalState(TxState state) { return state == COMMITED || state == ABORTED; } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index c0a6979335..776daa566f 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -221,7 +221,7 @@ public class TxManagerImpl implements TxManager { @Override public CompletableFuture<Void> finish( - HybridTimestampTracker timestampTracker, + HybridTimestampTracker observableTimestampTracker, TablePartitionId commitPartition, ClusterNode recipientNode, Long term, @@ -256,7 +256,7 @@ public class TxManagerImpl implements TxManager { return completedFuture(null); } - timestampTracker.update(commitTimestamp); + observableTimestampTracker.update(commitTimestamp); TxFinishReplicaRequest req = FACTORY.txFinishReplicaRequest() .txId(txId) @@ -319,7 +319,7 @@ public class TxManagerImpl implements TxManager { @Override public int finished() { return (int) txStateMap.entrySet().stream() - .filter(e -> e.getValue().txState() == COMMITED || e.getValue().txState() == 
ABORTED) + .filter(e -> isFinalState(e.getValue().txState())) .count(); }