This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 23d713afad7 IGNITE-24365 Implement
UpdateMinimumActiveTxBeginTimeReplicaRequest processing for zone replica (#5197)
23d713afad7 is described below
commit 23d713afad78011eb159005d451a2100455c372a
Author: Slava Koptilin <[email protected]>
AuthorDate: Fri Feb 14 16:44:27 2025 +0200
IGNITE-24365 Implement UpdateMinimumActiveTxBeginTimeReplicaRequest
processing for zone replica (#5197)
---
.../compaction/ItCatalogCompactionTest.java | 2 +-
.../compaction/CatalogCompactionRunner.java | 46 ++++---
.../compaction/CatalogManagerCompactionFacade.java | 39 +++++-
.../CatalogManagerCompactionFacadeTest.java | 74 ++++++++++-
modules/partition-replicator/build.gradle | 1 +
.../replicator/ItReplicaLifecycleTest.java | 140 ++++++++++++++++++-
.../replicator/ItZoneDataReplicationTest.java | 7 +-
.../partition/replicator/fixtures/Node.java | 45 +++++--
.../replicator/fixtures/TestPlacementDriver.java | 21 ++-
.../PartitionReplicaLifecycleManager.java | 2 +-
.../replicator/ZonePartitionReplicaListener.java | 148 ++++++++++++++++-----
.../MinimumActiveTxTimeReplicaRequestHandler.java | 71 ++++++++++
.../TxFinishReplicaRequestHandler.java | 5 +-
.../replicator/handlers/package-info.java | 23 ++++
.../replicator/raft/ZonePartitionRaftListener.java | 45 ++++---
.../{ => handlers}/FinishTxCommandHandler.java | 4 +-
.../replicator/raft/handlers/package-info.java | 24 ++++
.../raft/ZonePartitionRaftListenerTest.java | 2 +-
.../table/distributed/raft/PartitionListener.java | 51 +++----
.../MinimumActiveTxTimeCommandHandler.java | 85 ++++++++++++
.../distributed/raft/handlers/package-info.java | 23 ++++
.../replicator/PartitionReplicaListener.java | 32 ++---
22 files changed, 750 insertions(+), 140 deletions(-)
diff --git
a/modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java
b/modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java
index 0889134b0e0..d9244e49f30 100644
---
a/modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java
+++
b/modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java
@@ -71,7 +71,7 @@ class ItCatalogCompactionTest extends
ClusterPerClassIntegrationTest {
*/
private static final long CHECK_POINT_INTERVAL_MS = LW_UPDATE_TIME_MS / 2;
- /** Show be greater than 2 x {@link #LW_UPDATE_TIME_MS}. */
+ /** Should be greater than 2 x {@link #LW_UPDATE_TIME_MS}. */
private static final long COMPACTION_INTERVAL_MS =
TimeUnit.SECONDS.toMillis(10);
/** Transactions that are started in the test. */
diff --git
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
index da0d877bdd4..cef41906c6e 100644
---
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
+++
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
@@ -18,6 +18,10 @@
package org.apache.ignite.internal.catalog.compaction;
import static java.util.function.Predicate.not;
+import static
org.apache.ignite.internal.lang.IgniteSystemProperties.COLOCATION_FEATURE_FLAG;
+import static
org.apache.ignite.internal.lang.IgniteSystemProperties.getBoolean;
+import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
+import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toZonePartitionIdMessage;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import it.unimi.dsi.fastutil.ints.Int2IntMap;
@@ -68,10 +72,11 @@ import
org.apache.ignite.internal.partition.replicator.network.replication.Updat
import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.replicator.message.ReplicaMessageUtils;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
-import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
import org.apache.ignite.internal.schema.SchemaSyncService;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
import org.apache.ignite.internal.tx.ActiveLocalTxMinimumRequiredTimeProvider;
@@ -152,6 +157,10 @@ public class CatalogCompactionRunner implements
IgniteComponent {
private volatile UUID localNodeId;
+ /** Feature flag for the zone-based colocation track. */
+ // TODO https://issues.apache.org/jira/browse/IGNITE-22522 Remove it.
+ private final boolean enabledColocationFeature =
getBoolean(COLOCATION_FEATURE_FLAG, false);
+
/**
* Constructs catalog compaction runner.
*/
@@ -403,10 +412,11 @@ public class CatalogCompactionRunner implements
IgniteComponent {
return schemaSyncService.waitForMetadataCompleteness(nowTs)
.thenComposeAsync(ignore -> {
- Int2IntMap tablesWithPartitions =
-
catalogManagerFacade.collectTablesWithPartitionsBetween(txBeginTime,
nowTs.longValue());
+ Int2IntMap idsWithPartitions = enabledColocationFeature
+ ?
catalogManagerFacade.collectZonesWithPartitionsBetween(txBeginTime,
nowTs.longValue())
+ :
catalogManagerFacade.collectTablesWithPartitionsBetween(txBeginTime,
nowTs.longValue());
- ObjectIterator<Entry> itr =
tablesWithPartitions.int2IntEntrySet().iterator();
+ ObjectIterator<Entry> itr =
idsWithPartitions.int2IntEntrySet().iterator();
return invokeOnLocalReplicas(txBeginTime, localNodeId,
itr);
}, executor);
@@ -586,22 +596,23 @@ public class CatalogCompactionRunner implements
IgniteComponent {
return
requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
}
- private CompletableFuture<Void> invokeOnLocalReplicas(long txBeginTime,
UUID localNodeId, ObjectIterator<Entry> tabTtr) {
- if (!tabTtr.hasNext()) {
+ private CompletableFuture<Void> invokeOnLocalReplicas(long txBeginTime,
UUID localNodeId, ObjectIterator<Entry> entryIterator) {
+ if (!entryIterator.hasNext()) {
return CompletableFutures.nullCompletedFuture();
}
- Entry tableWithPartitions = tabTtr.next();
- int tableId = tableWithPartitions.getIntKey();
- int partitions = tableWithPartitions.getIntValue();
+ Entry idWithPartitions = entryIterator.next();
+ int id = idWithPartitions.getIntKey();
+ int partitions = idWithPartitions.getIntValue();
List<CompletableFuture<?>> partFutures = new ArrayList<>(partitions);
HybridTimestamp nowTs = clockService.now();
for (int p = 0; p < partitions; p++) {
- TablePartitionId tablePartitionId = new TablePartitionId(tableId,
p);
+ ReplicationGroupId groupReplicationId = enabledColocationFeature
+ ? new ZonePartitionId(id, p) : new TablePartitionId(id, p);
CompletableFuture<?> fut = placementDriver
- .getPrimaryReplica(tablePartitionId, nowTs)
+ .getPrimaryReplica(groupReplicationId, nowTs)
.thenCompose(meta -> {
// If primary is not elected yet - we'll update
replication groups on next iteration.
if (meta == null || meta.getLeaseholderId() == null) {
@@ -614,14 +625,13 @@ public class CatalogCompactionRunner implements
IgniteComponent {
return CompletableFutures.nullCompletedFuture();
}
- TablePartitionIdMessage partIdMessage =
ReplicaMessageUtils.toTablePartitionIdMessage(
- REPLICA_MESSAGES_FACTORY,
- tablePartitionId
- );
+ ReplicationGroupIdMessage groupIdMessage =
enabledColocationFeature
+ ?
toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, (ZonePartitionId)
groupReplicationId)
+ :
toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, (TablePartitionId)
groupReplicationId);
UpdateMinimumActiveTxBeginTimeReplicaRequest msg =
REPLICATION_MESSAGES_FACTORY
.updateMinimumActiveTxBeginTimeReplicaRequest()
- .groupId(partIdMessage)
+ .groupId(groupIdMessage)
.timestamp(txBeginTime)
.build();
@@ -632,7 +642,7 @@ public class CatalogCompactionRunner implements
IgniteComponent {
}
return CompletableFutures.allOf(partFutures)
- .thenComposeAsync(ignore -> invokeOnLocalReplicas(txBeginTime,
localNodeId, tabTtr), executor);
+ .thenComposeAsync(ignore -> invokeOnLocalReplicas(txBeginTime,
localNodeId, entryIterator), executor);
}
private class CatalogCompactionMessageHandler implements
NetworkMessageHandler {
diff --git
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacade.java
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacade.java
index 79d9ad1a61b..6e0959a52ab 100644
---
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacade.java
+++
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacade.java
@@ -38,6 +38,7 @@ class CatalogManagerCompactionFacade {
this.catalogManager = catalogManager;
}
+ // TODO https://issues.apache.org/jira/browse/IGNITE-22522 Remove this
method.
/**
* Scans catalog versions in a given time interval (including interval
boundaries).
* Extracts all tables contained in these catalog versions and creates a
mapping
@@ -48,25 +49,53 @@ class CatalogManagerCompactionFacade {
* @return Mapping tableId to number of partitions in this table.
*/
Int2IntMap collectTablesWithPartitionsBetween(long minTsInclusive, long
maxTsInclusive) {
- Int2IntMap tablesWithPartitions = new Int2IntOpenHashMap();
+ Int2IntMap tableIdsWithPartitions = new Int2IntOpenHashMap();
int curVer = catalogManager.activeCatalogVersion(minTsInclusive);
int lastVer = catalogManager.activeCatalogVersion(maxTsInclusive);
do {
Catalog catalog = catalogManager.catalog(curVer);
- assert catalog != null : "ver=" + curVer + ", last=" + lastVer;
+ assert catalog != null : "Failed to find a catalog for the given
version [version=" + curVer + ", lastVersion=" + lastVer + ']';
for (CatalogTableDescriptor table : catalog.tables()) {
CatalogZoneDescriptor zone = catalog.zone(table.zoneId());
- assert zone != null : table.zoneId();
+ assert zone != null :
+ "Failed to find a zone for the given catalog version
[version=" + curVer + ", tableId=" + table.id() + ']';
- tablesWithPartitions.put(table.id(), zone.partitions());
+ tableIdsWithPartitions.put(table.id(), zone.partitions());
}
} while (++curVer <= lastVer);
- return tablesWithPartitions;
+ return tableIdsWithPartitions;
+ }
+
+ /**
+ * Scans catalog versions in a given time interval (including interval
boundaries).
+ * Extracts all zones contained in these catalog versions and creates a
mapping
+ * zoneId -> number of partitions in this zone.
+ *
+ * @param minTsInclusive Lower timestamp (inclusive).
+ * @param maxTsInclusive Upper timestamp (inclusive).
+ * @return Mapping zoneId to number of partitions in this zone.
+ */
+ Int2IntMap collectZonesWithPartitionsBetween(long minTsInclusive, long
maxTsInclusive) {
+ Int2IntMap zoneIdsWithPartitions = new Int2IntOpenHashMap();
+ int curVer = catalogManager.activeCatalogVersion(minTsInclusive);
+ int lastVer = catalogManager.activeCatalogVersion(maxTsInclusive);
+
+ do {
+ Catalog catalog = catalogManager.catalog(curVer);
+
+ assert catalog != null : "Failed to find a catalog for the given
version [version=" + curVer + ", lastVersion=" + lastVer + ']';
+
+ for (CatalogZoneDescriptor zone : catalog.zones()) {
+ zoneIdsWithPartitions.put(zone.id(), zone.partitions());
+ }
+ } while (++curVer <= lastVer);
+
+ return zoneIdsWithPartitions;
}
/**
diff --git
a/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacadeTest.java
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacadeTest.java
index 865122afa87..3bc5ac616d3 100644
---
a/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacadeTest.java
+++
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacadeTest.java
@@ -31,8 +31,13 @@ import java.util.List;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.commands.CreateTableCommand;
import org.apache.ignite.internal.catalog.commands.CreateTableCommandBuilder;
+import org.apache.ignite.internal.catalog.commands.CreateZoneCommand;
+import org.apache.ignite.internal.catalog.commands.CreateZoneCommandBuilder;
import org.apache.ignite.internal.catalog.commands.DropTableCommand;
import org.apache.ignite.internal.catalog.commands.DropTableCommandBuilder;
+import org.apache.ignite.internal.catalog.commands.DropZoneCommand;
+import org.apache.ignite.internal.catalog.commands.DropZoneCommandBuilder;
+import org.apache.ignite.internal.catalog.commands.StorageProfileParams;
import org.apache.ignite.internal.catalog.commands.TableHashPrimaryKey;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -48,6 +53,7 @@ class CatalogManagerCompactionFacadeTest extends
AbstractCatalogCompactionTest {
catalogManagerFacade = new
CatalogManagerCompactionFacade(catalogManager);
}
+ // TODO https://issues.apache.org/jira/browse/IGNITE-22522 Remove this
test.
@Test
void testCollectTablesWithPartitionsBetween() {
CreateTableCommandBuilder tableCmdBuilder =
CreateTableCommand.builder()
@@ -74,21 +80,24 @@ class CatalogManagerCompactionFacadeTest extends
AbstractCatalogCompactionTest {
assertThat(catalogManager.execute(dropTableCommandBuilder.tableName("test3").build()),
willCompleteSuccessfully());
{
- Int2IntMap tablesWithParts =
catalogManagerFacade.collectTablesWithPartitionsBetween(from1,
+ Int2IntMap tablesWithParts =
catalogManagerFacade.collectTablesWithPartitionsBetween(
+ from1,
clockService.nowLong());
assertThat(tablesWithParts.keySet(), hasSize(3));
}
{
- Int2IntMap tablesWithParts =
catalogManagerFacade.collectTablesWithPartitionsBetween(from2,
+ Int2IntMap tablesWithParts =
catalogManagerFacade.collectTablesWithPartitionsBetween(
+ from2,
clockService.nowLong());
assertThat(tablesWithParts.keySet(), hasSize(2));
}
{
- Int2IntMap tablesWithParts =
catalogManagerFacade.collectTablesWithPartitionsBetween(from3,
+ Int2IntMap tablesWithParts =
catalogManagerFacade.collectTablesWithPartitionsBetween(
+ from3,
clockService.nowLong());
assertThat(tablesWithParts.keySet(), hasSize(1));
@@ -97,13 +106,68 @@ class CatalogManagerCompactionFacadeTest extends
AbstractCatalogCompactionTest {
{
Int2IntMap tablesWithParts =
catalogManagerFacade.collectTablesWithPartitionsBetween(
clockService.nowLong(),
- clockService.nowLong()
- );
+ clockService.nowLong());
assertThat(tablesWithParts.keySet(), hasSize(0));
}
}
+ @Test
+ void testCollectZonesWithPartitionsBetween() {
+ CreateZoneCommandBuilder zoneCommandBuilder =
CreateZoneCommand.builder()
+
.storageProfilesParams(List.of(StorageProfileParams.builder().storageProfile("ai-persist").build()))
+ .partitions(1);
+
+ DropZoneCommandBuilder dropZoneCommandBuilder =
DropZoneCommand.builder();
+
+ long from1 = clockService.nowLong();
+
+
assertThat(catalogManager.execute(zoneCommandBuilder.zoneName("test1").build()),
willCompleteSuccessfully());
+
assertThat(catalogManager.execute(dropZoneCommandBuilder.zoneName("test1").build()),
willCompleteSuccessfully());
+
+ long from2 = clockService.nowLong();
+
+
assertThat(catalogManager.execute(zoneCommandBuilder.zoneName("test2").build()),
willCompleteSuccessfully());
+
assertThat(catalogManager.execute(dropZoneCommandBuilder.zoneName("test2").build()),
willCompleteSuccessfully());
+
+ long from3 = clockService.nowLong();
+
assertThat(catalogManager.execute(zoneCommandBuilder.zoneName("test3").build()),
willCompleteSuccessfully());
+
assertThat(catalogManager.execute(dropZoneCommandBuilder.zoneName("test3").build()),
willCompleteSuccessfully());
+
+ // The default zone is always collected as well, so each expected size is one larger than the number of test zones.
+ {
+ Int2IntMap tablesWithParts =
catalogManagerFacade.collectZonesWithPartitionsBetween(
+ from1,
+ clockService.nowLong());
+
+ assertThat(tablesWithParts.keySet(), hasSize(4));
+ }
+
+ {
+ Int2IntMap tablesWithParts =
catalogManagerFacade.collectZonesWithPartitionsBetween(
+ from2,
+ clockService.nowLong());
+
+ assertThat(tablesWithParts.keySet(), hasSize(3));
+ }
+
+ {
+ Int2IntMap tablesWithParts =
catalogManagerFacade.collectZonesWithPartitionsBetween(
+ from3,
+ clockService.nowLong());
+
+ assertThat(tablesWithParts.keySet(), hasSize(2));
+ }
+
+ {
+ Int2IntMap tablesWithParts =
catalogManagerFacade.collectZonesWithPartitionsBetween(
+ clockService.nowLong(),
+ clockService.nowLong());
+
+ assertThat(tablesWithParts.keySet(), hasSize(1));
+ }
+ }
+
@Test
void testCatalogPriorToVersionAtTsNullable() {
Catalog earliestCatalog =
catalogManager.catalog(catalogManager.earliestCatalogVersion());
diff --git a/modules/partition-replicator/build.gradle
b/modules/partition-replicator/build.gradle
index d8c9d94f5e7..a0faa4a7b96 100644
--- a/modules/partition-replicator/build.gradle
+++ b/modules/partition-replicator/build.gradle
@@ -85,6 +85,7 @@ dependencies {
integrationTestImplementation project(':ignite-cluster-management')
integrationTestImplementation project(':ignite-schema')
integrationTestImplementation project(':ignite-catalog')
+ integrationTestImplementation project(':ignite-catalog-compaction') //
TODO https://issues.apache.org/jira/browse/IGNITE-22522
integrationTestImplementation project(':ignite-configuration')
integrationTestImplementation project(':ignite-configuration-root')
integrationTestImplementation project(':ignite-configuration-system')
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index bb5d88aa035..ff1513ae7f4 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.partition.replicator;
import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
@@ -26,16 +27,20 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesTest
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
import static
org.apache.ignite.internal.lang.IgniteSystemProperties.COLOCATION_FEATURE_FLAG;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
import static
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignmentForPartition;
import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToPublisher;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.apache.ignite.sql.ColumnType.INT32;
import static org.apache.ignite.sql.ColumnType.INT64;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -50,12 +55,12 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -80,13 +85,18 @@ import
org.apache.ignite.internal.partition.replicator.fixtures.Node.InvokeInter
import
org.apache.ignite.internal.partition.replicator.fixtures.TestPlacementDriver;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
+import
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils;
+import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
+import
org.apache.ignite.internal.partitiondistribution.TokenizedAssignmentsImpl;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.replicator.Replica;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.configuration.GcConfiguration;
import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
+import
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableTestUtils;
import org.apache.ignite.internal.table.TableViewInternal;
@@ -157,6 +167,9 @@ public class ItReplicaLifecycleTest extends
IgniteAbstractTest {
@InjectConfiguration("mock.profiles = {" + DEFAULT_STORAGE_PROFILE +
".engine = aipersist, test.engine=test}")
private static StorageConfiguration storageConfiguration;
+ @InjectConfiguration
+ private GcConfiguration gcConfiguration;
+
@InjectExecutorService
private static ScheduledExecutorService scheduledExecutorService;
@@ -255,7 +268,8 @@ public class ItReplicaLifecycleTest extends
IgniteAbstractTest {
replicationConfiguration,
txConfiguration,
scheduledExecutorService,
- invokeInterceptor
+ invokeInterceptor,
+ gcConfiguration
);
nodes.put(idx, node);
@@ -842,6 +856,97 @@ public class ItReplicaLifecycleTest extends
IgniteAbstractTest {
assertDoesNotThrow(tx::commit);
}
+ @Test
+ public void testCatalogCompaction(TestInfo testInfo) throws Exception {
+ // How often we update the low water mark.
+ long lowWatermarkUpdateInterval = 500;
+ updateLowWatermarkConfiguration(lowWatermarkUpdateInterval * 2,
lowWatermarkUpdateInterval);
+
+ // Prepare a single node cluster.
+ startNodes(testInfo, 1);
+ Node node = getNode(0);
+
+ List<Set<Assignment>> assignments =
PartitionDistributionUtils.calculateAssignments(
+ nodes.values().stream().map(n -> n.name).collect(toList()), 1,
1);
+
+ List<TokenizedAssignments> tokenizedAssignments = assignments.stream()
+ .map(a -> new TokenizedAssignmentsImpl(a, Integer.MIN_VALUE))
+ .collect(toList());
+
+
placementDriver.setPrimary(node.clusterService.topologyService().localMember());
+ placementDriver.setAssignments(tokenizedAssignments);
+
+ forceCheckpoint(node);
+
+ String zoneName = "test-zone";
+ createZone(node, zoneName, 1, 1);
+ int zoneId = DistributionZonesTestUtil.getZoneId(node.catalogManager,
zoneName, node.hybridClock.nowLong());
+ prepareTableIdToZoneIdConverter(node, zoneId);
+
+ int catalogVersion1 = getLatestCatalogVersion(node);
+
+ String tableName1 = "test_table_1";
+ createTable(node, zoneName, tableName1);
+
+ String tableName2 = "test_table_2";
+ createTable(node, zoneName, tableName2);
+
+ int tableId = TableTestUtils.getTableId(node.catalogManager,
tableName2, node.hybridClock.nowLong());
+ TableViewInternal tableViewInternal = node.tableManager.table(tableId);
+ KeyValueView<Long, Integer> tableView =
tableViewInternal.keyValueView(Long.class, Integer.class);
+
+ // Write 2 rows to the table.
+ Map<Long, Integer> valuesToPut = Map.of(0L, 0, 1L, 1);
+ assertDoesNotThrow(() -> tableView.putAll(null, valuesToPut));
+
+ forceCheckpoint(node);
+
+ int catalogVersion2 = getLatestCatalogVersion(node);
+ assertThat("The catalog version did not changed [initial=" +
catalogVersion1 + ", latest=" + catalogVersion2 + ']',
+ catalogVersion2, greaterThan(catalogVersion1));
+
+ expectEarliestCatalogVersion(node, catalogVersion2 - 1);
+ }
+
+ private static void expectEarliestCatalogVersion(Node node, int
expectedVersion) throws Exception {
+ boolean result = waitForCondition(() ->
getEarliestCatalogVersion(node) == expectedVersion, 10_000);
+
+ assertTrue(result,
+ "Failed to wait for the expected catalog version [expected=" +
expectedVersion
+ + ", earliest=" + getEarliestCatalogVersion(node)
+ + ", latest=" + getLatestCatalogVersion(node) + ']');
+ }
+
+ private static int getLatestCatalogVersion(Node node) {
+ Catalog catalog = getLatestCatalog(node);
+
+ return catalog.version();
+ }
+
+ private static int getEarliestCatalogVersion(Node node) {
+ CatalogManager catalogManager = node.catalogManager;
+
+ int ver = catalogManager.earliestCatalogVersion();
+
+ Catalog catalog = catalogManager.catalog(ver);
+
+ Objects.requireNonNull(catalog);
+
+ return catalog.version();
+ }
+
+ private static Catalog getLatestCatalog(Node node) {
+ CatalogManager catalogManager = node.catalogManager;
+
+ int ver =
catalogManager.activeCatalogVersion(node.hybridClock.nowLong());
+
+ Catalog catalog = catalogManager.catalog(ver);
+
+ Objects.requireNonNull(catalog);
+
+ return catalog;
+ }
+
private static RemotelyTriggeredResource getVersionedStorageCursor(Node
node, FullyQualifiedResourceId cursorId) {
return node.resourcesRegistry.resources().get(cursorId);
}
@@ -914,7 +1019,7 @@ public class ItReplicaLifecycleTest extends
IgniteAbstractTest {
return false;
}
- Replica replica = replicaFut.get(1, TimeUnit.SECONDS);
+ Replica replica = replicaFut.get(1, SECONDS);
return replica != null && (((ZonePartitionReplicaListener)
replica.listener()).tableReplicaListeners().size() == count);
} catch (ExecutionException | InterruptedException | TimeoutException
e) {
@@ -947,4 +1052,33 @@ public class ItReplicaLifecycleTest extends
IgniteAbstractTest {
verify(internalTable.txStateStorage(),
timeout(AWAIT_TIMEOUT_MILLIS).atLeast(1))
.destroyTxStateStorage(partitionId);
}
+
+ /**
+ * Updates the low watermark configuration.
+ *
+ * @param dataAvailabilityTime Data availability time.
+ * @param updateInterval Update interval.
+ */
+ private void updateLowWatermarkConfiguration(long dataAvailabilityTime,
long updateInterval) {
+ CompletableFuture<?> updateFuture =
gcConfiguration.lowWatermark().change(change -> {
+ change.changeDataAvailabilityTime(dataAvailabilityTime);
+ change.changeUpdateInterval(updateInterval);
+ });
+
+ assertThat(updateFuture, willSucceedFast());
+ }
+
+ /**
+ * Forces a checkpoint on the provided node and waits for it to finish.
+ *
+ * @param node Node to start the checkpoint on.
+ */
+ private void forceCheckpoint(Node node) {
+ PersistentPageMemoryStorageEngine storageEngine =
(PersistentPageMemoryStorageEngine) node
+ .dataStorageManager()
+ .engineByStorageProfile(DEFAULT_STORAGE_PROFILE);
+
+
assertThat(storageEngine.checkpointManager().forceCheckpoint("test-reason").futureFor(FINISHED),
+ willSucceedIn(10, SECONDS));
+ }
}
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java
index de8c6c9d12b..ec43d53eea6 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java
@@ -74,6 +74,7 @@ import
org.apache.ignite.internal.replicator.configuration.ReplicationConfigurat
import
org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.schema.configuration.GcConfiguration;
import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
import org.apache.ignite.internal.table.InternalTable;
@@ -144,6 +145,9 @@ public class ItZoneDataReplicationTest extends
IgniteAbstractTest {
@InjectConfiguration("mock.profiles = {" + DEFAULT_STORAGE_PROFILE +
".engine = aipersist, test.engine=test}")
private static StorageConfiguration storageConfiguration;
+ @InjectConfiguration
+ private static GcConfiguration gcConfiguration;
+
@InjectExecutorService
private static ScheduledExecutorService scheduledExecutorService;
@@ -234,7 +238,8 @@ public class ItZoneDataReplicationTest extends
IgniteAbstractTest {
replicationConfiguration,
txConfiguration,
scheduledExecutorService,
- null
+ null,
+ gcConfiguration
);
}
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index cdc63d197a4..2acb5599a5f 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -51,6 +51,7 @@ import java.util.function.LongSupplier;
import org.apache.ignite.internal.app.ThreadPoolsManager;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import org.apache.ignite.internal.catalog.compaction.CatalogCompactionRunner;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
@@ -78,6 +79,7 @@ import
org.apache.ignite.internal.configuration.storage.LocalFileConfigurationSt
import
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
import
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryStorage;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import
org.apache.ignite.internal.distributionzones.rebalance.RebalanceMinimumRequiredTimeProviderImpl;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.hlc.ClockServiceImpl;
@@ -90,6 +92,8 @@ import
org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermarkImpl;
+import
org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
+import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.Entry;
@@ -129,7 +133,6 @@ import
org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaSyncService;
import org.apache.ignite.internal.schema.configuration.GcConfiguration;
-import
org.apache.ignite.internal.schema.configuration.GcExtensionConfiguration;
import
org.apache.ignite.internal.schema.configuration.GcExtensionConfigurationSchema;
import
org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration;
import
org.apache.ignite.internal.schema.configuration.StorageUpdateExtensionConfiguration;
@@ -263,6 +266,8 @@ public class Node {
@Nullable
private volatile InvokeInterceptor invokeInterceptor;
+ private final CatalogCompactionRunner catalogCompactionRunner;
+
/** Interceptor for {@link MetaStorageManager#invoke} calls. */
@FunctionalInterface
public interface InvokeInterceptor {
@@ -285,7 +290,8 @@ public class Node {
ReplicationConfiguration replicationConfiguration,
TransactionConfiguration transactionConfiguration,
ScheduledExecutorService scheduledExecutorService,
- @Nullable InvokeInterceptor invokeInterceptor
+ @Nullable InvokeInterceptor invokeInterceptor,
+ GcConfiguration gcConfiguration
) {
this.invokeInterceptor = invokeInterceptor;
@@ -489,8 +495,6 @@ public class Node {
var registry = new
MetaStorageRevisionListenerRegistry(metaStorageManager);
- GcConfiguration gcConfig =
clusterConfigRegistry.getConfiguration(GcExtensionConfiguration.KEY).gc();
-
DataStorageModules dataStorageModules = new DataStorageModules(List.of(
new PersistentPageMemoryDataStorageModule(),
new NonVolatileTestDataStorageModule()
@@ -514,7 +518,7 @@ public class Node {
lowWatermark = new LowWatermarkImpl(
name,
- gcConfig.lowWatermark(),
+ gcConfiguration.lowWatermark(),
clockService,
vaultManager,
failureManager,
@@ -575,6 +579,28 @@ public class Node {
schemaSyncService = new
SchemaSyncServiceImpl(metaStorageManager.clusterTime(),
delayDurationMsSupplier);
+ MinimumRequiredTimeCollectorService minTimeCollectorService = new
MinimumRequiredTimeCollectorServiceImpl();
+
+ catalogCompactionRunner = new CatalogCompactionRunner(
+ name,
+ (CatalogManagerImpl) catalogManager,
+ clusterService.messagingService(),
+ logicalTopologyService,
+ placementDriver,
+ replicaSvc,
+ clockService,
+ schemaSyncService,
+ clusterService.topologyService(),
+ threadPoolsManager.commonScheduler(),
+ clockService::nowLong,
+ minTimeCollectorService,
+ new
RebalanceMinimumRequiredTimeProviderImpl(metaStorageManager, catalogManager));
+
+ ((MetaStorageManagerImpl)
metaStorageManager).addElectionListener(catalogCompactionRunner::updateCoordinator);
+
+ lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED,
+ params ->
catalogCompactionRunner.onLowWatermarkChanged(((ChangeLowWatermarkEventParameters)
params).newLowWatermark()));
+
ScheduledExecutorService rebalanceScheduler =
Executors.newScheduledThreadPool(
REBALANCE_SCHEDULER_POOL_SIZE,
NamedThreadFactory.create(name, "test-rebalance-scheduler",
LOG)
@@ -625,12 +651,10 @@ public class Node {
StorageUpdateConfiguration storageUpdateConfiguration =
clusterConfigRegistry
.getConfiguration(StorageUpdateExtensionConfiguration.KEY).storageUpdate();
- MinimumRequiredTimeCollectorService minTimeCollectorService = new
MinimumRequiredTimeCollectorServiceImpl();
-
tableManager = new TableManager(
name,
registry,
- gcConfig,
+ gcConfiguration,
transactionConfiguration,
storageUpdateConfiguration,
clusterService.messagingService(),
@@ -732,6 +756,7 @@ public class Node {
clusterCfgMgr,
clockWaiter,
catalogManager,
+ catalogCompactionRunner,
indexMetaStorage,
distributionZoneManager,
replicaManager,
@@ -823,4 +848,8 @@ public class Node {
public TxStatePartitionStorage txStatePartitionStorage(int zoneId, int
partitionId) {
return
partitionReplicaLifecycleManager.txStatePartitionStorage(zoneId, partitionId);
}
+
+ public DataStorageManager dataStorageManager() {
+ return dataStorageMgr;
+ }
}
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java
index 3ca208cd67d..a755622db34 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.partition.replicator.fixtures;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -46,6 +47,9 @@ public class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
private volatile ReplicaMeta primary;
+ // Pre-calculated assignments for each partition.
+ private volatile List<TokenizedAssignments> tokenizedAssignments;
+
/**
* Set the primary replica.
*
@@ -79,6 +83,15 @@ public class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
setPrimary(node, HybridTimestamp.MIN_VALUE);
}
+ /**
+ * Sets pre-calculated assignments for each partition.
+ *
+ * @param tokenizedAssignments Pre-calculated assignments; returned as is by {@code getAssignments}, regardless of the requested groups.
+ */
+ public void setAssignments(List<TokenizedAssignments>
tokenizedAssignments) {
+ this.tokenizedAssignments = tokenizedAssignments;
+ }
+
@Override
public CompletableFuture<ReplicaMeta>
awaitPrimaryReplica(ReplicationGroupId groupId, HybridTimestamp timestamp, long
timeout,
TimeUnit unit) {
@@ -105,7 +118,13 @@ public class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
List<? extends ReplicationGroupId> replicationGroupIds,
HybridTimestamp clusterTimeToAwait
) {
- return failedFuture(new
UnsupportedOperationException("getAssignments() is not supported in
FakePlacementDriver yet."));
+ List<TokenizedAssignments> assignments = tokenizedAssignments;
+
+ if (assignments == null) {
+ return failedFuture(new AssertionError("Pre-calculated assignments
are not defined in test PlacementDriver yet."));
+ } else {
+ return completedFuture(assignments);
+ }
}
private CompletableFuture<ReplicaMeta>
getPrimaryReplicaMeta(ReplicationGroupId replicationGroupId) {
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 262e9531e0d..339b993a0ce 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -603,11 +603,11 @@ public class PartitionReplicaLifecycleManager extends
);
var raftGroupListener = new ZonePartitionRaftListener(
+ zonePartitionId,
txStatePartitionStorage,
txManager,
safeTimeTracker,
storageIndexTracker,
- zonePartitionId,
outgoingSnapshotsManager
);
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
index fdec32b6444..a0903359294 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
@@ -26,8 +26,13 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import
org.apache.ignite.internal.partition.replicator.handlers.MinimumActiveTxTimeReplicaRequestHandler;
+import
org.apache.ignite.internal.partition.replicator.handlers.TxFinishReplicaRequestHandler;
+import
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest;
+import
org.apache.ignite.internal.partition.replicator.network.replication.UpdateMinimumActiveTxBeginTimeReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
import org.apache.ignite.internal.raft.service.RaftCommandRunner;
import org.apache.ignite.internal.replicator.ReplicaResult;
@@ -35,6 +40,7 @@ import
org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
import org.apache.ignite.internal.replicator.message.TableAware;
@@ -42,6 +48,7 @@ import org.apache.ignite.internal.schema.SchemaSyncService;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.VisibleForTesting;
/**
@@ -53,13 +60,20 @@ public class ZonePartitionReplicaListener implements
ReplicaListener {
// TODO: https://issues.apache.org/jira/browse/IGNITE-22624 await for the
table replica listener if needed.
private final Map<TablePartitionId, ReplicaListener> replicas = new
ConcurrentHashMap<>();
+ /** Raft client. */
private final RaftCommandRunner raftClient;
+ private final ReplicationRaftCommandApplicator raftCommandApplicator;
+
+ // Replica request handlers.
private final TxFinishReplicaRequestHandler txFinishReplicaRequestHandler;
+ private final MinimumActiveTxTimeReplicaRequestHandler
minimumActiveTxTimeReplicaRequestHandler;
/**
* The constructor.
*
+ * @param replicationGroupId Zone replication group identifier.
+ * @param clockService Clock service.
* @param raftClient Raft client.
*/
public ZonePartitionReplicaListener(
@@ -74,6 +88,9 @@ public class ZonePartitionReplicaListener implements
ReplicaListener {
) {
this.raftClient = raftClient;
+ this.raftCommandApplicator = new
ReplicationRaftCommandApplicator(raftClient, replicationGroupId);
+
+ // Request handlers initialization.
txFinishReplicaRequestHandler = new TxFinishReplicaRequestHandler(
txStatePartitionStorage,
clockService,
@@ -82,44 +99,113 @@ public class ZonePartitionReplicaListener implements
ReplicaListener {
schemaSyncService,
catalogService,
raftClient,
- replicationGroupId
- );
+ replicationGroupId);
+
+ minimumActiveTxTimeReplicaRequestHandler = new
MinimumActiveTxTimeReplicaRequestHandler(
+ clockService,
+ raftCommandApplicator);
}
@Override
public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request,
UUID senderId) {
- if (!(request instanceof TableAware)) {
- // TODO: https://issues.apache.org/jira/browse/IGNITE-22620
implement ReplicaSafeTimeSyncRequest processing.
- if (request instanceof TxFinishReplicaRequest) {
- return
txFinishReplicaRequestHandler.handle((TxFinishReplicaRequest) request)
- .thenApply(res -> new ReplicaResult(res, null));
- } else {
- if (request instanceof ReplicaSafeTimeSyncRequest) {
- LOG.debug("Non table request is not supported by the zone
partition yet " + request);
- } else {
- LOG.warn("Non table request is not supported by the zone
partition yet " + request);
- }
- }
+ return ensureReplicaIsPrimary(request)
+ .thenCompose(res -> processRequest(request, res.get1(),
senderId, res.get2()))
+ .thenApply(res -> {
+ if (res instanceof ReplicaResult) {
+ return (ReplicaResult) res;
+ } else {
+ return new ReplicaResult(res, null);
+ }
+ });
+ }
+
+ private CompletableFuture<?> processRequest(
+ ReplicaRequest request,
+ @Nullable Boolean isPrimary,
+ UUID senderId,
+ @Nullable Long leaseStartTime
+ ) {
+ if (request instanceof TableAware) {
+ // This type of request propagates to the table processor directly.
+ return processTableAwareRequest(request, senderId);
+ }
- return completedFuture(new ReplicaResult(null, null));
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-22620 implement
ReplicaSafeTimeSyncRequest processing.
+ if (request instanceof TxFinishReplicaRequest) {
+ return
txFinishReplicaRequestHandler.handle((TxFinishReplicaRequest) request)
+ .thenApply(res -> new ReplicaResult(res, null));
+ }
+
+ return processZoneReplicaRequest(request, isPrimary, senderId,
leaseStartTime);
+ }
+
+ /**
+ * Ensures that the primary replica has not changed.
+ *
+ * @param request Replica request.
+ * @return Future with {@link IgniteBiTuple} containing a {@code Boolean}
(whether the replica is primary) and the start time of the current
+ * lease. The boolean is not {@code null} only for {@link
ReadOnlyReplicaRequest}; if it is {@code true}, the replica is primary. The
+ * lease start time is not {@code null} for {@link
PrimaryReplicaRequest}.
+ */
+ private CompletableFuture<IgniteBiTuple<Boolean, Long>>
ensureReplicaIsPrimary(ReplicaRequest request) {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-24380
+ // Move PartitionReplicaListener#ensureReplicaIsPrimary to
ZonePartitionReplicaListener.
+ return completedFuture(new IgniteBiTuple<>(null, null));
+ }
+
+ /**
+ * Processes {@link TableAware} request.
+ *
+ * @param request Request to be processed.
+ * @param senderId Node sender id.
+ * @return Future with the result of the request.
+ */
+ private CompletableFuture<ReplicaResult>
processTableAwareRequest(ReplicaRequest request, UUID senderId) {
+ assert request instanceof TableAware : "Request should be TableAware
[request=" + request.getClass().getSimpleName() + ']';
+
+ int partitionId;
+
+ ReplicationGroupId replicationGroupId =
request.groupId().asReplicationGroupId();
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 Refine
this code when zone-based replication is done.
+ if (replicationGroupId instanceof TablePartitionId) {
+ partitionId = ((TablePartitionId)
replicationGroupId).partitionId();
+ } else if (replicationGroupId instanceof ZonePartitionId) {
+ partitionId = ((ZonePartitionId) replicationGroupId).partitionId();
+ } else {
+ throw new IllegalArgumentException("Requests with replication
group type "
+ + request.groupId().getClass() + " is not supported");
+ }
+
+ return replicas.get(new TablePartitionId(((TableAware)
request).tableId(), partitionId))
+ .invoke(request, senderId);
+ }
+
+ /**
+ * Processes zone replica request.
+ *
+ * @param request Request to be processed.
+ * @param isPrimary {@code true} if the current node is the primary for
the partition, {@code false} if it is not, or {@code null} if this was not determined for the request.
+ * @param senderId Node sender id.
+ * @param leaseStartTime Start time of the current lease, or {@code null} if it is not known for the request.
+ * @return Future with the result of the processing.
+ */
+ private CompletableFuture<?> processZoneReplicaRequest(
+ ReplicaRequest request,
+ @Nullable Boolean isPrimary,
+ UUID senderId,
+ @Nullable Long leaseStartTime
+ ) {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-24526
+ // Need to move the necessary part of
PartitionReplicaListener#processRequest request processing here
+ if (request instanceof UpdateMinimumActiveTxBeginTimeReplicaRequest) {
+ return
minimumActiveTxTimeReplicaRequestHandler.handle((UpdateMinimumActiveTxBeginTimeReplicaRequest)
request);
+ } else if (request instanceof ReplicaSafeTimeSyncRequest) {
+ LOG.debug("Non table request is not supported by the zone
partition yet " + request);
} else {
- int partitionId;
-
- ReplicationGroupId replicationGroupId =
request.groupId().asReplicationGroupId();
-
- // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 Refine
this code when the zone based replication will done.
- if (replicationGroupId instanceof TablePartitionId) {
- partitionId = ((TablePartitionId)
replicationGroupId).partitionId();
- } else if (replicationGroupId instanceof ZonePartitionId) {
- partitionId = ((ZonePartitionId)
replicationGroupId).partitionId();
- } else {
- throw new IllegalArgumentException("Requests with replication
group type "
- + request.groupId().getClass() + " is not supported");
- }
-
- return replicas.get(new TablePartitionId(((TableAware)
request).tableId(), partitionId))
- .invoke(request, senderId);
+ LOG.warn("Non table request is not supported by the zone partition
yet " + request);
}
+ return completedFuture(new ReplicaResult(null, null));
}
@Override
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/MinimumActiveTxTimeReplicaRequestHandler.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/MinimumActiveTxTimeReplicaRequestHandler.java
new file mode 100644
index 00000000000..8a3d35067f1
--- /dev/null
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/MinimumActiveTxTimeReplicaRequestHandler.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.handlers;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.ClockService;
+import
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
+import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
+import
org.apache.ignite.internal.partition.replicator.network.replication.UpdateMinimumActiveTxBeginTimeReplicaRequest;
+import org.apache.ignite.internal.raft.Command;
+
+/**
+ * Handler for {@link UpdateMinimumActiveTxBeginTimeReplicaRequest}.
+ */
+public class MinimumActiveTxTimeReplicaRequestHandler {
+ /** Factory to create RAFT command messages. */
+ private static final PartitionReplicationMessagesFactory
PARTITION_REPLICATION_MESSAGES_FACTORY =
+ new PartitionReplicationMessagesFactory();
+
+ /** Applicator that applies the RAFT commands created by this handler.
*/
+ private final ReplicationRaftCommandApplicator commandApplicator;
+
+ /** Clock service. */
+ private final ClockService clockService;
+
+ /**
+ * Creates a new instance of MinimumActiveTxTimeReplicaRequestHandler.
+ *
+ * @param clockService Clock service.
+ * @param commandApplicator Applicator that applies the RAFT commands
created by this handler.
+ */
+ public MinimumActiveTxTimeReplicaRequestHandler(
+ ClockService clockService,
+ ReplicationRaftCommandApplicator commandApplicator
+ ) {
+ this.clockService = clockService;
+ this.commandApplicator = commandApplicator;
+ }
+
+ /**
+ * Handles {@link UpdateMinimumActiveTxBeginTimeReplicaRequest}.
+ *
+ * @param request Request to handle.
+ * @return Future that will be completed when the request is handled.
+ */
+ public CompletableFuture<?>
handle(UpdateMinimumActiveTxBeginTimeReplicaRequest request) {
+ Command cmd =
PARTITION_REPLICATION_MESSAGES_FACTORY.updateMinimumActiveTxBeginTimeCommand()
+ .timestamp(request.timestamp())
+ .initiatorTime(clockService.now())
+ .build();
+
+ // The timestamp must increase monotonically, otherwise it will have
to be
+ // stored on disk so that reordering does not occur after the node is
restarted.
+ return commandApplicator.applyCmdWithExceptionHandling(cmd);
+ }
+}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
similarity index 98%
rename from
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java
rename to
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
index dcf5bf37c7e..264de7b9927 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.partition.replicator;
+package org.apache.ignite.internal.partition.replicator.handlers;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
@@ -39,6 +39,9 @@ import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.partition.replicator.ReliableCatalogVersions;
+import org.apache.ignite.internal.partition.replicator.ReplicaTxFinishMarker;
+import
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommandBuilder;
import
org.apache.ignite.internal.partition.replicator.raft.UnexpectedTransactionStateException;
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/package-info.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/package-info.java
new file mode 100644
index 00000000000..2cc8b2c87f2
--- /dev/null
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains replica request handlers that are used by
+ * {@link
org.apache.ignite.internal.partition.replicator.ZonePartitionReplicaListener}
and by the table request processor.
+ */
+
+package org.apache.ignite.internal.partition.replicator.handlers;
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
index 780efd0a4e5..018b109b55b 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
@@ -29,6 +29,8 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.TableAwareCommand;
+import
org.apache.ignite.internal.partition.replicator.network.command.UpdateMinimumActiveTxBeginTimeCommand;
+import
org.apache.ignite.internal.partition.replicator.raft.handlers.FinishTxCommandHandler;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.PartitionSnapshots;
@@ -60,6 +62,7 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
private final PendingComparableValuesTracker<Long, Void>
storageIndexTracker;
+ /** Maps table partition identifiers to table request processors. */
private final Map<TablePartitionId, RaftTableProcessor> tableProcessors =
new ConcurrentHashMap<>();
private final PartitionsSnapshots partitionsSnapshots;
@@ -75,31 +78,18 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
private final Object commitedConfigurationLock = new Object();
- private static class CommittedConfiguration {
- final RaftGroupConfiguration configuration;
-
- final long lastAppliedIndex;
-
- final long lastAppliedTerm;
-
- CommittedConfiguration(RaftGroupConfiguration configuration, long
lastAppliedIndex, long lastAppliedTerm) {
- this.configuration = configuration;
- this.lastAppliedIndex = lastAppliedIndex;
- this.lastAppliedTerm = lastAppliedTerm;
- }
- }
-
+ // Raft command handlers.
private final FinishTxCommandHandler finishTxCommandHandler;
private final OnSnapshotSaveHandler onSnapshotSaveHandler;
/** Constructor. */
public ZonePartitionRaftListener(
+ ZonePartitionId zonePartitionId,
TxStatePartitionStorage txStatePartitionStorage,
TxManager txManager,
SafeTimeValuesTracker safeTimeTracker,
PendingComparableValuesTracker<Long, Void> storageIndexTracker,
- ZonePartitionId zonePartitionId,
PartitionsSnapshots partitionsSnapshots
) {
this.safeTimeTracker = safeTimeTracker;
@@ -107,6 +97,7 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
this.partitionsSnapshots = partitionsSnapshots;
this.partitionKey = new ZonePartitionKey(zonePartitionId.zoneId(),
zonePartitionId.partitionId());
+ // RAFT command handlers initialization.
finishTxCommandHandler = new FinishTxCommandHandler(
txStatePartitionStorage,
// TODO: IGNITE-24343 - use ZonePartitionId here.
@@ -179,6 +170,16 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
TablePartitionId tablePartitionId = ((TableAwareCommand)
command).tablePartitionId().asTablePartitionId();
result = processTableAwareCommand(tablePartitionId, command,
commandIndex, commandTerm, safeTimestamp);
+ } else if (command instanceof
UpdateMinimumActiveTxBeginTimeCommand) {
+ result = new IgniteBiTuple<>(null, false);
+
+ tableProcessors.values().forEach(processor -> {
+ IgniteBiTuple<Serializable, Boolean> r =
processor.processCommand(command, commandIndex, commandTerm, safeTimestamp);
+ // Need to adjust the safe time if any of the table
processors successfully handled the command.
+ if (Boolean.TRUE.equals(r.get2())) {
+ result.set2(Boolean.TRUE);
+ }
+ });
} else {
LOG.info("Message type " + command.getClass() + " is not
supported by the zone partition RAFT listener yet");
@@ -277,4 +278,18 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
private PartitionSnapshots partitionSnapshots() {
return partitionsSnapshots.partitionSnapshots(partitionKey);
}
+
+ private static class CommittedConfiguration {
+ final RaftGroupConfiguration configuration;
+
+ final long lastAppliedIndex;
+
+ final long lastAppliedTerm;
+
+ CommittedConfiguration(RaftGroupConfiguration configuration, long
lastAppliedIndex, long lastAppliedTerm) {
+ this.configuration = configuration;
+ this.lastAppliedIndex = lastAppliedIndex;
+ this.lastAppliedTerm = lastAppliedTerm;
+ }
+ }
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/FinishTxCommandHandler.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/FinishTxCommandHandler.java
similarity index 95%
rename from
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/FinishTxCommandHandler.java
rename to
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/FinishTxCommandHandler.java
index 3c48fd36bd7..5014b01264e 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/FinishTxCommandHandler.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/FinishTxCommandHandler.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.partition.replicator.raft;
+package org.apache.ignite.internal.partition.replicator.raft.handlers;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.tx.TxState.ABORTED;
@@ -30,6 +30,8 @@ import
org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand;
+import org.apache.ignite.internal.partition.replicator.raft.RaftTxFinishMarker;
+import
org.apache.ignite.internal.partition.replicator.raft.UnexpectedTransactionStateException;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/package-info.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/package-info.java
new file mode 100644
index 00000000000..d0c81eaa4ff
--- /dev/null
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains RAFT command handlers that are used by
+ * {@link
org.apache.ignite.internal.partition.replicator.raft.ZonePartitionRaftListener}
and
+ * {@link
org.apache.ignite.internal.partition.replicator.raft.RaftTableProcessor}.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.handlers;
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
index b91af3ed9a0..205eac57627 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
@@ -68,11 +68,11 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
@BeforeEach
void setUp() {
listener = new ZonePartitionRaftListener(
+ new ZonePartitionId(ZONE_ID, PARTITION_ID),
txStatePartitionStorage,
txManager,
new SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE),
new PendingComparableValuesTracker<>(0L),
- new ZonePartitionId(ZONE_ID, PARTITION_ID),
outgoingSnapshotsManager
);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index c7a99f60025..499fb2558e9 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -51,10 +51,10 @@ import
org.apache.ignite.internal.partition.replicator.network.command.UpdateAll
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateMinimumActiveTxBeginTimeCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
-import
org.apache.ignite.internal.partition.replicator.raft.FinishTxCommandHandler;
import
org.apache.ignite.internal.partition.replicator.raft.OnSnapshotSaveHandler;
import org.apache.ignite.internal.partition.replicator.raft.RaftTableProcessor;
import org.apache.ignite.internal.partition.replicator.raft.RaftTxFinishMarker;
+import
org.apache.ignite.internal.partition.replicator.raft.handlers.FinishTxCommandHandler;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
@@ -78,6 +78,7 @@ import
org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.index.IndexMeta;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import
org.apache.ignite.internal.table.distributed.index.MetaIndexStatusChange;
+import
org.apache.ignite.internal.table.distributed.raft.handlers.MinimumActiveTxTimeCommandHandler;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.UpdateCommandResult;
@@ -125,13 +126,14 @@ public class PartitionListener implements
RaftGroupListener, RaftTableProcessor
// This variable is volatile, because it may be updated outside the Raft
thread under the colocation feature.
private volatile Set<String> currentGroupTopology;
- private final MinimumRequiredTimeCollectorService minTimeCollectorService;
+ private final OnSnapshotSaveHandler onSnapshotSaveHandler;
+ // Raft command handlers.
private final RaftTxFinishMarker txFinisher;
private final FinishTxCommandHandler finishTxCommandHandler;
- private final OnSnapshotSaveHandler onSnapshotSaveHandler;
+ private final MinimumActiveTxTimeCommandHandler
minimumActiveTxTimeCommandHandler;
/** Constructor. */
public PartitionListener(
@@ -157,15 +159,21 @@ public class PartitionListener implements
RaftGroupListener, RaftTableProcessor
this.schemaRegistry = schemaRegistry;
this.indexMetaStorage = indexMetaStorage;
this.localNodeId = localNodeId;
- this.minTimeCollectorService = minTimeCollectorService;
+ onSnapshotSaveHandler = new
OnSnapshotSaveHandler(txStatePartitionStorage, storageIndexTracker);
+
+ // RAFT command handlers initialization.
+ TablePartitionId tablePartitionId = new
TablePartitionId(storage.tableId(), storage.partitionId());
txFinisher = new RaftTxFinishMarker(txManager);
finishTxCommandHandler = new FinishTxCommandHandler(
txStatePartitionStorage,
- new TablePartitionId(storage.tableId(), storage.partitionId()),
- txManager
- );
- onSnapshotSaveHandler = new
OnSnapshotSaveHandler(txStatePartitionStorage, storageIndexTracker);
+ tablePartitionId,
+ txManager);
+
+ minimumActiveTxTimeCommandHandler = new
MinimumActiveTxTimeCommandHandler(
+ storage,
+ tablePartitionId,
+ minTimeCollectorService);
}
@Override
@@ -257,10 +265,9 @@ public class PartitionListener implements
RaftGroupListener, RaftTableProcessor
} else if (command instanceof VacuumTxStatesCommand) {
result = handleVacuumTxStatesCommand((VacuumTxStatesCommand)
command, commandIndex, commandTerm);
} else if (command instanceof UpdateMinimumActiveTxBeginTimeCommand) {
- result =
handleUpdateMinimalActiveTxTimeCommand((UpdateMinimumActiveTxBeginTimeCommand)
command, commandIndex,
- commandTerm);
+ result =
minimumActiveTxTimeCommandHandler.handle((UpdateMinimumActiveTxBeginTimeCommand)
command, commandIndex);
} else {
- throw new AssertionError("Unknown command type: " +
command.toStringForLightLogging());
+ throw new AssertionError("Unknown command type [command=" +
command.toStringForLightLogging() + ']');
}
if (Boolean.TRUE.equals(result.get2())) {
@@ -645,28 +652,6 @@ public class PartitionListener implements
RaftGroupListener, RaftTableProcessor
return new IgniteBiTuple<>(null, true);
}
- private IgniteBiTuple<Serializable, Boolean>
handleUpdateMinimalActiveTxTimeCommand(
- UpdateMinimumActiveTxBeginTimeCommand cmd,
- long commandIndex,
- long commandTerm
- ) {
- // Skips the write command because the storage has already executed it.
- if (commandIndex <= storage.lastAppliedIndex()) {
- return new IgniteBiTuple<>(null, false);
- }
-
- long timestamp = cmd.timestamp();
-
- storage.flush(false).whenComplete((r, t) -> {
- if (t == null) {
- TablePartitionId partitionId = new
TablePartitionId(storage.tableId(), storage.partitionId());
-
minTimeCollectorService.recordMinActiveTxTimestamp(partitionId, timestamp);
- }
- });
-
- return new IgniteBiTuple<>(null, true);
- }
-
private static <T extends Comparable<T>> void
updateTrackerIgnoringTrackerClosedException(
PendingComparableValuesTracker<T, Void> tracker,
T newValue
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/MinimumActiveTxTimeCommandHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/MinimumActiveTxTimeCommandHandler.java
new file mode 100644
index 00000000000..14f0f5c3bcf
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/MinimumActiveTxTimeCommandHandler.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.handlers;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import
org.apache.ignite.internal.partition.replicator.network.command.UpdateMinimumActiveTxBeginTimeCommand;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
+
+/**
+ * RAFT command handler that processes {@link
UpdateMinimumActiveTxBeginTimeCommand} commands.
+ *
+ * <p>A command whose index is not greater than the storage's last applied index is skipped. Otherwise the partition
+ * data storage is flushed asynchronously and, only if the flush completes successfully, the timestamp carried by the
+ * command is recorded in {@link MinimumRequiredTimeCollectorService} for the table partition.
+ */
+public class MinimumActiveTxTimeCommandHandler {
+ /** Data storage to which the command will be applied. */
+ private final PartitionDataStorage storage;
+
+ /**
+ * Table partition identifier.
+ * {@link TablePartitionId} is used here instead of {@link
org.apache.ignite.internal.replicator.ZonePartitionId}
+ * intentionally because we are not going to re-work catalog compaction
internals {@link MinimumRequiredTimeCollectorService}.
+ */
+ private final TablePartitionId tablePartitionId;
+
+ /** Service that collects minimum required timestamp for each partition. */
+ private final MinimumRequiredTimeCollectorService minTimeCollectorService;
+
+ /**
+ * Creates a new instance of the command handler.
+ *
+ * @param storage Partition data storage.
+ * @param tablePartitionId Table partition identifier.
+ * @param minTimeCollectorService Minimum required time collector service.
+ */
+ public MinimumActiveTxTimeCommandHandler(
+ PartitionDataStorage storage,
+ TablePartitionId tablePartitionId,
+ MinimumRequiredTimeCollectorService minTimeCollectorService
+ ) {
+ this.storage = storage;
+ this.tablePartitionId = tablePartitionId;
+ this.minTimeCollectorService = minTimeCollectorService;
+ }
+
+ /**
+ * Handles {@link UpdateMinimumActiveTxBeginTimeCommand} command.
+ *
+ * <p>The method returns without waiting for the storage flush it triggers; the timestamp is recorded later, when
+ * (and only if) that flush completes successfully.
+ *
+ * @param cmd Command to be processed.
+ * @param commandIndex Command index.
+ * @return Tuple whose first element is always {@code null} and whose second element is {@code false} if the command
+ *     was skipped because the storage had already applied it, or {@code true} otherwise.
+ */
+ public IgniteBiTuple<Serializable, Boolean>
handle(UpdateMinimumActiveTxBeginTimeCommand cmd, long commandIndex) {
+ // Skips the write command because the storage has already executed it.
+ if (commandIndex <= storage.lastAppliedIndex()) {
+ return new IgniteBiTuple<>(null, false);
+ }
+
+ long timestamp = cmd.timestamp();
+
+ // NOTE(review): the timestamp is published only after a successful flush, presumably so that it becomes visible
+ // only once preceding writes are durable. A failed flush is silently ignored and nothing is recorded; confirm
+ // this best-effort behavior is intended.
+ storage.flush(false)
+ .whenComplete((r, t) -> {
+ if (t == null) {
+
minTimeCollectorService.recordMinActiveTxTimestamp(tablePartitionId, timestamp);
+ }
+ });
+
+ return new IgniteBiTuple<>(null, true);
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/package-info.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/package-info.java
new file mode 100644
index 00000000000..73bb636fa4c
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains RAFT command handlers that are used by
+ * {@link org.apache.ignite.internal.table.distributed.raft.PartitionListener}
aka table raft processor.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.handlers;
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index f6267c46619..b024c0f0561 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -109,7 +109,8 @@ import
org.apache.ignite.internal.network.ClusterNodeResolver;
import org.apache.ignite.internal.partition.replicator.ReliableCatalogVersions;
import org.apache.ignite.internal.partition.replicator.ReplicaTxFinishMarker;
import
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
-import
org.apache.ignite.internal.partition.replicator.TxFinishReplicaRequestHandler;
+import
org.apache.ignite.internal.partition.replicator.handlers.MinimumActiveTxTimeReplicaRequestHandler;
+import
org.apache.ignite.internal.partition.replicator.handlers.TxFinishReplicaRequestHandler;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite.internal.partition.replicator.network.TimedBinaryRow;
import
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand;
@@ -359,7 +360,9 @@ public class PartitionReplicaListener implements
ReplicaListener {
private final ReplicationRaftCommandApplicator raftCommandApplicator;
private final ReplicaTxFinishMarker replicaTxFinishMarker;
+ // Replica request handlers.
private final TxFinishReplicaRequestHandler txFinishReplicaRequestHandler;
+ private final MinimumActiveTxTimeReplicaRequestHandler
minimumActiveTxTimeReplicaRequestHandler;
/**
* The constructor.
@@ -454,6 +457,10 @@ public class PartitionReplicaListener implements
ReplicaListener {
replicationGroupId
);
+ minimumActiveTxTimeReplicaRequestHandler = new
MinimumActiveTxTimeReplicaRequestHandler(
+ clockService,
+ raftCommandApplicator);
+
prepareIndexBuilderTxRwOperationTracker();
}
@@ -519,6 +526,9 @@ public class PartitionReplicaListener implements
ReplicaListener {
private CompletableFuture<?> processRequest(ReplicaRequest request,
@Nullable Boolean isPrimary, UUID senderId,
@Nullable Long leaseStartTime) {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-24526
+ // Need to move the necessary part of request processing to
ZonePartitionReplicaListener
+
boolean hasSchemaVersion = request instanceof
SchemaVersionAwareReplicaRequest;
if (hasSchemaVersion) {
@@ -879,10 +889,13 @@ public class PartitionReplicaListener implements
ReplicaListener {
} else if (request instanceof VacuumTxStateReplicaRequest) {
return
processVacuumTxStateReplicaRequest((VacuumTxStateReplicaRequest) request);
} else if (request instanceof
UpdateMinimumActiveTxBeginTimeReplicaRequest) {
- return
processMinimumActiveTxTimeReplicaRequest((UpdateMinimumActiveTxBeginTimeReplicaRequest)
request);
- } else {
- throw new UnsupportedReplicaRequestException(request.getClass());
+ if (!enabledColocationFeature) {
+ return
minimumActiveTxTimeReplicaRequestHandler.handle((UpdateMinimumActiveTxBeginTimeReplicaRequest)
request);
+ }
}
+
+ // Unknown request.
+ throw new UnsupportedReplicaRequestException(request.getClass());
}
/**
@@ -3968,17 +3981,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
return raftCommandRunner.run(cmd);
}
- private CompletableFuture<?>
processMinimumActiveTxTimeReplicaRequest(UpdateMinimumActiveTxBeginTimeReplicaRequest
request) {
- Command cmd =
PARTITION_REPLICATION_MESSAGES_FACTORY.updateMinimumActiveTxBeginTimeCommand()
- .timestamp(request.timestamp())
- .initiatorTime(clockService.now())
- .build();
-
- // The timestamp must increase monotonically, otherwise it will have
to be
- // stored on disk so that reordering does not occur after the node is
restarted.
- return applyCmdWithExceptionHandling(cmd);
- }
-
/**
* Operation unique identifier.
*/