This is an automated email from the ASF dual-hosted git repository. ppa pushed a commit to branch ignite-23488 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit cecc7ece0e0de8e4670f837bbc0fc75d0929e752 Author: Pavel Pereslegin <[email protected]> AuthorDate: Tue Feb 11 12:25:02 2025 +0300 IGNITE-23488 Review comments, code cleanup. --- .../ignite/internal/lowwatermark/LowWatermark.java | 8 ---- .../internal/lowwatermark/LowWatermarkImpl.java | 7 --- .../internal/lowwatermark/TestLowWatermark.java | 7 --- .../ignite/internal/tx/impl/TxManagerImpl.java | 2 +- .../tx/views/TransactionsViewProvider.java | 55 ++++++++++++---------- 5 files changed, 31 insertions(+), 48 deletions(-) diff --git a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java index 4aeeaeb2f7f..3164619628b 100644 --- a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java +++ b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.lowwatermark; -import java.util.Set; import java.util.UUID; import java.util.function.Consumer; import org.apache.ignite.internal.event.EventProducer; @@ -68,11 +67,4 @@ public interface LowWatermark extends EventProducer<LowWatermarkEvent, LowWaterm * @param lockId ID of the transaction that locks the low watermark. */ void unlock(UUID lockId); - - /** - * Returns a set of all transaction IDs that lock the low watermark. - * - * @return Set of all transaction IDs that lock the low watermark. - */ - Set<UUID> lockIds(); } diff --git a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java index 94c10cd4e60..f2753c43434 100644 --- a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java +++ b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java @@ -25,9 +25,7 @@ import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync; -import java.util.Collections; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -335,11 +333,6 @@ public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, L lock.future().complete(null); } - @Override - public Set<UUID> lockIds() { - return Collections.unmodifiableSet(locks.keySet()); - } - CompletableFuture<Void> updateAndNotify(HybridTimestamp newLowWatermark) { return inBusyLockAsync(busyLock, () -> { vaultManager.put(LOW_WATERMARK_VAULT_KEY, newLowWatermark.toBytes()); diff --git a/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java b/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java index b5b5951903e..66c6798fbcc 100644 --- a/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java +++ b/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java @@ -24,9 +24,7 @@ import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import java.util.Collections; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -117,11 +115,6 @@ public class TestLowWatermark extends AbstractEventProducer<LowWatermarkEvent, L lock.future().complete(null); } - @Override - public Set<UUID> lockIds() { - return Collections.unmodifiableSet(locks.keySet()); - } - /** * Update low watermark and notify listeners. * diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index 1d4c4ce6cc8..fbd1b61849e 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -870,7 +870,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler, SystemVi txStateVolatileStorage.start(); - txViewProvider.init(localNodeId, txStateVolatileStorage.statesMap()); + txViewProvider.init(txStateVolatileStorage.statesMap().values()); orphanDetector.start(txStateVolatileStorage, txConfig.abandonedCheckTs()); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/views/TransactionsViewProvider.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/views/TransactionsViewProvider.java index 64eab27fa38..c7c6604228c 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/views/TransactionsViewProvider.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/views/TransactionsViewProvider.java @@ -17,12 +17,13 @@ package org.apache.ignite.internal.tx.views; +import static org.apache.ignite.internal.tx.TxState.isFinalState; import static org.apache.ignite.internal.type.NativeTypes.stringOf; import java.time.Instant; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; -import java.util.Map; import java.util.UUID; import java.util.concurrent.Flow.Publisher; import org.apache.ignite.internal.systemview.api.SystemView; @@ -33,7 +34,6 @@ import org.apache.ignite.internal.tx.TxStateMeta; import org.apache.ignite.internal.type.NativeType; import org.apache.ignite.internal.type.NativeTypes; import org.apache.ignite.internal.util.SubscriptionUtils; -import org.jetbrains.annotations.Nullable; /** * {@code TRANSACTIONS} system view provider. @@ -46,14 +46,8 @@ public class TransactionsViewProvider { private volatile Iterable<TxInfo> dataSource; /** Initializes provider with data sources. */ - public void init( - UUID localNodeId, - Map<UUID, TxStateMeta> rwTxStates - ) { - this.dataSource = new TxInfoDataSource( - localNodeId, - rwTxStates - ); + public void init(Collection<TxStateMeta> txStates) { + this.dataSource = new TxInfoDataSource(txStates); } /** Returns a {@code TRANSACTIONS} system view. */ @@ -84,42 +78,53 @@ public class TransactionsViewProvider { } static class TxInfoDataSource implements Iterable<TxInfo> { - private final UUID localNodeId; - - private final Map<UUID, TxStateMeta> rwTxStates; + private final Collection<TxStateMeta> txStates; - TxInfoDataSource(UUID localNodeId, Map<UUID, TxStateMeta> rwTxStates) { - this.localNodeId = localNodeId; - this.rwTxStates = rwTxStates; + TxInfoDataSource(Collection<TxStateMeta> txStates) { + this.txStates = txStates; } @Override public Iterator<TxInfo> iterator() { - return rwTxStates.entrySet().stream() - .filter(e -> localNodeId.equals(e.getValue().txCoordinatorId()) - && e.getValue().tx() != null && !e.getValue().tx().isFinishingOrFinished()) - .map(e -> new TxInfo(e.getKey(), e.getValue())) + return txStates.stream() + .filter(txStateMeta -> { + InternalTransaction tx = txStateMeta.tx(); + + if (tx == null) { + return false; + } + + // Currently the read-only transaction status does not change and it is always in the PENDING state. + if ((tx.isReadOnly() && tx.isFinishingOrFinished()) || isFinalState(txStateMeta.txState())) { + return false; + } + + return true; + }) + .map(TxInfo::new) .iterator(); } } static class TxInfo { private final String id; - private final @Nullable String state; + private final String state; private final Instant startTime; private final String type; private final String priority; - TxInfo(UUID id, TxStateMeta txStateMeta) { + TxInfo(TxStateMeta txStateMeta) { InternalTransaction tx = txStateMeta.tx(); assert tx != null; - this.id = id.toString(); + UUID txId = tx.id(); + + this.id = txId.toString(); this.state = txStateMeta.txState().name(); - this.startTime = Instant.ofEpochMilli(TransactionIds.beginTimestamp(id).getPhysical()); + this.startTime = Instant.ofEpochMilli(TransactionIds.beginTimestamp(txId).getPhysical()); this.type = tx.isReadOnly() ? READ_ONLY : READ_WRITE; - this.priority = TransactionIds.priority(id).name(); + this.priority = TransactionIds.priority(txId).name(); } } }
