This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 51fa042e8a IGNITE-22641 Catalog compaction. Introduce
UpdateMinimumActiveTxBeginTimeCommand RAFT command (#4137)
51fa042e8a is described below
commit 51fa042e8a2f359d9b99bac35caa8aa9510ccae2
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Wed Aug 14 19:01:03 2024 +0300
IGNITE-22641 Catalog compaction. Introduce
UpdateMinimumActiveTxBeginTimeCommand RAFT command (#4137)
---
modules/catalog-compaction/build.gradle | 13 +
.../compaction/ItCatalogCompactionTest.java | 222 +++++++++++++
.../compaction/CatalogCompactionRunner.java | 345 +++++++++++++++------
.../compaction/CatalogManagerCompactionFacade.java | 90 ++++++
.../message/CatalogCompactionMessageGroup.java | 8 +-
...a => CatalogCompactionMinimumTimesRequest.java} | 9 +-
... => CatalogCompactionMinimumTimesResponse.java} | 18 +-
.../compaction/AbstractCatalogCompactionTest.java | 64 ++++
.../CatalogCompactionRunnerSelfTest.java | 295 +++++++++++++-----
.../CatalogManagerCompactionFacadeTest.java | 113 +++++++
.../internal/catalog/CatalogManagerImpl.java | 10 +-
.../catalog/CatalogManagerRecoveryTest.java | 4 +-
.../internal/catalog/CatalogManagerSelfTest.java | 13 +-
.../IndexNodeFinishedRwTransactionsChecker.java | 17 +-
.../network/PartitionReplicationMessageGroup.java | 10 +
.../UpdateMinimumActiveTxBeginTimeCommand.java} | 16 +-
...ateMinimumActiveTxBeginTimeReplicaRequest.java} | 14 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 35 ++-
.../table/distributed/raft/PartitionListener.java | 40 +++
.../replicator/PartitionReplicaListener.java | 21 ++
.../tx/ActiveLocalTxMinimumBeginTimeProvider.java} | 18 +-
21 files changed, 1127 insertions(+), 248 deletions(-)
diff --git a/modules/catalog-compaction/build.gradle
b/modules/catalog-compaction/build.gradle
index 5cca292065..b1dff7f364 100644
--- a/modules/catalog-compaction/build.gradle
+++ b/modules/catalog-compaction/build.gradle
@@ -23,6 +23,7 @@ apply from:
"$rootDir/buildscripts/java-integration-test.gradle"
dependencies {
annotationProcessor project(':ignite-network-annotation-processor')
+ annotationProcessor libs.auto.service
implementation project(':ignite-affinity')
implementation project(':ignite-catalog')
@@ -30,9 +31,14 @@ dependencies {
implementation project(':ignite-core')
implementation project(':ignite-network-api')
implementation project(':ignite-placement-driver-api')
+ implementation project(':ignite-replicator')
+ implementation project(':ignite-partition-replicator')
implementation project(':ignite-system-view-api')
+ implementation project(':ignite-table')
+ implementation project(':ignite-transactions')
implementation libs.jetbrains.annotations
+ implementation libs.fastutil.core
testImplementation(testFixtures(project(':ignite-core')))
testImplementation(testFixtures(project(':ignite-metastorage')))
@@ -41,7 +47,14 @@ dependencies {
testImplementation libs.mockito.core
testImplementation libs.hamcrest.core
+ integrationTestImplementation libs.fastutil.core
integrationTestImplementation project(':ignite-api')
+ integrationTestImplementation project(':ignite-catalog')
+ integrationTestImplementation project(':ignite-transactions')
+ integrationTestImplementation project(':ignite-raft')
+ integrationTestImplementation project(':ignite-raft-api')
+ integrationTestImplementation project(':ignite-system-view-api')
+ integrationTestImplementation project(':ignite-table')
integrationTestImplementation testFixtures(project(':ignite-core'))
integrationTestImplementation testFixtures(project(':ignite-runner'))
}
diff --git
a/modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java
b/modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java
new file mode 100644
index 0000000000..29564fe70f
--- /dev/null
+++
b/modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.compaction;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.not;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import
org.apache.ignite.internal.catalog.compaction.CatalogCompactionRunner.TimeHolder;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import
org.apache.ignite.internal.raft.server.impl.JraftServerImpl.DelegatingStateMachine;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.tx.TransactionOptions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests to verify catalog compaction.
+ */
+class ItCatalogCompactionTest extends ClusterPerClassIntegrationTest {
+    private static final int CLUSTER_SIZE = 3;
+
+    @Override
+    protected int initialNodes() {
+        return CLUSTER_SIZE;
+    }
+
+    @Test
+    void testRaftGroupsUpdate() throws InterruptedException {
+        IgniteImpl ignite = CLUSTER.aliveNode();
+        CatalogManagerImpl catalogManager = (CatalogManagerImpl) ignite.catalogManager();
+        int partsCount = 16;
+
+        sql(format("create zone if not exists test with partitions={}, replicas={}, storage_profiles='default'",
+                partsCount, initialNodes()));
+        sql("alter zone test set default");
+
+        Map<Integer, Integer> expectedTablesWithPartitions = new HashMap<>();
+
+        // Latest active catalog contains all required tables.
+        {
+            sql("create table a(a int primary key)");
+
+            Catalog minRequiredCatalog = catalogManager.catalog(catalogManager.latestCatalogVersion());
+            assertNotNull(minRequiredCatalog);
+
+            sql("create table b(a int primary key)");
+
+            Catalog lastCatalog = catalogManager.catalog(
+                    catalogManager.activeCatalogVersion(ignite.clock().nowLong()));
+            assertNotNull(lastCatalog);
+
+            Collection<CatalogTableDescriptor> tables = lastCatalog.tables();
+            assertThat(tables, hasSize(2));
+
+            tables.forEach(t -> expectedTablesWithPartitions.put(t.id(), partsCount));
+
+            HybridTimestamp expectedTime = HybridTimestamp.hybridTimestamp(minRequiredCatalog.time());
+
+            CompletableFuture<Void> fut = ignite.catalogCompactionRunner()
+                    .propagateTimeToReplicas(expectedTime.longValue(), ignite.clusterNodes());
+
+            assertThat(fut, willCompleteSuccessfully());
+
+            ensureTimestampStoredInAllReplicas(expectedTime, expectedTablesWithPartitions);
+        }
+
+        // Latest active catalog does not contain all required tables.
+        // Replicas of dropped tables must also be updated.
+        long requiredTime = ignite.clockService().nowLong();
+
+        {
+            sql("drop table a");
+            sql("drop table b");
+
+            HybridTimestamp expectedTime = HybridTimestamp.hybridTimestamp(requiredTime);
+
+            CompletableFuture<Void> fut = ignite.catalogCompactionRunner()
+                    .propagateTimeToReplicas(expectedTime.longValue(), ignite.clusterNodes());
+
+            assertThat(fut, willCompleteSuccessfully());
+
+            ensureTimestampStoredInAllReplicas(expectedTime, expectedTablesWithPartitions);
+        }
+    }
+
+    @Test
+    void testGlobalMinimumTxBeginTime() {
+        IgniteImpl node0 = CLUSTER.node(0);
+        IgniteImpl node1 = CLUSTER.node(1);
+        IgniteImpl node2 = CLUSTER.node(2);
+
+        List<CatalogCompactionRunner> compactors = List.of(
+                node0.catalogCompactionRunner(),
+                node1.catalogCompactionRunner(),
+                node2.catalogCompactionRunner()
+        );
+
+        Collection<ClusterNode> topologyNodes = node0.clusterNodes();
+
+        InternalTransaction tx1 = (InternalTransaction) node0.transactions().begin();
+        InternalTransaction tx2 = (InternalTransaction) node1.transactions().begin();
+        InternalTransaction readonlyTx = (InternalTransaction) node1.transactions().begin(new TransactionOptions().readOnly(true));
+        InternalTransaction tx3 = (InternalTransaction) node2.transactions().begin();
+
+        // Every compactor must report the begin time of the oldest still-active transaction.
+        assertMinActiveTxBeginTime(compactors, topologyNodes, tx1.startTimestamp().longValue());
+
+        tx1.rollback();
+
+        assertMinActiveTxBeginTime(compactors, topologyNodes, tx2.startTimestamp().longValue());
+
+        tx2.commit();
+
+        assertMinActiveTxBeginTime(compactors, topologyNodes, tx3.startTimestamp().longValue());
+
+        tx3.rollback();
+
+        // Since there are no active RW transactions in the cluster, the minimum time will be min(now()) across all nodes.
+        compactors.forEach(compactor -> {
+            long lowerBound = minNowLong(node0, node1, node2);
+
+            TimeHolder timeHolder = await(compactor.determineGlobalMinimumRequiredTime(topologyNodes, 0L));
+
+            // Assuming each node's clock never goes backwards, the minimum of the clocks sampled after the
+            // request cannot be smaller than the minimum observed while the request was served.
+            long upperBound = minNowLong(node0, node1, node2);
+
+            // Read-only transactions are not counted.
+            assertThat(timeHolder.minActiveTxBeginTime, greaterThan(readonlyTx.startTimestamp().longValue()));
+
+            assertThat(timeHolder.minActiveTxBeginTime, greaterThanOrEqualTo(lowerBound));
+            assertThat(timeHolder.minActiveTxBeginTime, lessThanOrEqualTo(upperBound));
+        });
+
+        readonlyTx.rollback();
+    }
+
+    /**
+     * Asserts that every compactor computes the given global minimum active transaction begin time.
+     *
+     * @param compactors Compaction runners to query.
+     * @param topologyNodes Nodes passed to {@code determineGlobalMinimumRequiredTime}.
+     * @param expectedTime Expected minimum active transaction begin time.
+     */
+    private static void assertMinActiveTxBeginTime(
+            List<CatalogCompactionRunner> compactors,
+            Collection<ClusterNode> topologyNodes,
+            long expectedTime
+    ) {
+        for (CatalogCompactionRunner compactor : compactors) {
+            TimeHolder timeHolder = await(compactor.determineGlobalMinimumRequiredTime(topologyNodes, 0L));
+
+            assertThat(timeHolder.minActiveTxBeginTime, is(expectedTime));
+        }
+    }
+
+    /** Returns the smallest {@code clockService().nowLong()} value among the given nodes, sampled in order. */
+    private static long minNowLong(IgniteImpl... nodes) {
+        return Stream.of(nodes)
+                .mapToLong(node -> node.clockService().nowLong())
+                .min()
+                .orElseThrow();
+    }
+
+    /**
+     * Waits until the local replica of every partition of every given table has stored the expected
+     * minimum active transaction begin time, then asserts it.
+     *
+     * @param expectedTimestamp Expected stored time.
+     * @param expectedTablesWithPartitions Table ID to partition count.
+     */
+    private static void ensureTimestampStoredInAllReplicas(
+            HybridTimestamp expectedTimestamp,
+            Map<Integer, Integer> expectedTablesWithPartitions
+    ) throws InterruptedException {
+        Loza loza = CLUSTER.aliveNode().raftManager();
+        JraftServerImpl server = (JraftServerImpl) loza.server();
+        long expectedTime = expectedTimestamp.longValue();
+
+        for (Map.Entry<Integer, Integer> tableWithPartition : expectedTablesWithPartitions.entrySet()) {
+            int tableId = tableWithPartition.getKey();
+            int partitionsCount = tableWithPartition.getValue();
+
+            for (int p = 0; p < partitionsCount; p++) {
+                TablePartitionId groupId = new TablePartitionId(tableId, p);
+                List<Peer> peers = server.localPeers(groupId);
+
+                assertThat(peers, is(not(empty())));
+
+                Peer serverPeer = peers.get(0);
+                RaftGroupService grp = server.raftGroupService(new RaftNodeId(groupId, serverPeer));
+                DelegatingStateMachine fsm = (DelegatingStateMachine) grp.getRaftNode().getOptions().getFsm();
+                PartitionListener listener = (PartitionListener) fsm.getListener();
+
+                // A completed `invoke` future only guarantees that the leader has been updated;
+                // the remaining replicas may apply the command later, so wait for the local one.
+                IgniteTestUtils.waitForCondition(
+                        () -> Long.valueOf(expectedTime).equals(listener.minimumActiveTxBeginTime()),
+                        5_000
+                );
+
+                assertThat(grp.getGroupId(), listener.minimumActiveTxBeginTime(), equalTo(expectedTime));
+            }
+        }
+    }
+}
diff --git
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
index 0b1337ddd8..2c4be73c28 100644
---
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
+++
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
@@ -24,19 +24,20 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
+import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.affinity.TokenizedAssignments;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
import
org.apache.ignite.internal.catalog.compaction.message.CatalogCompactionMessageGroup;
import
org.apache.ignite.internal.catalog.compaction.message.CatalogCompactionMessagesFactory;
-import
org.apache.ignite.internal.catalog.compaction.message.CatalogMinimumRequiredTimeRequest;
-import
org.apache.ignite.internal.catalog.compaction.message.CatalogMinimumRequiredTimeResponse;
+import
org.apache.ignite.internal.catalog.compaction.message.CatalogCompactionMinimumTimesRequest;
+import
org.apache.ignite.internal.catalog.compaction.message.CatalogCompactionMinimumTimesResponse;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
@@ -49,10 +50,16 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.network.MessagingService;
-import org.apache.ignite.internal.network.NetworkMessage;
+import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
+import
org.apache.ignite.internal.partition.replicator.network.replication.UpdateMinimumActiveTxBeginTimeReplicaRequest;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaMessageUtils;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
+import org.apache.ignite.internal.tx.ActiveLocalTxMinimumBeginTimeProvider;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.network.ClusterNode;
@@ -70,7 +77,7 @@ import org.jetbrains.annotations.TestOnly;
* <li>Routine is triggered after receiving local notification that the
low watermark
* has been updated on catalog compaction coordinator (metastorage group
leader).</li>
* <li>Coordinator calculates the minimum required time in the cluster
- * by sending {@link CatalogMinimumRequiredTimeRequest} to all cluster
members.</li>
+ * by sending {@link CatalogCompactionMinimumTimesRequest} to all cluster
members.</li>
* <li>If it is considered safe to trim the history up to calculated
catalog version
* (at least, all partition owners are present in the logical topology),
then the catalog is compacted.</li>
* </ol>
@@ -78,11 +85,15 @@ import org.jetbrains.annotations.TestOnly;
public class CatalogCompactionRunner implements IgniteComponent {
private static final IgniteLogger LOG =
Loggers.forClass(CatalogCompactionRunner.class);
- private static final CatalogCompactionMessagesFactory MESSAGES_FACTORY =
new CatalogCompactionMessagesFactory();
+ private static final CatalogCompactionMessagesFactory
COMPACTION_MESSAGES_FACTORY = new CatalogCompactionMessagesFactory();
+
+ private static final PartitionReplicationMessagesFactory
REPLICATION_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory();
+
+ private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new
ReplicaMessagesFactory();
private static final long ANSWER_TIMEOUT = 5_000;
- private final CatalogManagerImpl catalogManager;
+ private final CatalogManagerCompactionFacade catalogManagerFacade;
private final MessagingService messagingService;
@@ -100,6 +111,14 @@ public class CatalogCompactionRunner implements
IgniteComponent {
private final String localNodeName;
+ private final ActiveLocalTxMinimumBeginTimeProvider
activeLocalTxMinimumBeginTimeProvider;
+
+ private final ReplicaService replicaService;
+
+ private final SchemaSyncService schemaSyncService;
+
+ private CompletableFuture<Void> lastRunFuture =
CompletableFutures.nullCompletedFuture();
+
/**
* Node that is considered to be a coordinator of compaction process.
*
@@ -107,8 +126,6 @@ public class CatalogCompactionRunner implements
IgniteComponent {
*/
private volatile @Nullable String compactionCoordinatorNodeName;
- private volatile CompletableFuture<Boolean> lastRunFuture =
CompletableFutures.nullCompletedFuture();
-
private volatile HybridTimestamp lowWatermark;
/**
@@ -120,10 +137,14 @@ public class CatalogCompactionRunner implements
IgniteComponent {
MessagingService messagingService,
LogicalTopologyService logicalTopologyService,
PlacementDriver placementDriver,
+ ReplicaService replicaService,
ClockService clockService,
- Executor executor
+ SchemaSyncService schemaSyncService,
+ Executor executor,
+ ActiveLocalTxMinimumBeginTimeProvider
activeLocalTxMinimumBeginTimeProvider
) {
- this(localNodeName, catalogManager, messagingService,
logicalTopologyService, placementDriver, clockService, executor, null);
+ this(localNodeName, catalogManager, messagingService,
logicalTopologyService, placementDriver, replicaService, clockService,
+ schemaSyncService, executor,
activeLocalTxMinimumBeginTimeProvider, null);
}
/**
@@ -135,17 +156,23 @@ public class CatalogCompactionRunner implements
IgniteComponent {
MessagingService messagingService,
LogicalTopologyService logicalTopologyService,
PlacementDriver placementDriver,
+ ReplicaService replicaService,
ClockService clockService,
+ SchemaSyncService schemaSyncService,
Executor executor,
+ ActiveLocalTxMinimumBeginTimeProvider
activeLocalTxMinimumBeginTimeProvider,
@Nullable CatalogCompactionRunner.MinimumRequiredTimeProvider
localMinTimeProvider
) {
- this.messagingService = messagingService;
this.localNodeName = localNodeName;
+ this.messagingService = messagingService;
this.logicalTopologyService = logicalTopologyService;
- this.catalogManager = catalogManager;
+ this.catalogManagerFacade = new
CatalogManagerCompactionFacade(catalogManager);
this.clockService = clockService;
+ this.schemaSyncService = schemaSyncService;
this.placementDriver = placementDriver;
+ this.replicaService = replicaService;
this.executor = executor;
+ this.activeLocalTxMinimumBeginTimeProvider =
activeLocalTxMinimumBeginTimeProvider;
this.localMinTimeProvider = localMinTimeProvider == null ?
this::determineLocalMinimumRequiredTime : localMinTimeProvider;
}
@@ -153,14 +180,21 @@ public class CatalogCompactionRunner implements
IgniteComponent {
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
messagingService.addMessageHandler(CatalogCompactionMessageGroup.class,
(message, sender, correlationId) -> {
assert message.groupType() ==
CatalogCompactionMessageGroup.GROUP_TYPE : message.groupType();
- assert message.messageType() ==
CatalogCompactionMessageGroup.MINIMUM_REQUIRED_TIME_REQUEST :
message.messageType();
- assert correlationId != null;
- CatalogMinimumRequiredTimeResponse response =
MESSAGES_FACTORY.catalogMinimumRequiredTimeResponse()
- .timestamp(localMinTimeProvider.time())
- .build();
+ if (message.messageType() ==
CatalogCompactionMessageGroup.MINIMUM_TIMES_REQUEST) {
+ assert correlationId != null;
+
+ CatalogCompactionMinimumTimesResponse response =
COMPACTION_MESSAGES_FACTORY.catalogCompactionMinimumTimesResponse()
+ .minimumRequiredTime(localMinTimeProvider.time())
+
.minimumActiveTxTime(activeLocalTxMinimumBeginTimeProvider.minimumBeginTime().longValue())
+ .build();
- messagingService.respond(sender, response, correlationId);
+ messagingService.respond(sender, response, correlationId);
+
+ return;
+ }
+
+ throw new UnsupportedOperationException("Not supported message
type: " + message.messageType());
});
return CompletableFutures.nullCompletedFuture();
@@ -196,115 +230,118 @@ public class CatalogCompactionRunner implements
IgniteComponent {
}
@TestOnly
- CompletableFuture<Boolean> lastRunFuture() {
+ synchronized CompletableFuture<Void> lastRunFuture() {
return lastRunFuture;
}
/** Starts the catalog compaction routine. */
- CompletableFuture<Boolean> triggerCompaction(@Nullable HybridTimestamp
lwm) {
+ void triggerCompaction(@Nullable HybridTimestamp lwm) {
if (lwm == null ||
!localNodeName.equals(compactionCoordinatorNodeName)) {
- return CompletableFutures.falseCompletedFuture();
+ return;
}
- return inBusyLock(busyLock, () -> {
- CompletableFuture<Boolean> fut = lastRunFuture;
+ inBusyLock(busyLock, () -> {
+ synchronized (this) {
+ CompletableFuture<Void> fut = lastRunFuture;
- if (!fut.isDone()) {
- LOG.info("Catalog compaction is already in progress, skipping
(timestamp={})", lwm.longValue());
+ if (!fut.isDone()) {
+ LOG.info("Catalog compaction is already in progress,
skipping (timestamp={})", lwm.longValue());
- return CompletableFutures.falseCompletedFuture();
- }
+ return;
+ }
- fut =
startCompaction(logicalTopologyService.localLogicalTopology())
- .whenComplete((res, ex) -> {
- if (ex != null) {
- LOG.warn("Catalog compaction has failed
(timestamp={})", ex, lwm.longValue());
- } else if (LOG.isDebugEnabled()) {
- if (res) {
- LOG.debug("Catalog compaction completed
successfully (timestamp={})", lwm.longValue());
- } else {
- LOG.debug("Catalog compaction skipped
(timestamp={})", lwm.longValue());
- }
- }
- });
-
- lastRunFuture = fut;
-
- return fut;
+ lastRunFuture =
startCompaction(logicalTopologyService.localLogicalTopology());
+ }
});
}
- private CompletableFuture<Boolean> startCompaction(LogicalTopologySnapshot
topologySnapshot) {
+ private CompletableFuture<Void> startCompaction(LogicalTopologySnapshot
topologySnapshot) {
long localMinimum = localMinTimeProvider.time();
- if (catalogByTsNullable(localMinimum) == null) {
- return CompletableFutures.falseCompletedFuture();
+ if (catalogManagerFacade.catalogByTsNullable(localMinimum) == null) {
+ LOG.info("Catalog compaction skipped, nothing to compact (ts={})",
localMinimum);
+
+ return CompletableFutures.nullCompletedFuture();
}
return determineGlobalMinimumRequiredTime(topologySnapshot.nodes(),
localMinimum)
- .thenComposeAsync(ts -> {
- Catalog catalog = catalogByTsNullable(ts);
+ .thenComposeAsync(timeHolder -> {
+ long minRequiredTime = timeHolder.minRequiredTime;
+ long minActiveTxBeginTime =
timeHolder.minActiveTxBeginTime;
+ Catalog catalog =
catalogManagerFacade.catalogByTsNullable(minRequiredTime);
- if (catalog == null) {
- return CompletableFutures.falseCompletedFuture();
- }
+ CompletableFuture<Boolean> catalogCompactionFut;
- return requiredNodes(catalog)
- .thenCompose(requiredNodes -> {
- List<String> missingNodes =
missingNodes(requiredNodes, topologySnapshot.nodes());
-
- if (!missingNodes.isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Catalog compaction aborted
due to missing cluster members (nodes={})", missingNodes);
- }
+ if (catalog == null) {
+ LOG.info("Catalog compaction skipped, nothing to
compact (ts={})", minRequiredTime);
- return
CompletableFutures.falseCompletedFuture();
+ catalogCompactionFut =
CompletableFutures.falseCompletedFuture();
+ } else {
+ catalogCompactionFut = tryCompactCatalog(catalog,
topologySnapshot).whenComplete((res, ex) -> {
+ if (ex != null) {
+ LOG.warn("Catalog compaction has failed
(timestamp={})", ex, minRequiredTime);
+ } else {
+ if (res) {
+ LOG.info("Catalog compaction completed
successfully (timestamp={})", minRequiredTime);
+ } else {
+ LOG.info("Catalog compaction skipped
(timestamp={})", minRequiredTime);
}
+ }
+ });
+ }
- return
catalogManager.compactCatalog(catalog.time());
- });
+ CompletableFuture<Void> propagateToReplicasFut =
+ propagateTimeToReplicas(minActiveTxBeginTime,
topologySnapshot.nodes())
+ .whenComplete((res, ex) -> {
+ if (ex != null) {
+ LOG.warn("Failed to propagate
minimum active tx begin time to replicas", ex);
+ }
+ });
+
+ return CompletableFuture.allOf(
+ catalogCompactionFut,
+ propagateToReplicasFut
+ );
}, executor);
}
- private @Nullable Catalog catalogByTsNullable(long ts) {
- try {
- int catalogVer = catalogManager.activeCatalogVersion(ts);
- return catalogManager.catalog(catalogVer - 1);
- } catch (IllegalStateException e) {
- return null;
- }
- }
-
- private CompletableFuture<Long>
determineGlobalMinimumRequiredTime(Set<LogicalNode> logicalTopologyNodes, long
localMinimumTs) {
- CatalogMinimumRequiredTimeRequest request =
MESSAGES_FACTORY.catalogMinimumRequiredTimeRequest().build();
- List<CompletableFuture<?>> ackFutures = new
ArrayList<>(logicalTopologyNodes.size());
- AtomicLong minimumRequiredTimeHolder = new AtomicLong(Long.MAX_VALUE);
+ /**
+ * Determines the cluster-wide minimum required time and minimum active transaction begin time.
+ *
+ * <p>Sends {@link CatalogCompactionMinimumTimesRequest} to every node in {@code nodes} except the local one
+ * and combines the responses with the local values.
+ *
+ * @param nodes Nodes to query; the local node (if present) is skipped.
+ * @param localMinimumRequiredTime Minimum required time of the local node.
+ * @return Future with the combined minima; completes exceptionally if any remote request fails.
+ */
+ CompletableFuture<TimeHolder> determineGlobalMinimumRequiredTime(
+ Collection<? extends ClusterNode> nodes,
+ long localMinimumRequiredTime
+ ) {
+ CatalogCompactionMinimumTimesRequest request =
COMPACTION_MESSAGES_FACTORY.catalogCompactionMinimumTimesRequest().build();
+ // Capacity is only a hint; "nodes.size() - 1" would be negative (and throw) for an empty collection.
+ List<CompletableFuture<CatalogCompactionMinimumTimesResponse>>
responseFutures = new ArrayList<>(nodes.size());
- for (LogicalNode node : logicalTopologyNodes) {
+ for (ClusterNode node : nodes) {
if (localNodeName.equals(node.name())) {
continue;
}
- CompletableFuture<NetworkMessage> fut =
messagingService.invoke(node, request, ANSWER_TIMEOUT)
- .whenComplete((msg, ex) -> {
- if (ex != null) {
- return;
- }
+ CompletableFuture<CatalogCompactionMinimumTimesResponse> fut =
messagingService.invoke(node, request, ANSWER_TIMEOUT)
+
.thenApply(CatalogCompactionMinimumTimesResponse.class::cast);
- long time = ((CatalogMinimumRequiredTimeResponse)
msg).timestamp();
- long prevTime;
+ responseFutures.add(fut);
+ }
- // Update minimum timestamp.
- do {
- prevTime = minimumRequiredTimeHolder.get();
- } while (time < prevTime &&
!minimumRequiredTimeHolder.compareAndSet(prevTime, time));
- });
+ return CompletableFuture.allOf(responseFutures.toArray(new
CompletableFuture[0]))
+ .thenApply(ignore -> {
+ long globalMinimumRequiredTime = localMinimumRequiredTime;
+ long globalMinimumActiveTxTime =
activeLocalTxMinimumBeginTimeProvider.minimumBeginTime().longValue();
- ackFutures.add(fut);
- }
+ for
(CompletableFuture<CatalogCompactionMinimumTimesResponse> fut :
responseFutures) {
+ CatalogCompactionMinimumTimesResponse response =
fut.join();
+
+ if (response.minimumRequiredTime() <
globalMinimumRequiredTime) {
+ globalMinimumRequiredTime =
response.minimumRequiredTime();
+ }
- return CompletableFutures.allOf(ackFutures)
- .thenApply(ignore -> Math.min(minimumRequiredTimeHolder.get(),
localMinimumTs));
+ if (response.minimumActiveTxTime() <
globalMinimumActiveTxTime) {
+ globalMinimumActiveTxTime =
response.minimumActiveTxTime();
+ }
+ }
+
+ return new TimeHolder(globalMinimumRequiredTime,
globalMinimumActiveTxTime);
+ });
}
private long determineLocalMinimumRequiredTime() {
@@ -334,7 +371,7 @@ public class CatalogCompactionRunner implements
IgniteComponent {
int partitions = zone.partitions();
- List<ReplicationGroupId> replicationGroupIds = new
ArrayList<>(partitions);
+ List<TablePartitionId> replicationGroupIds = new
ArrayList<>(partitions);
for (int p = 0; p < partitions; p++) {
replicationGroupIds.add(new TablePartitionId(table.id(), p));
@@ -347,8 +384,7 @@ public class CatalogCompactionRunner implements
IgniteComponent {
for (int p = 0; p < partitions; p++) {
TokenizedAssignments assignment =
tokenizedAssignments.get(p);
if (assignment == null) {
- throw new IllegalStateException("Cannot get
assignments for table " + table.name()
- + " (replication group=" +
replicationGroupIds.get(p) + ").");
+
throwAssignmentsNotReadyException(replicationGroupIds.get(p));
}
assignment.nodes().forEach(a ->
required.add(a.consistentId()));
@@ -365,10 +401,123 @@ public class CatalogCompactionRunner implements
IgniteComponent {
return
requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
}
- /** Minimum required time supplier. */
+ /**
+ * Sends the given timestamp to a replica of every partition of every table that exists in the
+ * catalog versions active between {@code timestamp} and the current time (both inclusive).
+ *
+ * <p>Before the catalog is scanned, waits for metadata completeness at the current time; the scan
+ * and the replica requests run on {@code executor}.
+ *
+ * @param timestamp Timestamp to propagate; it is sent as the {@code timestamp} field of
+ * {@code UpdateMinimumActiveTxBeginTimeReplicaRequest}. NOTE(review): presumably the cluster-wide
+ * minimum begin time of active RW transactions (see {@link TimeHolder#minActiveTxBeginTime}) —
+ * confirm with the caller.
+ * @param topologyNodes Logical topology nodes; used to choose a target replica when the local node
+ * is not among a partition's assignment nodes.
+ * @return Future that completes when all replica requests complete, or exceptionally if any step fails.
+ */
+ CompletableFuture<Void> propagateTimeToReplicas(long timestamp,
Collection<? extends ClusterNode> topologyNodes) {
+ HybridTimestamp nowTs = clockService.now();
+
+ return schemaSyncService.waitForMetadataCompleteness(nowTs)
+ .thenComposeAsync(ignore -> {
+ // tableId -> number of partitions, over all catalog versions in [timestamp, now].
+ Map<Integer, Integer> tablesWithPartitions =
+
catalogManagerFacade.collectTablesWithPartitionsBetween(timestamp,
nowTs.longValue());
+
+ // Node names are compared with assignment consistent IDs in invokeOnReplicas.
+ Set<String> topologyNodeNames = topologyNodes.stream()
+ .map(ClusterNode::name)
+ .collect(Collectors.toSet());
+
+ // TODO https://issues.apache.org/jira/browse/IGNITE-22951
Minimize the number of network requests
+ return
CompletableFutures.allOf(tablesWithPartitions.entrySet().stream()
+ .map(e -> invokeOnReplicas(e.getKey(),
e.getValue(), timestamp, nowTs, topologyNodeNames))
+ .collect(Collectors.toList())
+ );
+ }, executor);
+ }
+
+ /**
+ * Sends {@code UpdateMinimumActiveTxBeginTimeReplicaRequest} carrying {@code txBeginTime} to one
+ * replica of every partition of the given table.
+ *
+ * <p>Assignments of all partitions are read from the placement driver at {@code nowTs}. For each
+ * partition the request goes to the local node if it is one of the assignment nodes; otherwise it
+ * goes to any assignment node that is present in {@code logicalTopologyNodes}.
+ *
+ * @param tableId Table ID.
+ * @param partitions Number of partitions of the table.
+ * @param txBeginTime Timestamp put into each request.
+ * @param nowTs Timestamp at which assignments are read.
+ * @param logicalTopologyNodes Names of the nodes in the logical topology.
+ * @return Future that completes when all requests complete. It completes exceptionally (caused by
+ * {@link IllegalStateException}) if assignments of some partition are unavailable or none of a
+ * partition's assignment nodes is in the logical topology.
+ */
+ private CompletableFuture<Void> invokeOnReplicas(
+ int tableId,
+ int partitions,
+ long txBeginTime,
+ HybridTimestamp nowTs,
+ Set<String> logicalTopologyNodes
+ ) {
+ List<TablePartitionId> replicationGroupIds = new
ArrayList<>(partitions);
+
+ for (int p = 0; p < partitions; p++) {
+ replicationGroupIds.add(new TablePartitionId(tableId, p));
+ }
+
+ return placementDriver.getAssignments(replicationGroupIds, nowTs)
+ .thenComposeAsync(tokenizedAssignments -> {
+ assert tokenizedAssignments.size() ==
replicationGroupIds.size();
+
+ List<CompletableFuture<?>> replicaInvokeFutures = new
ArrayList<>(partitions);
+
+ for (int p = 0; p < partitions; p++) {
+ TablePartitionId replicationGroupId =
replicationGroupIds.get(p);
+ TokenizedAssignments tokenizedAssignment =
tokenizedAssignments.get(p);
+
+ // The helper always throws; thrown inside thenComposeAsync, this completes the returned future exceptionally.
+ if (tokenizedAssignment == null) {
+
throwAssignmentsNotReadyException(replicationGroupId);
+ }
+
+ Set<String> assignments =
tokenizedAssignment.nodes().stream()
+
.map(Assignment::consistentId).collect(Collectors.toSet());
+
+ String targetNodeName;
+
+ // Prefer the local node when it hosts a replica (presumably to avoid a remote call);
+ // otherwise pick any assignment node that is currently in the logical topology.
+ if (assignments.contains(localNodeName)) {
+ targetNodeName = localNodeName;
+ } else {
+ targetNodeName = assignments.stream()
+ .filter(logicalTopologyNodes::contains)
+ .findAny()
+ .orElseThrow(() -> new
IllegalStateException("Current topology doesn't include assignment nodes "
+ + "(assignments=" +
tokenizedAssignment.nodes()
+ + ", topology=" +
logicalTopologyNodes
+ + ", replication group=" +
replicationGroupId + ").")
+ );
+ }
+
+ TablePartitionIdMessage partIdMessage =
ReplicaMessageUtils.toTablePartitionIdMessage(
+ REPLICA_MESSAGES_FACTORY,
+ replicationGroupId
+ );
+
+ UpdateMinimumActiveTxBeginTimeReplicaRequest msg =
REPLICATION_MESSAGES_FACTORY
+ .updateMinimumActiveTxBeginTimeReplicaRequest()
+ .groupId(partIdMessage)
+ .timestamp(txBeginTime)
+ .build();
+
+
replicaInvokeFutures.add(replicaService.invoke(targetNodeName, msg));
+ }
+
+ return CompletableFutures.allOf(replicaInvokeFutures);
+ }, executor);
+ }
+
+ /**
+ * Requests compaction of the catalog with version {@code catalog.version()}, provided that every
+ * node required for the given catalog (as computed by {@code requiredNodes}) is present in the
+ * topology snapshot.
+ *
+ * @param catalog Catalog whose version is passed to the catalog manager for compaction.
+ * @param topologySnapshot Logical topology snapshot checked for missing required nodes.
+ * @return Future of {@code false} if compaction was aborted because some required nodes are missing
+ * (the missing nodes are logged at INFO level); otherwise the future returned by
+ * {@code CatalogManagerCompactionFacade#compactCatalog}.
+ */
+ private CompletableFuture<Boolean> tryCompactCatalog(Catalog catalog,
LogicalTopologySnapshot topologySnapshot) {
+ return requiredNodes(catalog)
+ .thenCompose(requiredNodes -> {
+ List<String> missingNodes = missingNodes(requiredNodes,
topologySnapshot.nodes());
+
+ if (!missingNodes.isEmpty()) {
+ LOG.info("Catalog compaction aborted due to missing
cluster members (nodes={})", missingNodes);
+
+ return CompletableFutures.falseCompletedFuture();
+ }
+
+ return
catalogManagerFacade.compactCatalog(catalog.version());
+ });
+ }
+
+ /**
+ * Always throws an {@link IllegalStateException} reporting that assignments of the given
+ * replication group are not available.
+ *
+ * <p>The method is declared {@code void}, so the compiler does not know that it never returns.
+ * Call sites invoke it under an {@code if (assignment == null)} check and rely on the throw to
+ * prevent the subsequent dereference.
+ *
+ * @param replicationGroupId Replication group whose assignments are missing.
+ * @throws IllegalStateException Always.
+ */
+ private static void throwAssignmentsNotReadyException(TablePartitionId
replicationGroupId) {
+ throw new IllegalStateException("Cannot get assignments for table "
+ + "(replication group=" + replicationGroupId + ").");
+ }
+
+ /**
+ * Provider of the local node's minimum required time.
+ *
+ * <p>NOTE(review): presumably this is the timestamp to which, from the local node's perspective,
+ * the catalog history can be safely truncated (cf. the {@code minimumRequiredTime} field of
+ * {@code CatalogCompactionMinimumTimesResponse}) — confirm against the implementation passed to
+ * the runner.
+ */
@FunctionalInterface
interface MinimumRequiredTimeProvider {
/** Returns minimum required timestamp. */
long time();
}
+
+ /**
+ * Immutable pair of timestamps that this class builds by taking the minimum of the local values and
+ * the values reported in every received {@code CatalogCompactionMinimumTimesResponse}.
+ */
+ static class TimeHolder {
+ /** Smallest minimum required time among the local node and all responders. */
+ final long minRequiredTime;
+ /** Smallest begin time of active RW transactions among the local node and all responders. */
+ final long minActiveTxBeginTime;
+
+ private TimeHolder(long minRequiredTime, long minActiveTxBeginTime) {
+ this.minRequiredTime = minRequiredTime;
+ this.minActiveTxBeginTime = minActiveTxBeginTime;
+ }
+ }
}
diff --git
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacade.java
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacade.java
new file mode 100644
index 0000000000..613004d433
--- /dev/null
+++
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacade.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.compaction;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Facade over {@link CatalogManagerImpl} that exposes only the operations needed by the catalog
+ * compaction routine.
+ */
+class CatalogManagerCompactionFacade {
+ private final CatalogManagerImpl catalogManager;
+
+ CatalogManagerCompactionFacade(CatalogManagerImpl catalogManager) {
+ this.catalogManager = catalogManager;
+ }
+
+ /**
+ * Scans catalog versions in a given time interval (including interval boundaries).
+ * Extracts all tables contained in these catalog versions and creates a mapping
+ * tableId -> number of partitions in this table.
+ *
+ * <p>If a table occurs in several scanned versions, the partition count taken from the latest
+ * scanned version wins. The version active at {@code minTsInclusive} is always scanned, even if
+ * {@code maxTsInclusive} resolves to an earlier version.
+ *
+ * <p>NOTE(review): {@code activeCatalogVersion} may throw {@link IllegalStateException} (it is
+ * caught in {@link #catalogByTsNullable}); here it is not caught and propagates to the caller.
+ *
+ * @param minTsInclusive Lower timestamp (inclusive).
+ * @param maxTsInclusive Upper timestamp (inclusive).
+ * @return Mapping tableId to number of partitions in this table.
+ */
+ Int2IntMap collectTablesWithPartitionsBetween(long minTsInclusive, long
maxTsInclusive) {
+ Int2IntMap tablesWithPartitions = new Int2IntOpenHashMap();
+ int curVer = catalogManager.activeCatalogVersion(minTsInclusive);
+ int lastVer = catalogManager.activeCatalogVersion(maxTsInclusive);
+
+ // do-while: at least one catalog version is always scanned.
+ do {
+ Catalog catalog = catalogManager.catalog(curVer);
+
+ assert catalog != null : "ver=" + curVer + ", last=" + lastVer;
+
+ for (CatalogTableDescriptor table : catalog.tables()) {
+ CatalogZoneDescriptor zone = catalog.zone(table.zoneId());
+
+ assert zone != null : table.zoneId();
+
+ // Later versions overwrite the entry written for the same table by earlier versions.
+ tablesWithPartitions.put(table.id(), zone.partitions());
+ }
+ } while (++curVer <= lastVer);
+
+ return tablesWithPartitions;
+ }
+
+ /**
+ * Returns the catalog whose version immediately precedes the version active at the given timestamp.
+ *
+ * <p>Note that this is NOT the catalog active at {@code timestamp}: the lookup uses
+ * {@code activeCatalogVersion(timestamp) - 1}.
+ *
+ * @param timestamp Timestamp.
+ * @return Catalog of the preceding version, or {@code null} if {@code activeCatalogVersion} throws
+ * {@link IllegalStateException} or the preceding version of the catalog doesn't exist.
+ */
+ @Nullable Catalog catalogByTsNullable(long timestamp) {
+ try {
+ int catalogVer = catalogManager.activeCatalogVersion(timestamp);
+
+ return catalogManager.catalog(catalogVer - 1);
+ } catch (IllegalStateException ignore) {
+ return null;
+ }
+ }
+
+ /**
+ * Delegates to {@link CatalogManagerImpl#compactCatalog(int)}.
+ *
+ * @param version Catalog version passed to the catalog manager.
+ * @return Future returned by the catalog manager. NOTE(review): presumably {@code true} if the
+ * catalog was actually compacted — confirm against {@code CatalogManagerImpl}.
+ */
+ CompletableFuture<Boolean> compactCatalog(int version) {
+ return catalogManager.compactCatalog(version);
+ }
+}
diff --git
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionMessageGroup.java
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionMessageGroup.java
index b1c74a5b51..6e828e712d 100644
---
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionMessageGroup.java
+++
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionMessageGroup.java
@@ -26,9 +26,9 @@ import
org.apache.ignite.internal.network.annotations.MessageGroup;
public class CatalogCompactionMessageGroup {
+ /** Message group type shared by all catalog compaction network messages. */
public static final short GROUP_TYPE = 14;
- /** See {@link CatalogMinimumRequiredTimeRequest} for the details. */
- public static final short MINIMUM_REQUIRED_TIME_REQUEST = 0;
+ /** See {@link CatalogCompactionMinimumTimesRequest} for the details. */
+ public static final short MINIMUM_TIMES_REQUEST = 0;
- /** See {@link CatalogMinimumRequiredTimeResponse} for the details. */
- public static final short MINIMUM_REQUIRED_TIME_RESPONSE = 1;
+ /** See {@link CatalogCompactionMinimumTimesResponse} for the details. */
+ public static final short MINIMUM_TIMES_RESPONSE = 1;
}
diff --git
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeRequest.java
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionMinimumTimesRequest.java
similarity index 77%
copy from
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeRequest.java
copy to
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionMinimumTimesRequest.java
index d1156a3736..3177d614cd 100644
---
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeRequest.java
+++
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionMinimumTimesRequest.java
@@ -21,10 +21,11 @@ import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.annotations.Transferable;
/**
- * Request to obtain the minimum timestamp to which, from the local
- * node's perspective, the catalog history can be safely truncated.
+ * Request to obtain the receiving node's timestamps required to safely perform catalog compaction.
+ *
+ * <p>The request carries no payload; the receiving node answers with
+ * {@link CatalogCompactionMinimumTimesResponse}.
+ *
+ * @see CatalogCompactionMinimumTimesResponse
*/
-@Transferable(CatalogCompactionMessageGroup.MINIMUM_REQUIRED_TIME_REQUEST)
-public interface CatalogMinimumRequiredTimeRequest extends NetworkMessage {
+@Transferable(CatalogCompactionMessageGroup.MINIMUM_TIMES_REQUEST)
+public interface CatalogCompactionMinimumTimesRequest extends NetworkMessage {
}
diff --git
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeResponse.java
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionMinimumTimesResponse.java
similarity index 60%
copy from
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeResponse.java
copy to
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionMinimumTimesResponse.java
index b3760e9d15..a3c1eee692 100644
---
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeResponse.java
+++
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionMinimumTimesResponse.java
@@ -21,11 +21,19 @@ import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.annotations.Transferable;
/**
- * Response message containing the low watermark required for the local node.
- * This watermark is used to safely truncate catalog history.
+ * Response message containing the responding node's timestamps required to safely perform
+ * catalog compaction. The requesting node (see {@code CatalogCompactionRunner}) takes the minimum
+ * of each value over all responses and its own local values.
+ *
+ * <p>This includes the following:
+ * <ol>
+ * <li>the minimum timestamp to which, from the local node's perspective, the catalog history
+ * can be safely truncated ({@link #minimumRequiredTime()})</li>
+ * <li>the minimum starting time among locally started active RW transactions
+ * ({@link #minimumActiveTxTime()})</li>
+ * </ol>
*/
-@Transferable(CatalogCompactionMessageGroup.MINIMUM_REQUIRED_TIME_RESPONSE)
-public interface CatalogMinimumRequiredTimeResponse extends NetworkMessage {
+@Transferable(CatalogCompactionMessageGroup.MINIMUM_TIMES_RESPONSE)
+public interface CatalogCompactionMinimumTimesResponse extends NetworkMessage {
/** Returns node's minimum required time. */
- long timestamp();
+ long minimumRequiredTime();
+
+ /** Returns node's minimum starting time among locally started active RW transactions. */
+ long minimumActiveTxTime();
}
diff --git
a/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/AbstractCatalogCompactionTest.java
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/AbstractCatalogCompactionTest.java
new file mode 100644
index 0000000000..13cc7bfdd4
--- /dev/null
+++
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/AbstractCatalogCompactionTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.compaction;
+
+import static
org.apache.ignite.internal.catalog.CatalogTestUtils.awaitDefaultZoneCreation;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.ClockWaiter;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.manager.ComponentContext;
+import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Base class for catalog compaction unit testing.
+ *
+ * <p>Before each test, starts a standalone meta storage backed by an in-memory key-value storage,
+ * a clock waiter and a {@link CatalogManagerImpl} on top of them, then waits until the default
+ * zone is created.
+ *
+ * <p>NOTE(review): the started components are not stopped in this class; confirm that subclasses
+ * or the test framework take care of shutting them down.
+ */
+abstract class AbstractCatalogCompactionTest extends BaseIgniteAbstractTest {
+ private final HybridClock clock = new HybridClockImpl();
+
+ private final ClockWaiter clockWaiter = new ClockWaiter("test-node",
clock);
+
+ /** Clock service used both by the catalog manager and by the tests. */
+ final ClockService clockService = new TestClockService(clock, clockWaiter);
+
+ /** Catalog manager created before each test. */
+ CatalogManagerImpl catalogManager;
+
+ @BeforeEach
+ void setUp() {
+ catalogManager = createCatalogManager("test-node");
+ }
+
+ /**
+ * Creates and starts a catalog manager backed by an in-memory meta storage.
+ *
+ * @param nodeName Node name passed to the in-memory key-value storage.
+ * @return Started catalog manager whose default zone has already been created.
+ */
+ private CatalogManagerImpl createCatalogManager(String nodeName) {
+ StandaloneMetaStorageManager metastore =
StandaloneMetaStorageManager.create(new
SimpleInMemoryKeyValueStorage(nodeName));
+ CatalogManagerImpl manager = new CatalogManagerImpl(new
UpdateLogImpl(metastore), clockService);
+
+ assertThat(startAsync(new ComponentContext(), metastore, clockWaiter,
manager), willCompleteSuccessfully());
+ assertThat("Watches were not deployed", metastore.deployWatches(),
willCompleteSuccessfully());
+ awaitDefaultZoneCreation(manager);
+
+ return manager;
+ }
+}
diff --git
a/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
index bb56812acf..49910f3259 100644
---
a/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
+++
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
@@ -17,14 +17,13 @@
package org.apache.ignite.internal.catalog.compaction;
-import static
org.apache.ignite.internal.catalog.CatalogTestUtils.awaitDefaultZoneCreation;
import static org.apache.ignite.internal.catalog.CatalogTestUtils.columnParams;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
import static org.apache.ignite.sql.ColumnType.INT32;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
@@ -35,67 +34,67 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.affinity.TokenizedAssignmentsImpl;
import org.apache.ignite.internal.catalog.Catalog;
-import org.apache.ignite.internal.catalog.CatalogManagerImpl;
import org.apache.ignite.internal.catalog.CatalogTestUtils.TestCommand;
import org.apache.ignite.internal.catalog.commands.CatalogUtils;
import org.apache.ignite.internal.catalog.commands.CreateTableCommand;
import org.apache.ignite.internal.catalog.commands.CreateTableCommandBuilder;
import org.apache.ignite.internal.catalog.commands.TableHashPrimaryKey;
import
org.apache.ignite.internal.catalog.compaction.message.CatalogCompactionMessagesFactory;
-import
org.apache.ignite.internal.catalog.compaction.message.CatalogMinimumRequiredTimeRequest;
-import
org.apache.ignite.internal.catalog.compaction.message.CatalogMinimumRequiredTimeResponse;
-import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
+import
org.apache.ignite.internal.catalog.compaction.message.CatalogCompactionMinimumTimesRequest;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
-import org.apache.ignite.internal.hlc.ClockService;
-import org.apache.ignite.internal.hlc.ClockWaiter;
-import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.hlc.HybridClockImpl;
-import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.manager.ComponentContext;
-import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
-import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.UnresolvableConsistentIdException;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
-import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
* Tests for class {@link CatalogCompactionRunner}.
*/
-public class CatalogCompactionRunnerSelfTest extends BaseIgniteAbstractTest {
+public class CatalogCompactionRunnerSelfTest extends
AbstractCatalogCompactionTest {
private static final LogicalNode NODE1 = new LogicalNode("1", "node1", new
NetworkAddress("localhost", 123));
private static final LogicalNode NODE2 = new LogicalNode("2", "node2", new
NetworkAddress("localhost", 123));
@@ -104,9 +103,7 @@ public class CatalogCompactionRunnerSelfTest extends
BaseIgniteAbstractTest {
private static final List<LogicalNode> logicalNodes = List.of(NODE1,
NODE2, NODE3);
- private final ClockService clockService = new TestClockService(new
HybridClockImpl());
-
- private CatalogManagerImpl catalogManager;
+ private final AtomicReference<ClusterNode> coordinatorNodeHolder = new
AtomicReference<>();
private LogicalTopologyService logicalTopologyService;
@@ -114,12 +111,7 @@ public class CatalogCompactionRunnerSelfTest extends
BaseIgniteAbstractTest {
private PlacementDriver placementDriver;
- @BeforeEach
- void setup() {
- HybridClock clock = new HybridClockImpl();
-
- catalogManager = createCatalogManager(clock);
- }
+ private ReplicaService replicaService;
@Test
public void routineSucceedOnCoordinator() throws InterruptedException {
@@ -147,7 +139,7 @@ public class CatalogCompactionRunnerSelfTest extends
BaseIgniteAbstractTest {
CatalogCompactionRunner compactionRunner = createRunner(NODE1, NODE1,
nodeToTime::get);
assertThat(compactionRunner.onLowWatermarkChanged(clockService.now()),
willBe(false));
- assertThat(compactionRunner.lastRunFuture(), willBe(true));
+ assertThat(compactionRunner.lastRunFuture(),
willCompleteSuccessfully());
int expectedEarliestCatalogVersion = catalog1.version() - 1;
@@ -156,25 +148,30 @@ public class CatalogCompactionRunnerSelfTest extends
BaseIgniteAbstractTest {
verify(messagingService, times(logicalNodes.size() -
1)).invoke(any(ClusterNode.class), any(NetworkMessage.class), anyLong());
// Nothing should be changed if catalog already compacted for previous
timestamp.
- assertThat(compactionRunner.triggerCompaction(clockService.now()),
willBe(false));
+ compactionRunner.triggerCompaction(clockService.now());
+ assertThat(compactionRunner.lastRunFuture(),
willCompleteSuccessfully());
+ assertEquals(expectedEarliestCatalogVersion,
catalogManager.earliestCatalogVersion());
// Nothing should be changed if previous catalog doesn't exists.
Catalog earliestCatalog =
Objects.requireNonNull(catalogManager.catalog(catalogManager.earliestCatalogVersion()));
compactionRunner = createRunner(NODE1, NODE1, (n) ->
earliestCatalog.time());
- assertThat(compactionRunner.triggerCompaction(clockService.now()),
willBe(false));
+ compactionRunner.triggerCompaction(clockService.now());
+ assertThat(compactionRunner.lastRunFuture(),
willCompleteSuccessfully());
verify(messagingService, times(0)).invoke(any(ClusterNode.class),
any(NetworkMessage.class), anyLong());
}
@Test
public void mustNotStartOnNonCoordinator() {
- CatalogCompactionRunner compactor = createRunner(NODE1, NODE3, ignore
-> 0L);
+ assertThat(catalogManager.execute(TestCommand.ok()),
willCompleteSuccessfully());
+ CatalogCompactionRunner compactor = createRunner(NODE1, NODE3, ignore
-> clockService.nowLong());
- CompletableFuture<Boolean> lastRunFuture = compactor.lastRunFuture();
+ CompletableFuture<Void> lastRunFuture = compactor.lastRunFuture();
assertThat(compactor.onLowWatermarkChanged(clockService.now()),
willBe(false));
assertThat(compactor.lastRunFuture(), is(lastRunFuture));
// Changing the coordinator should trigger compaction.
+ coordinatorNodeHolder.set(NODE1);
compactor.updateCoordinator(NODE1);
assertThat(compactor.lastRunFuture(), is(not(lastRunFuture)));
}
@@ -187,27 +184,13 @@ public class CatalogCompactionRunnerSelfTest extends
BaseIgniteAbstractTest {
CatalogCompactionRunner compactor =
createRunner(NODE1, NODE1, (n) -> earliestCatalog.time() - 1,
logicalNodes, logicalNodes);
- assertThat(compactor.triggerCompaction(clockService.now()),
willBe(false));
+ compactor.triggerCompaction(clockService.now());
+ assertThat(compactor.lastRunFuture(), willCompleteSuccessfully());
}
@Test
public void mustNotPerformWhenAssignmentNodeIsMissing() throws
InterruptedException {
- CreateTableCommandBuilder tabBuilder = CreateTableCommand.builder()
- .tableName("test")
- .schemaName("PUBLIC")
- .columns(List.of(columnParams("key1", INT32),
columnParams("key2", INT32), columnParams("val", INT32, true)))
-
.primaryKey(TableHashPrimaryKey.builder().columns(List.of("key1",
"key2")).build())
- .colocationColumns(List.of("key2"));
-
- assertThat(catalogManager.execute(TestCommand.ok()),
willCompleteSuccessfully());
-
assertThat(catalogManager.execute(tabBuilder.tableName("test1").build()),
willCompleteSuccessfully());
-
assertThat(catalogManager.execute(tabBuilder.tableName("test2").build()),
willCompleteSuccessfully());
-
assertThat(catalogManager.execute(tabBuilder.tableName("test3").build()),
willCompleteSuccessfully());
- assertThat(catalogManager.execute(TestCommand.ok()),
willCompleteSuccessfully());
-
- Catalog catalog =
catalogManager.catalog(catalogManager.activeCatalogVersion(clockService.nowLong()));
-
- assertNotNull(catalog);
+ Catalog catalog = prepareCatalogWithTables();
// Node NODE3 from the assignment is missing in logical topology.
{
@@ -219,7 +202,9 @@ public class CatalogCompactionRunnerSelfTest extends
BaseIgniteAbstractTest {
List.of(NODE1, NODE2, NODE3)
);
- assertThat(compactor.triggerCompaction(clockService.now()),
willBe(false));
+ compactor.triggerCompaction(clockService.now());
+ assertThat(compactor.lastRunFuture(), willCompleteSuccessfully());
+ assertThat(catalogManager.earliestCatalogVersion(), is(0));
}
// Node NODE3 from the assignment is missing in logical topology, but
topology changes during messaging.
@@ -245,8 +230,12 @@ public class CatalogCompactionRunnerSelfTest extends
BaseIgniteAbstractTest {
logicalNodes
);
- CompletableFuture<CompletableFuture<Boolean>> fut =
IgniteTestUtils.runAsync(
- () -> compactor.triggerCompaction(clockService.now()));
+ CompletableFuture<CompletableFuture<Void>> fut =
IgniteTestUtils.runAsync(
+ () -> {
+ compactor.triggerCompaction(clockService.now());
+
+ return compactor.lastRunFuture();
+ });
assertTrue(messageBlockLatch.await(5, TimeUnit.SECONDS));
@@ -263,7 +252,7 @@ public class CatalogCompactionRunnerSelfTest extends
BaseIgniteAbstractTest {
// Since we do not know the minimum required time by NODE3,
despite the fact
// that all the necessary nodes are in the logical topology at the
time
// assignments are collected, we cannot perform catalog compaction.
- assertThat(fut.join(), willBe(false));
+ assertThat(catalogManager.earliestCatalogVersion(), is(0));
}
// All nodes from the assignments are present in logical topology.
@@ -276,7 +265,11 @@ public class CatalogCompactionRunnerSelfTest extends
BaseIgniteAbstractTest {
logicalNodes
);
- assertThat(compactor.triggerCompaction(clockService.now()),
willBe(true));
+ compactor.triggerCompaction(clockService.now());
+ assertThat(compactor.lastRunFuture(), willCompleteSuccessfully());
+ waitForCondition(() -> catalogManager.earliestCatalogVersion() !=
0, 1_000);
+
+ assertThat(catalogManager.earliestCatalogVersion(),
is(catalog.version() - 1));
}
}
@@ -292,9 +285,10 @@ public class CatalogCompactionRunnerSelfTest extends
BaseIgniteAbstractTest {
};
CatalogCompactionRunner compactor = createRunner(NODE1, NODE1,
timeSupplier);
+ compactor.triggerCompaction(clockService.now());
ExecutionException ex =
Assertions.assertThrows(ExecutionException.class,
- () -> compactor.triggerCompaction(clockService.now()).get());
+ () -> compactor.lastRunFuture().get());
assertThat(ex.getCause(), instanceOf(expected.getClass()));
assertThat(ex.getCause().getMessage(), equalTo(expected.getMessage()));
@@ -303,7 +297,7 @@ public class CatalogCompactionRunnerSelfTest extends
BaseIgniteAbstractTest {
@Test
public void compactionAbortedIfAssignmentsNotAvailableForTable() {
- CreateTableCommandBuilder tabBuilder = CreateTableCommand.builder()
+ CreateTableCommandBuilder tableCmdBuilder =
CreateTableCommand.builder()
.tableName("test")
.schemaName("PUBLIC")
.columns(List.of(columnParams("key1", INT32),
columnParams("key2", INT32), columnParams("val", INT32, true)))
@@ -311,7 +305,7 @@ public class CatalogCompactionRunnerSelfTest extends
BaseIgniteAbstractTest {
.colocationColumns(List.of("key2"));
assertThat(catalogManager.execute(TestCommand.ok()),
willCompleteSuccessfully());
-
assertThat(catalogManager.execute(tabBuilder.tableName("test1").build()),
willCompleteSuccessfully());
+ assertThat(catalogManager.execute(tableCmdBuilder.build()),
willCompleteSuccessfully());
assertThat(catalogManager.execute(TestCommand.ok()),
willCompleteSuccessfully());
Catalog catalog =
catalogManager.catalog(catalogManager.activeCatalogVersion(clockService.nowLong()));
@@ -326,12 +320,133 @@ public class CatalogCompactionRunnerSelfTest extends
BaseIgniteAbstractTest {
);
when(placementDriver.getAssignments(any(List.class),
any())).thenReturn(CompletableFuture.failedFuture(new ArithmeticException()));
- assertThat(compactor.triggerCompaction(clockService.now()),
willThrow(ArithmeticException.class));
+ compactor.triggerCompaction(clockService.now());
+ assertThat(compactor.lastRunFuture(),
willThrow(ArithmeticException.class));
List<?> assignments = IntStream.range(0,
CatalogUtils.DEFAULT_PARTITION_COUNT).mapToObj(i ->
null).collect(Collectors.toList());
when(placementDriver.getAssignments(any(List.class),
any())).thenReturn(CompletableFuture.completedFuture(assignments));
- assertThat(compactor.triggerCompaction(clockService.now()),
willThrow(IllegalStateException.class));
+ compactor.triggerCompaction(clockService.now());
+ assertThat(compactor.lastRunFuture(),
willThrow(IllegalStateException.class));
+ }
+
+ @Test
+ public void shouldNotStartIfAlreadyInProgress() throws
InterruptedException {
+ assertThat(catalogManager.execute(TestCommand.ok()),
willCompleteSuccessfully());
+
+ CountDownLatch messageBlockLatch = new CountDownLatch(1);
+ CountDownLatch topologyChangeLatch = new CountDownLatch(1);
+
+ CatalogCompactionRunner compactor = createRunner(
+ NODE1,
+ NODE1,
+ (node) -> {
+ if (NODE1.name().equals(node)) {
+ return clockService.nowLong();
+ }
+
+ try {
+ messageBlockLatch.countDown();
+
+ topologyChangeLatch.await();
+
+ return Long.MIN_VALUE;
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ );
+
+ compactor.triggerCompaction(clockService.now());
+
+ messageBlockLatch.await();
+
+ CompletableFuture<Void> lastFut = compactor.lastRunFuture();
+
+ compactor.triggerCompaction(clockService.now());
+
+ assertSame(lastFut, compactor.lastRunFuture());
+
+ topologyChangeLatch.countDown();
+
+ assertThat(compactor.lastRunFuture(), willCompleteSuccessfully());
+ }
+
+ @Test
+ public void minTxTimePropagation() {
+ Catalog catalog = prepareCatalogWithTables();
+
+ List<LogicalNode> logicalTopology = List.of(NODE2, NODE3, NODE1);
+ List<LogicalNode> assignments = List.of(NODE3, NODE2, NODE1);
+ CatalogCompactionRunner compactor = createRunner(NODE1, NODE1, (n) ->
catalog.time(), logicalTopology, assignments);
+
+ assertThat(compactor.propagateTimeToReplicas(catalog.time(),
logicalTopology), willCompleteSuccessfully());
+
+ // All invocations must be made locally, since coordinator is present
in assignments for all tables.
+ verify(replicaService, times(0)).invoke(eq(NODE2.name()),
any(ReplicaRequest.class));
+ verify(replicaService, times(0)).invoke(eq(NODE3.name()),
any(ReplicaRequest.class));
+ verify(replicaService, times(/* tables */ 3 * /* partitions */
25)).invoke(eq(NODE1.name()), any(ReplicaRequest.class));
+ }
+
+ @Test
+ public void minTxTimePropagationSucceedWhenSomeAssignmentIsMissing() {
+ Catalog catalog = prepareCatalogWithTables();
+
+ {
+ List<LogicalNode> logicalTopology = List.of(NODE2, NODE1);
+ List<LogicalNode> assignments = List.of(NODE3, NODE2, NODE1);
+ CatalogCompactionRunner compactor = createRunner(NODE1, NODE1, (n)
-> catalog.time(), logicalTopology, assignments);
+
+ assertThat(compactor.propagateTimeToReplicas(catalog.time(),
logicalTopology), willCompleteSuccessfully());
+ verify(replicaService, times(0)).invoke(eq(NODE2.name()),
any(ReplicaRequest.class));
+ verify(replicaService, times(0)).invoke(eq(NODE3.name()),
any(ReplicaRequest.class));
+ verify(replicaService, times(/* tables */ 3 * /* partitions */
25)).invoke(eq(NODE1.name()), any(ReplicaRequest.class));
+ clearInvocations(replicaService);
+ }
+
+ {
+ List<LogicalNode> logicalTopology = List.of(NODE2, NODE1);
+ List<LogicalNode> assignments = List.of(NODE3, NODE2);
+ CatalogCompactionRunner compactor = createRunner(NODE1, NODE1, (n)
-> catalog.time(), logicalTopology, assignments);
+
+ assertThat(compactor.propagateTimeToReplicas(catalog.time(),
logicalTopology), willCompleteSuccessfully());
+ verify(replicaService, times(0)).invoke(eq(NODE1.name()),
any(ReplicaRequest.class));
+ verify(replicaService, times(0)).invoke(eq(NODE3.name()),
any(ReplicaRequest.class));
+ verify(replicaService, times(/* tables */ 3 * /* partitions */
25)).invoke(eq(NODE2.name()), any(ReplicaRequest.class));
+ }
+ }
+
+ @Test
+ public void minTxTimePropagationAbortedIfNoAssignmentsPresentInTopology() {
+ Catalog catalog = prepareCatalogWithTables();
+
+ List<LogicalNode> logicalTopology = List.of(NODE1, NODE2);
+ List<LogicalNode> assignments = List.of(NODE3);
+
+ CatalogCompactionRunner compactor = createRunner(NODE1, NODE1, (n) ->
catalog.time(), logicalTopology, assignments);
+
+ CompletableFuture<Void> fut =
compactor.propagateTimeToReplicas(catalog.time(), logicalTopology);
+
+ //noinspection ThrowableNotThrown
+ assertThrows(IllegalStateException.class, () -> await(fut), "Current
topology doesn't include assignment nodes");
+ }
+
+ private Catalog prepareCatalogWithTables() {
+ CreateTableCommandBuilder tableCmdBuilder =
CreateTableCommand.builder()
+ .schemaName("PUBLIC")
+ .columns(List.of(columnParams("key1", INT32),
columnParams("key2", INT32), columnParams("val", INT32, true)))
+
.primaryKey(TableHashPrimaryKey.builder().columns(List.of("key1",
"key2")).build())
+ .colocationColumns(List.of("key2"));
+
+ assertThat(catalogManager.execute(TestCommand.ok()),
willCompleteSuccessfully());
+
assertThat(catalogManager.execute(tableCmdBuilder.tableName("test1").build()),
willCompleteSuccessfully());
+
assertThat(catalogManager.execute(tableCmdBuilder.tableName("test2").build()),
willCompleteSuccessfully());
+
assertThat(catalogManager.execute(tableCmdBuilder.tableName("test3").build()),
willCompleteSuccessfully());
+ assertThat(catalogManager.execute(TestCommand.ok()),
willCompleteSuccessfully());
+
+ Catalog catalog =
catalogManager.catalog(catalogManager.activeCatalogVersion(clockService.nowLong()));
+
+ return Objects.requireNonNull(catalog);
}
private CatalogCompactionRunner createRunner(
@@ -349,33 +464,40 @@ public class CatalogCompactionRunnerSelfTest extends
BaseIgniteAbstractTest {
List<LogicalNode> topology,
List<LogicalNode> assignmentNodes
) {
+ coordinatorNodeHolder.set(coordinator);
messagingService = mock(MessagingService.class);
logicalTopologyService = mock(LogicalTopologyService.class);
placementDriver = mock(PlacementDriver.class);
+ replicaService = mock(ReplicaService.class);
+ SchemaSyncService schemaSyncService = mock(SchemaSyncService.class);
+
CatalogCompactionMessagesFactory messagesFactory = new
CatalogCompactionMessagesFactory();
- when(messagingService.invoke(any(ClusterNode.class),
any(CatalogMinimumRequiredTimeRequest.class), anyLong()))
+ when(messagingService.invoke(any(ClusterNode.class),
any(CatalogCompactionMinimumTimesRequest.class), anyLong()))
.thenAnswer(invocation -> {
- String nodeName = ((ClusterNode)
invocation.getArgument(0)).name();
+ return CompletableFuture.supplyAsync(() -> {
+ String nodeName = ((ClusterNode)
invocation.getArgument(0)).name();
- assertThat("Coordinator shouldn't send messages to
himself", nodeName, not(Matchers.equalTo(coordinator.name())));
+ assertThat("Coordinator shouldn't send messages to
himself",
+ nodeName,
not(Matchers.equalTo(coordinatorNodeHolder.get().name())));
- Object obj = timeSupplier.apply(nodeName);
-
- // Simulate an exception when exchanging messages.
- if (obj instanceof Exception) {
- return CompletableFuture.failedFuture((Exception) obj);
- }
+ Object obj = timeSupplier.apply(nodeName);
- CatalogMinimumRequiredTimeResponse msg =
messagesFactory.catalogMinimumRequiredTimeResponse()
- .timestamp(((Long) obj)).build();
+ // Simulate an exception when exchanging messages.
+ if (obj instanceof Exception) {
+ throw new CompletionException((Exception) obj);
+ }
- return CompletableFuture.completedFuture(msg);
+ return
messagesFactory.catalogCompactionMinimumTimesResponse()
+ .minimumRequiredTime(((Long) obj))
+ .minimumActiveTxTime(clockService.nowLong())
+ .build();
+ });
});
Set<Assignment> assignments = assignmentNodes.stream()
.map(node -> Assignment.forPeer(node.name()))
- .collect(Collectors.toSet());
+ .collect(Collectors.toCollection(LinkedHashSet::new));
List<?> tableAssignments = IntStream.range(0,
CatalogUtils.DEFAULT_PARTITION_COUNT)
.mapToObj(i -> new TokenizedAssignmentsImpl(assignments,
Long.MAX_VALUE))
@@ -387,14 +509,33 @@ public class CatalogCompactionRunnerSelfTest extends
BaseIgniteAbstractTest {
when(logicalTopologyService.localLogicalTopology()).thenReturn(logicalTop);
+ Set<String> logicalNodeNames =
topology.stream().map(ClusterNodeImpl::name).collect(Collectors.toSet());
+
+ when(replicaService.invoke(any(String.class),
any(ReplicaRequest.class)))
+ .thenAnswer(invocation ->
+ CompletableFuture.supplyAsync(() -> {
+ String nodeName = invocation.getArgument(0);
+
+ if (!logicalNodeNames.contains(nodeName)) {
+ throw new
UnresolvableConsistentIdException(nodeName);
+ }
+
+ return null;
+ }));
+
+
when(schemaSyncService.waitForMetadataCompleteness(any())).thenReturn(CompletableFutures.nullCompletedFuture());
+
CatalogCompactionRunner runner = new CatalogCompactionRunner(
localNode.name(),
catalogManager,
messagingService,
logicalTopologyService,
placementDriver,
+ replicaService,
clockService,
+ schemaSyncService,
ForkJoinPool.commonPool(),
+ clockService::now,
() -> (Long) timeSupplier.apply(coordinator.name())
);
@@ -404,18 +545,4 @@ public class CatalogCompactionRunnerSelfTest extends
BaseIgniteAbstractTest {
return runner;
}
-
- private static CatalogManagerImpl createCatalogManager(HybridClock clock) {
- StandaloneMetaStorageManager metastore =
StandaloneMetaStorageManager.create(new
SimpleInMemoryKeyValueStorage(NODE1.name()));
- ClockWaiter clockWaiter = new ClockWaiter(NODE1.name(), clock);
- TestClockService clockService = new TestClockService(clock,
clockWaiter);
-
- CatalogManagerImpl manager = new CatalogManagerImpl(new
UpdateLogImpl(metastore), clockService);
-
- assertThat(startAsync(new ComponentContext(), metastore, clockWaiter,
manager), willCompleteSuccessfully());
- assertThat("Watches were not deployed", metastore.deployWatches(),
willCompleteSuccessfully());
- awaitDefaultZoneCreation(manager);
-
- return manager;
- }
}
diff --git
a/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacadeTest.java
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacadeTest.java
new file mode 100644
index 0000000000..5072e0200d
--- /dev/null
+++
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacadeTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.compaction;
+
+import static org.apache.ignite.internal.catalog.CatalogTestUtils.columnParams;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import java.util.List;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.commands.CreateTableCommand;
+import org.apache.ignite.internal.catalog.commands.CreateTableCommandBuilder;
+import org.apache.ignite.internal.catalog.commands.DropTableCommand;
+import org.apache.ignite.internal.catalog.commands.DropTableCommandBuilder;
+import org.apache.ignite.internal.catalog.commands.TableHashPrimaryKey;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link CatalogManagerCompactionFacade}.
+ */
+class CatalogManagerCompactionFacadeTest extends AbstractCatalogCompactionTest
{
+ private CatalogManagerCompactionFacade catalogManagerFacade;
+
+ @BeforeEach
+ void setupHelper() {
+ catalogManagerFacade = new
CatalogManagerCompactionFacade(catalogManager);
+ }
+
+ @Test
+ void testCollectTablesWithPartitionsBetween() {
+ CreateTableCommandBuilder tableCmdBuilder =
CreateTableCommand.builder()
+ .schemaName("PUBLIC")
+ .columns(List.of(columnParams("key1", INT32),
columnParams("key2", INT32), columnParams("val", INT32, true)))
+
.primaryKey(TableHashPrimaryKey.builder().columns(List.of("key1",
"key2")).build())
+ .colocationColumns(List.of("key2"));
+
+ DropTableCommandBuilder dropTableCommandBuilder =
DropTableCommand.builder()
+ .schemaName("PUBLIC");
+
+ long from1 = clockService.nowLong();
+
+
assertThat(catalogManager.execute(tableCmdBuilder.tableName("test1").build()),
willCompleteSuccessfully());
+
assertThat(catalogManager.execute(dropTableCommandBuilder.tableName("test1").build()),
willCompleteSuccessfully());
+
+ long from2 = clockService.nowLong();
+
+
assertThat(catalogManager.execute(tableCmdBuilder.tableName("test2").build()),
willCompleteSuccessfully());
+
assertThat(catalogManager.execute(dropTableCommandBuilder.tableName("test2").build()),
willCompleteSuccessfully());
+
+ long from3 = clockService.nowLong();
+
assertThat(catalogManager.execute(tableCmdBuilder.tableName("test3").build()),
willCompleteSuccessfully());
+
assertThat(catalogManager.execute(dropTableCommandBuilder.tableName("test3").build()),
willCompleteSuccessfully());
+
+ {
+ Int2IntMap tablesWithParts =
catalogManagerFacade.collectTablesWithPartitionsBetween(from1,
+ clockService.nowLong());
+
+ assertThat(tablesWithParts.keySet(), hasSize(3));
+ }
+
+ {
+ Int2IntMap tablesWithParts =
catalogManagerFacade.collectTablesWithPartitionsBetween(from2,
+ clockService.nowLong());
+
+ assertThat(tablesWithParts.keySet(), hasSize(2));
+ }
+
+ {
+ Int2IntMap tablesWithParts =
catalogManagerFacade.collectTablesWithPartitionsBetween(from3,
+ clockService.nowLong());
+
+ assertThat(tablesWithParts.keySet(), hasSize(1));
+ }
+
+ {
+ Int2IntMap tablesWithParts =
catalogManagerFacade.collectTablesWithPartitionsBetween(
+ clockService.nowLong(),
+ clockService.nowLong()
+ );
+
+ assertThat(tablesWithParts.keySet(), hasSize(0));
+ }
+ }
+
+ @Test
+ void testCatalogByTsNullable() {
+ Catalog earliestCatalog =
catalogManager.catalog(catalogManager.earliestCatalogVersion());
+ assertNotNull(earliestCatalog);
+
+
assertNull(catalogManagerFacade.catalogByTsNullable(earliestCatalog.time() -
1));
+ }
+}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index 70c086c5b7..5865e5f00e 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -345,12 +345,16 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
/**
* Cleanup outdated catalog versions, which can't be observed after given
timestamp (inclusively), and compact underlying update log.
*
- * @param timestamp Earliest observable timestamp.
+ * @param version Earliest observable version.
* @return Operation future, which is completing with {@code true} if a
new snapshot has been successfully written, {@code false}
* otherwise if a snapshot with the same or greater version
already exists.
*/
- public CompletableFuture<Boolean> compactCatalog(long timestamp) {
- Catalog catalog = catalogAt(timestamp);
+ public CompletableFuture<Boolean> compactCatalog(int version) {
+ Catalog catalog = catalog(version);
+
+ if (catalog == null) {
+ throw new IllegalArgumentException("Catalog version not found: " +
version);
+ }
return updateLog.saveSnapshot(new SnapshotEntry(catalog));
}
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerRecoveryTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerRecoveryTest.java
index efb5574a85..1ae5d4d8f3 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerRecoveryTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerRecoveryTest.java
@@ -151,7 +151,6 @@ public class CatalogManagerRecoveryTest extends
BaseIgniteAbstractTest {
// Save version, which will be earliest version after snapshot.
int earliestVersion = catalogManager.latestCatalogVersion();
long earliestVersionActivationTime =
catalogManager.catalog(earliestVersion).time();
- long snapshotTime = clock.nowLong();
assertThat(catalogManager.execute(simpleTable(TABLE_NAME_3)),
willCompleteSuccessfully());
@@ -162,7 +161,7 @@ public class CatalogManagerRecoveryTest extends
BaseIgniteAbstractTest {
clearInvocations(interceptor);
interceptor.dropSnapshotEvents();
- assertThat(((CatalogManagerImpl)
catalogManager).compactCatalog(snapshotTime), willBe(true));
+ assertThat(((CatalogManagerImpl)
catalogManager).compactCatalog(earliestVersion), willBe(true));
verify(interceptor, timeout(2_000)).handle(any(SnapshotEntry.class),
any(), anyLong());
assertThat(catalogManager.earliestCatalogVersion(), equalTo(0));
@@ -181,7 +180,6 @@ public class CatalogManagerRecoveryTest extends
BaseIgniteAbstractTest {
assertThrows(IllegalStateException.class, () ->
catalogManager.activeCatalogVersion(0));
assertThrows(IllegalStateException.class, () ->
catalogManager.activeCatalogVersion(earliestVersionActivationTime - 1));
assertThat(catalogManager.activeCatalogVersion(earliestVersionActivationTime),
equalTo(earliestVersion));
- assertThat(catalogManager.activeCatalogVersion(snapshotTime),
equalTo(earliestVersion));
assertThat(catalogManager.activeCatalogVersion(latestVersionActivationTime),
equalTo(latestVersion));
}
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index 1b45d27895..ea038e7d79 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -427,14 +427,14 @@ public class CatalogManagerSelfTest extends
BaseCatalogManagerTest {
assertThat(manager.execute(TestCommand.ok()),
willCompleteSuccessfully());
assertThat(manager.execute(TestCommand.ok()),
willCompleteSuccessfully());
- long timestamp = clock.nowLong();
+ int compactToVer = manager.latestCatalogVersion();
Catalog catalog =
manager.catalog(manager.activeCatalogVersion(clock.nowLong()));
// Add more updates
assertThat(manager.execute(TestCommand.ok()),
willCompleteSuccessfully());
assertThat(manager.execute(TestCommand.ok()),
willCompleteSuccessfully());
- assertThat(manager.compactCatalog(timestamp), willBe(Boolean.TRUE));
+ assertThat(manager.compactCatalog(compactToVer), willBe(Boolean.TRUE));
assertTrue(waitForCondition(() -> catalog.version() ==
manager.earliestCatalogVersion(), 3_000));
assertNull(manager.catalog(0));
@@ -444,9 +444,9 @@ public class CatalogManagerSelfTest extends
BaseCatalogManagerTest {
assertThrows(IllegalStateException.class, () ->
manager.activeCatalogVersion(0));
assertThrows(IllegalStateException.class, () ->
manager.activeCatalogVersion(catalog.time() - 1));
assertSame(catalog.version(),
manager.activeCatalogVersion(catalog.time()));
- assertSame(catalog.version(), manager.activeCatalogVersion(timestamp));
+ assertSame(catalog.version(), compactToVer);
- assertThat(manager.compactCatalog(timestamp), willBe(false));
+ assertThat(manager.compactCatalog(compactToVer), willBe(false));
assertEquals(catalog.version(), manager.earliestCatalogVersion());
}
@@ -454,9 +454,7 @@ public class CatalogManagerSelfTest extends
BaseCatalogManagerTest {
public void testEmptyCatalogCompaction() {
assertEquals(1, manager.latestCatalogVersion());
- long timestamp = clock.nowLong();
-
- assertThat(manager.compactCatalog(timestamp), willBe(false));
+ assertThat(manager.compactCatalog(1), willBe(false));
assertEquals(0, manager.earliestCatalogVersion());
assertEquals(1, manager.latestCatalogVersion());
@@ -464,6 +462,5 @@ public class CatalogManagerSelfTest extends
BaseCatalogManagerTest {
assertNotNull(manager.catalog(1));
assertEquals(0, manager.activeCatalogVersion(0));
- assertEquals(1, manager.activeCatalogVersion(timestamp));
}
}
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexNodeFinishedRwTransactionsChecker.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexNodeFinishedRwTransactionsChecker.java
index 9a3c59c6c6..301a0f0f48 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexNodeFinishedRwTransactionsChecker.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexNodeFinishedRwTransactionsChecker.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessageHandler;
+import org.apache.ignite.internal.tx.ActiveLocalTxMinimumBeginTimeProvider;
import org.apache.ignite.internal.tx.LocalRwTxCounter;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.network.ClusterNode;
@@ -49,7 +50,7 @@ import org.jetbrains.annotations.Nullable;
* Local node RW transaction completion checker for indexes. Main task is to
handle the
* {@link IsNodeFinishedRwTransactionsStartedBeforeRequest}.
*/
-public class IndexNodeFinishedRwTransactionsChecker implements
LocalRwTxCounter, IgniteComponent {
+public class IndexNodeFinishedRwTransactionsChecker implements
LocalRwTxCounter, ActiveLocalTxMinimumBeginTimeProvider, IgniteComponent {
private static final IndexMessagesFactory FACTORY = new
IndexMessagesFactory();
private final ReentrantReadWriteLock readWriteLock = new
ReentrantReadWriteLock();
@@ -146,6 +147,20 @@ public class IndexNodeFinishedRwTransactionsChecker
implements LocalRwTxCounter,
});
}
+ @Override
+ public HybridTimestamp minimumBeginTime() {
+ readWriteLock.writeLock().lock();
+
+ try {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-22975 Improve
minimum begin time determination
+ return txCatalogVersionByBeginTxTs.keySet().stream()
+ .min(HybridTimestamp::compareTo)
+ .orElse(clock.now());
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+ }
+
/**
* Handles {@link IsNodeFinishedRwTransactionsStartedBeforeRequest} of
{@link IndexMessageGroup}.
*
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
index 40e4987556..52e815f2a8 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
@@ -25,6 +25,7 @@ import
org.apache.ignite.internal.partition.replicator.network.command.FinishTxC
import
org.apache.ignite.internal.partition.replicator.network.command.TimedBinaryRowMessage;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
+import
org.apache.ignite.internal.partition.replicator.network.command.UpdateMinimumActiveTxBeginTimeCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateMessage;
import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStatesRequest;
@@ -54,6 +55,7 @@ import
org.apache.ignite.internal.partition.replicator.network.replication.ReadW
import
org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteSingleRowReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteSwapRowReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.ScanCloseReplicaRequest;
+import
org.apache.ignite.internal.partition.replicator.network.replication.UpdateMinimumActiveTxBeginTimeReplicaRequest;
/**
* Message group for the table module.
@@ -191,6 +193,11 @@ public interface PartitionReplicationMessageGroup {
*/
short GET_ESTIMATED_SIZE_MESSAGE = 25;
+ /**
+ * Message type for {@link UpdateMinimumActiveTxBeginTimeReplicaRequest}.
+ */
+ short UPDATE_MINIMUM_ACTIVE_TX_TIME_REPLICA_REQUEST = 26;
+
/**
* Message types for partition replicator module RAFT commands.
*
@@ -211,6 +218,9 @@ public interface PartitionReplicationMessageGroup {
/** Message type for {@link BuildIndexCommand}. */
short BUILD_INDEX = 44;
+
+ /** Message type for {@link UpdateMinimumActiveTxBeginTimeCommand}. */
+ short UPDATE_MINIMUM_ACTIVE_TX_TIME_COMMAND = 45;
}
/**
diff --git
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeResponse.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateMinimumActiveTxBeginTimeCommand.java
similarity index 57%
copy from
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeResponse.java
copy to
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateMinimumActiveTxBeginTimeCommand.java
index b3760e9d15..0cd7bde81e 100644
---
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeResponse.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateMinimumActiveTxBeginTimeCommand.java
@@ -15,17 +15,19 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.catalog.compaction.message;
+package org.apache.ignite.internal.partition.replicator.network.command;
+
+import static
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands.UPDATE_MINIMUM_ACTIVE_TX_TIME_COMMAND;
-import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.annotations.Transferable;
+import
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
/**
- * Response message containing the low watermark required for the local node.
- * This watermark is used to safely truncate catalog history.
+ * Command to store the minimum starting time among all active RW transactions
+ * into transient state of each replication group.
*/
-@Transferable(CatalogCompactionMessageGroup.MINIMUM_REQUIRED_TIME_RESPONSE)
-public interface CatalogMinimumRequiredTimeResponse extends NetworkMessage {
- /** Returns node's minimum required time. */
+@Transferable(UPDATE_MINIMUM_ACTIVE_TX_TIME_COMMAND)
+public interface UpdateMinimumActiveTxBeginTimeCommand extends
SafeTimePropagatingCommand {
+ /** Returns the minimum starting time among all active RW transactions. */
long timestamp();
}
diff --git
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeResponse.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/UpdateMinimumActiveTxBeginTimeReplicaRequest.java
similarity index 60%
rename from
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeResponse.java
rename to
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/UpdateMinimumActiveTxBeginTimeReplicaRequest.java
index b3760e9d15..1738db8791 100644
---
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeResponse.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/UpdateMinimumActiveTxBeginTimeReplicaRequest.java
@@ -15,17 +15,17 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.catalog.compaction.message;
+package org.apache.ignite.internal.partition.replicator.network.replication;
-import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.annotations.Transferable;
+import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
/**
- * Response message containing the low watermark required for the local node.
- * This watermark is used to safely truncate catalog history.
+ * Request to update the minimum starting time among all active RW
transactions.
*/
-@Transferable(CatalogCompactionMessageGroup.MINIMUM_REQUIRED_TIME_RESPONSE)
-public interface CatalogMinimumRequiredTimeResponse extends NetworkMessage {
- /** Returns node's minimum required time. */
+@Transferable(PartitionReplicationMessageGroup.UPDATE_MINIMUM_ACTIVE_TX_TIME_REPLICA_REQUEST)
+public interface UpdateMinimumActiveTxBeginTimeReplicaRequest extends
ReplicaRequest {
+ /** Returns the minimum starting time among all active RW transactions. */
long timestamp();
}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 805c8fd585..e7bebf3218 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -762,25 +762,12 @@ public class IgniteImpl implements Ignite {
partitionIdleSafeTimePropagationPeriodMsSupplier
);
- CatalogCompactionRunner catalogCompactionRunner = new
CatalogCompactionRunner(
- name,
- catalogManager,
- clusterSvc.messagingService(),
- logicalTopologyService,
- placementDriverMgr.placementDriver(),
- clockService,
- threadPoolsManager.commonScheduler()
- );
-
-
metaStorageMgr.addElectionListener(catalogCompactionRunner::updateCoordinator);
-
systemViewManager = new SystemViewManagerImpl(name, catalogManager);
nodeAttributesCollector.register(systemViewManager);
logicalTopology.addEventListener(systemViewManager);
systemViewManager.register(catalogManager);
this.catalogManager = catalogManager;
- this.catalogCompactionRunner = catalogCompactionRunner;
lowWatermark = new LowWatermarkImpl(
name,
@@ -791,9 +778,6 @@ public class IgniteImpl implements Ignite {
clusterSvc.messagingService()
);
- lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED,
- params ->
catalogCompactionRunner.onLowWatermarkChanged(((ChangeLowWatermarkEventParameters)
params).newLowWatermark()));
-
this.indexMetaStorage = new IndexMetaStorage(catalogManager,
lowWatermark, metaStorageMgr);
raftMgr.appendEntriesRequestInterceptor(new
CheckCatalogVersionOnAppendEntries(catalogManager));
@@ -834,6 +818,25 @@ public class IgniteImpl implements Ignite {
clock
);
+ CatalogCompactionRunner catalogCompactionRunner = new
CatalogCompactionRunner(
+ name,
+ catalogManager,
+ clusterSvc.messagingService(),
+ logicalTopologyService,
+ placementDriverMgr.placementDriver(),
+ replicaSvc,
+ clockService,
+ schemaSyncService,
+ threadPoolsManager.commonScheduler(),
+ indexNodeFinishedRwTransactionsChecker
+ );
+
+
metaStorageMgr.addElectionListener(catalogCompactionRunner::updateCoordinator);
+ this.catalogCompactionRunner = catalogCompactionRunner;
+
+ lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED,
+ params ->
catalogCompactionRunner.onLowWatermarkChanged(((ChangeLowWatermarkEventParameters)
params).newLowWatermark()));
+
resourcesRegistry = new RemotelyTriggeredResourceRegistry();
var transactionInflights = new
TransactionInflights(placementDriverMgr.placementDriver(), clockService);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 195fe3e936..11fd10dc3e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -50,6 +50,7 @@ import
org.apache.ignite.internal.partition.replicator.network.command.BuildInde
import
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
+import
org.apache.ignite.internal.partition.replicator.network.command.UpdateMinimumActiveTxBeginTimeCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.ReadCommand;
@@ -95,6 +96,9 @@ public class PartitionListener implements RaftGroupListener,
BeforeApplyHandler
/** Logger. */
private static final IgniteLogger LOG =
Loggers.forClass(PartitionListener.class);
+ /** Undefined value for {@link #minActiveTxBeginTime}. */
+ private static final long UNDEFINED_MIN_TX_TIME = 0L;
+
/** Transaction manager. */
private final TxManager txManager;
@@ -127,6 +131,12 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
private final IndexMetaStorage indexMetaStorage;
+ /**
+ * Timestamp with minimum starting time among all active RW transactions
in the cluster.
+ * This timestamp is used to prevent the catalog from being dropped, which
may be used when applying raft commands.
+ */
+ private volatile long minActiveTxBeginTime = UNDEFINED_MIN_TX_TIME;
+
/** Constructor. */
public PartitionListener(
TxManager txManager,
@@ -221,6 +231,8 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
handlePrimaryReplicaChangeCommand((PrimaryReplicaChangeCommand) command,
commandIndex, commandTerm);
} else if (command instanceof VacuumTxStatesCommand) {
handleVacuumTxStatesCommand((VacuumTxStatesCommand)
command, commandIndex, commandTerm);
+ } else if (command instanceof
UpdateMinimumActiveTxBeginTimeCommand) {
+
handleUpdateMinimalActiveTxTimeCommand((UpdateMinimumActiveTxBeginTimeCommand)
command, commandIndex, commandTerm);
} else {
assert false : "Command was not found [cmd=" + command +
']';
}
@@ -574,6 +586,21 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
return storage.getStorage();
}
+ /**
+ * Returns minimum starting time among all active RW transactions in the
cluster,
+ * or {@code null} if the value has not yet been set.
+ */
+ @TestOnly
+ public @Nullable Long minimumActiveTxBeginTime() {
+ long minActiveTxBeginTime0 = minActiveTxBeginTime;
+
+ if (minActiveTxBeginTime0 == UNDEFINED_MIN_TX_TIME) {
+ return null;
+ }
+
+ return minActiveTxBeginTime0;
+ }
+
/**
* Handler for the {@link BuildIndexCommand}.
*
@@ -666,6 +693,19 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
txStateStorage.removeAll(cmd.txIds(), commandIndex, commandTerm);
}
+ private void
handleUpdateMinimalActiveTxTimeCommand(UpdateMinimumActiveTxBeginTimeCommand
cmd, long commandIndex, long commandTerm) {
+ // Skips the write command because the storage has already executed it.
+ if (commandIndex <= storage.lastAppliedIndex()) {
+ return;
+ }
+
+ long minActiveTxBeginTime0 = minActiveTxBeginTime;
+
+ assert minActiveTxBeginTime0 <= cmd.timestamp() : "maxTime=" +
minActiveTxBeginTime0 + ", cmdTime=" + cmd.timestamp();
+
+ minActiveTxBeginTime = cmd.timestamp();
+ }
+
private static void onTxStateStorageCasFail(UUID txId, TxMeta
txMetaBeforeCas, TxMeta txMetaToSet) {
String errorMsg = format("Failed to update tx state in the storage,
transaction txId = {} because of inconsistent state,"
+ " expected state = {}, state to set = {}",
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index ee0b0b0e01..ccc49bf2f6 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -127,6 +127,7 @@ import
org.apache.ignite.internal.partition.replicator.network.replication.ReadW
import
org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteSwapRowReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
import
org.apache.ignite.internal.partition.replicator.network.replication.ScanCloseReplicaRequest;
+import
org.apache.ignite.internal.partition.replicator.network.replication.UpdateMinimumActiveTxBeginTimeReplicaRequest;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
@@ -813,6 +814,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
return
processTxStateCommitPartitionRequest((TxStateCommitPartitionRequest) request);
} else if (request instanceof VacuumTxStateReplicaRequest) {
return
processVacuumTxStateReplicaRequest((VacuumTxStateReplicaRequest) request);
+ } else if (request instanceof
UpdateMinimumActiveTxBeginTimeReplicaRequest) {
+ return
processMinimumActiveTxTimeReplicaRequest((UpdateMinimumActiveTxBeginTimeReplicaRequest)
request);
} else {
throw new UnsupportedReplicaRequestException(request.getClass());
}
@@ -4136,6 +4139,24 @@ public class PartitionReplicaListener implements
ReplicaListener {
return raftClient.run(cmd);
}
+ private CompletableFuture<?>
processMinimumActiveTxTimeReplicaRequest(UpdateMinimumActiveTxBeginTimeReplicaRequest
request) {
+ Command cmd =
PARTITION_REPLICATION_MESSAGES_FACTORY.updateMinimumActiveTxBeginTimeCommand()
+ .timestamp(request.timestamp())
+ .safeTime(clockService.now())
+ .build();
+
+ CompletableFuture<Object> resultFuture = new CompletableFuture<>();
+
+ // The timestamp must increase monotonically, otherwise it will have
to be
+ // stored on disk so that reordering does not occur after the node is
restarted.
+ applyCmdWithRetryOnSafeTimeReorderException(
+ cmd,
+ resultFuture
+ );
+
+ return resultFuture;
+ }
+
/**
* Operation unique identifier.
*/
diff --git
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeRequest.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/ActiveLocalTxMinimumBeginTimeProvider.java
similarity index 61%
rename from
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeRequest.java
rename to
modules/transactions/src/main/java/org/apache/ignite/internal/tx/ActiveLocalTxMinimumBeginTimeProvider.java
index d1156a3736..90f3f537dd 100644
---
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeRequest.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/ActiveLocalTxMinimumBeginTimeProvider.java
@@ -15,16 +15,18 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.catalog.compaction.message;
+package org.apache.ignite.internal.tx;
-import org.apache.ignite.internal.network.NetworkMessage;
-import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
/**
- * Request to obtain the minimum timestamp to which, from the local
- * node's perspective, the catalog history can be safely truncated.
+ * Provides the minimum begin time among all active RW transactions started
locally.
*/
-@Transferable(CatalogCompactionMessageGroup.MINIMUM_REQUIRED_TIME_REQUEST)
-public interface CatalogMinimumRequiredTimeRequest extends NetworkMessage {
-
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+public interface ActiveLocalTxMinimumBeginTimeProvider {
+ /**
+ * Returns the minimum begin time among all active RW transactions started
locally,
+ * or the current time if there are no active RW transactions.
+ */
+ HybridTimestamp minimumBeginTime();
}