This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 601746edcac IGNITE-26626 Added debug information output to the
testGlobalMinimumTxRequiredTime) test (#6901)
601746edcac is described below
commit 601746edcaca7321cffe76939636938e63358fec
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Wed Nov 12 15:41:45 2025 +0300
IGNITE-26626 Added debug information output to the
testGlobalMinimumTxRequiredTime) test (#6901)
---
.../compaction/ItCatalogCompactionTest.java | 199 ++++++++++++++++++---
.../compaction/CatalogCompactionRunner.java | 5 +
2 files changed, 180 insertions(+), 24 deletions(-)
diff --git
a/modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java
b/modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java
index d0301e1c7ba..b4945e642d7 100644
---
a/modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java
+++
b/modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java
@@ -25,18 +25,21 @@ import static
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_TEST_P
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.internal.tx.TransactionIds.beginTimestamp;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -45,11 +48,15 @@ import org.apache.ignite.InitParametersBuilder;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
import
org.apache.ignite.internal.catalog.compaction.CatalogCompactionRunner.TimeHolder;
+import org.apache.ignite.internal.lang.IgniteStringBuilder;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.tx.ActiveLocalTxMinimumRequiredTimeProvider;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionOptions;
import org.awaitility.Awaitility;
@@ -138,63 +145,83 @@ class ItCatalogCompactionTest extends
ClusterPerClassIntegrationTest {
IgniteImpl node1 = unwrapIgniteImpl(CLUSTER.node(1));
IgniteImpl node2 = unwrapIgniteImpl(CLUSTER.node(2));
+ DebugInfoCollector debug = new DebugInfoCollector(List.of(node0,
node1, node2));
+
List<CatalogCompactionRunner> compactors = List.of(
node0.catalogCompactionRunner(),
node1.catalogCompactionRunner(),
node2.catalogCompactionRunner()
);
- Catalog catalog1 = getLatestCatalog(node2);
+ debug.recordGlobalTxState("init");
+ debug.recordCatalogState("init");
+ debug.recordMinTxTimesState("init");
- Transaction tx1 = beginTx(node0, false);
+ Catalog catalog1 = getLatestCatalog(node2);
+ InternalTransaction tx1 = beginTx(node0, false);
+ debug.recordTx(tx1);
// Changing the catalog and starting transaction.
sql("create table a(a int primary key)");
Catalog catalog2 = getLatestCatalog(node0);
assertThat(catalog2.version(), is(catalog1.version() + 1));
- List<Transaction> txs2 = Stream.of(node1, node2).map(node ->
beginTx(node, false)).collect(Collectors.toList());
+ List<InternalTransaction> txs2 = Stream.of(node1, node2).map(node ->
beginTx(node, false)).collect(Collectors.toList());
List<InternalTransaction> ignoredReadonlyTxs = Stream.of(node0, node1,
node2)
.map(node -> beginTx(node, true))
.collect(Collectors.toList());
+ debug.recordTx(txs2);
+ debug.recordTx(ignoredReadonlyTxs);
+
// Changing the catalog again and starting transaction.
sql("alter table a add column (b int)");
- Awaitility.await().untilAsserted(() ->
assertThat(getLatestCatalogVersion(node1), is(catalog2.version() + 1)));
+ Awaitility.await().until(() -> getLatestCatalogVersion(node1),
is(catalog2.version() + 1));
Catalog catalog3 = getLatestCatalog(node1);
- List<Transaction> txs3 = Stream.of(node0, node2).map(node ->
beginTx(node, false)).collect(Collectors.toList());
+ List<InternalTransaction> txs3 = Stream.of(node0, node2).map(node ->
beginTx(node, false)).collect(Collectors.toList());
+
+ debug.recordTx(txs3);
Collection<InternalClusterNode> topologyNodes =
node0.cluster().nodes().stream()
.map(ClusterNodeImpl::fromPublicClusterNode)
.collect(toUnmodifiableList());
- compactors.forEach(compactor -> {
- TimeHolder timeHolder =
await(compactor.determineGlobalMinimumRequiredTime(topologyNodes, 0L));
- assertThat(timeHolder.txMinRequiredTime, is(catalog1.time()));
- });
+ for (int i = 0; i < compactors.size(); i++) {
+ TimeHolder timeHolder =
await(compactors.get(i).determineGlobalMinimumRequiredTime(topologyNodes, 0L));
+
+ String failureMessage = "Initial condition failed on node #" + i;
+
+ assertEquals(catalog1.time(), timeHolder.txMinRequiredTime, () ->
debug.dumpDebugInfo(failureMessage));
+ }
tx1.rollback();
- compactors.forEach(compactor -> {
- TimeHolder timeHolder =
await(compactor.determineGlobalMinimumRequiredTime(topologyNodes, 0L));
- assertThat(timeHolder.txMinRequiredTime, is(catalog2.time()));
- });
+ for (int i = 0; i < compactors.size(); i++) {
+ TimeHolder timeHolder =
await(compactors.get(i).determineGlobalMinimumRequiredTime(topologyNodes, 0L));
+
+ String failureMessage = "Condition failed after first tx rollback
on node #" + i;
+
+ assertEquals(catalog2.time(), timeHolder.txMinRequiredTime, () ->
debug.dumpDebugInfo(failureMessage));
+ }
txs2.forEach(Transaction::commit);
- compactors.forEach(compactor -> {
- TimeHolder timeHolder =
await(compactor.determineGlobalMinimumRequiredTime(topologyNodes, 0L));
- assertThat(timeHolder.txMinRequiredTime, is(catalog3.time()));
- });
+ for (int i = 0; i < compactors.size(); i++) {
+ TimeHolder timeHolder =
await(compactors.get(i).determineGlobalMinimumRequiredTime(topologyNodes, 0L));
+
+ String failureMessage = "Condition failed after transactions
commit on node #" + i;
+
+ assertEquals(catalog3.time(), timeHolder.txMinRequiredTime, () ->
debug.dumpDebugInfo(failureMessage));
+ }
txs3.forEach(Transaction::rollback);
// Since there are no active RW transactions in the cluster, the
minimum time will be min(now()) across all nodes.
- compactors.forEach(compactor -> {
+ for (int i = 0; i < compactors.size(); i++) {
long minTime = Stream.of(node0, node1, node2).map(node ->
node.clockService().nowLong()).min(Long::compareTo).orElseThrow();
- TimeHolder timeHolder =
await(compactor.determineGlobalMinimumRequiredTime(topologyNodes, 0L));
+ TimeHolder timeHolder =
await(compactors.get(i).determineGlobalMinimumRequiredTime(topologyNodes, 0L));
long maxTime = Stream.of(node0, node1, node2).map(node ->
node.clockService().nowLong()).min(Long::compareTo).orElseThrow();
@@ -203,9 +230,12 @@ class ItCatalogCompactionTest extends
ClusterPerClassIntegrationTest {
assertThat(timeHolder.txMinRequiredTime,
greaterThan(tx.schemaTimestamp().longValue()));
});
- assertThat(timeHolder.txMinRequiredTime,
greaterThanOrEqualTo(minTime));
- assertThat(timeHolder.txMinRequiredTime,
lessThanOrEqualTo(maxTime));
- });
+ long actual = timeHolder.txMinRequiredTime;
+
+ String failureMessage = "node #" + i + ": " + minTime + " <= " +
actual + " <= " + maxTime;
+
+ assertTrue(minTime <= actual && actual <= maxTime, () ->
debug.dumpDebugInfo(failureMessage));
+ }
ignoredReadonlyTxs.forEach(Transaction::rollback);
}
@@ -305,4 +335,125 @@ class ItCatalogCompactionTest extends
ClusterPerClassIntegrationTest {
}
});
}
+
+ private class DebugInfoCollector {
+ private final List<IgniteImpl> nodes;
+ private final Map<UUID, String> nodesById;
+ private final IgniteStringBuilder buffer = new
IgniteStringBuilder("Test debug info").nl();
+ private final List<InternalTransaction> transactions = new
ArrayList<>();
+
+ DebugInfoCollector(List<IgniteImpl> nodes) {
+ this.nodes = nodes;
+ Map<UUID, String> nodesById = new HashMap<>();
+
+ for (IgniteImpl node : nodes) {
+ nodesById.put(node.id(), node.name());
+ }
+
+ this.nodesById = nodesById;
+ }
+
+ void recordCatalogState(String contextMessage) {
+ buffer.nl();
+
+ for (IgniteImpl node : nodes) {
+ buffer.app("Catalog state(").app(contextMessage)
+ .app(") on node ").app(node.name()).nl();
+
+ CatalogManager mgr = node.catalogManager();
+
+ for (int ver = mgr.earliestCatalogVersion(); ver <=
mgr.latestCatalogVersion(); ver++) {
+ Catalog catalog = mgr.catalog(ver);
+
+ buffer.app(" ").app(ver).app(" | ").app(catalog == null ?
-1 : catalog.time()).nl();
+ }
+ }
+ }
+
+ void recordMinTxTimesState(String contextMessage) {
+ buffer.nl();
+
+ buffer.app("Minimum RW tx times
(").app(contextMessage).app(')').nl();
+
+ for (IgniteImpl node : nodes) {
+ ActiveLocalTxMinimumRequiredTimeProvider timeProvider =
node.catalogCompactionRunner()
+ .activeLocalTxMinimumRequiredTimeProvider();
+
+ buffer.app(" ").app(node.name()).app(":
").app(timeProvider.minimumRequiredTime()).nl();
+ }
+ }
+
+ void recordGlobalTxState(String contextMessage) {
+ buffer.nl();
+
+ buffer.app("System transactions state
(").app(contextMessage).app(')').nl();
+
+ for (IgniteImpl node : nodes) {
+ TxManager txManager = node.txManager();
+
+ buffer.app(" ").app(node.name())
+ .app(": pending=").app(txManager.pending())
+ .app(", finished=").app(txManager.finished())
+ .nl();
+ }
+ }
+
+ void recordTransactionsState() {
+ // Sort by start time.
+ transactions.sort(Comparator.comparing(t ->
beginTimestamp(t.id())));
+
+ List<InternalTransaction> roTransactions = new ArrayList<>();
+ List<InternalTransaction> rwTransactions = new ArrayList<>();
+
+ for (InternalTransaction tx : transactions) {
+ if (tx.isReadOnly()) {
+ roTransactions.add(tx);
+ } else {
+ rwTransactions.add(tx);
+ }
+ }
+
+ buffer.nl();
+ buffer.app("RW transactions state").nl();
+
+ for (InternalTransaction tx : rwTransactions) {
+ buffer.app(" ")
+ .app(tx.isFinishingOrFinished() ? "finished" : "active
").app(" | ")
+ .app(nodesById.get(tx.coordinatorId())).app(" | ")
+ .app(beginTimestamp(tx.id()))
+ .nl();
+ }
+
+ buffer.nl();
+ buffer.app("RO transactions state").nl();
+
+ for (InternalTransaction tx : roTransactions) {
+ buffer.app(" ")
+ .app(tx.isFinishingOrFinished() ? "finished" : "active
").app(" | ")
+ .app(beginTimestamp(tx.id()))
+ .nl();
+ }
+ }
+
+ void recordTx(InternalTransaction tx) {
+ transactions.add(tx);
+ }
+
+ void recordTx(List<InternalTransaction> txs) {
+ transactions.addAll(txs);
+ }
+
+ String dumpDebugInfo(String messageHeader) {
+ recordGlobalTxState("onFailure");
+ recordCatalogState("onFailure");
+ recordMinTxTimesState("onFailure");
+ recordTransactionsState();
+
+ String debugInfo = messageHeader + System.lineSeparator() +
System.lineSeparator() + buffer.toString();
+
+ log.info(debugInfo);
+
+ return debugInfo;
+ }
+ }
}
diff --git
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
index cefe94eaddb..a476cae82f9 100644
---
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
+++
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
@@ -239,6 +239,11 @@ public class CatalogCompactionRunner implements
IgniteComponent {
return compactionCoordinatorNodeName;
}
+ @TestOnly
+ public ActiveLocalTxMinimumRequiredTimeProvider
activeLocalTxMinimumRequiredTimeProvider() {
+ return activeLocalTxMinimumRequiredTimeProvider;
+ }
+
/** Called when the low watermark has been changed. */
public CompletableFuture<Boolean> onLowWatermarkChanged(HybridTimestamp
newLowWatermark) {
lowWatermark = newLowWatermark;