This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 498365b0224 IGNITE-27224 Remove non-colocated code from PartitionListener (#7128)
498365b0224 is described below
commit 498365b022437a6be5fff718c4ee44373f971723
Author: Alexander Lapin <[email protected]>
AuthorDate: Mon Dec 8 12:49:46 2025 +0200
IGNITE-27224 Remove non-colocated code from PartitionListener (#7128)
---
.../ItZonePartitionRaftListenerRecoveryTest.java | 3 -
.../raft/ZonePartitionRaftListenerTest.java | 3 -
.../exec/rel/TableScanNodeExecutionTest.java | 1 -
.../ReplicasSafeTimePropagationTest.java | 8 +-
.../ignite/internal/table/ItColocationTest.java | 1 -
.../internal/table/distributed/TableManager.java | 190 +--------------------
.../disaster/ManualGroupRestartRequest.java | 119 ++++++-------
.../table/distributed/raft/PartitionListener.java | 67 +-------
.../distributed/storage/InternalTableImpl.java | 21 +--
.../distributed/TableManagerRecoveryTest.java | 32 +---
.../raft/PartitionCommandListenerTest.java | 95 +----------
.../storage/InternalTableEstimatedSizeTest.java | 1 -
.../distributed/storage/InternalTableImplTest.java | 12 +-
.../apache/ignite/distributed/ItTxTestCluster.java | 90 ++++------
.../table/impl/DummyInternalTableImpl.java | 6 +-
.../disaster/ItDisasterRecoveryManagerTest.java | 2 +
16 files changed, 107 insertions(+), 544 deletions(-)
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
index df3fc9078ea..7a31f5a15ae 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
@@ -58,7 +58,6 @@ import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.components.NoOpLogSyncer;
-import org.apache.ignite.internal.components.SystemPropertiesNodeProperties;
import org.apache.ignite.internal.configuration.ComponentWorkingDir;
import org.apache.ignite.internal.configuration.SystemLocalConfiguration;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -401,7 +400,6 @@ class ItZonePartitionRaftListenerRecoveryTest extends
IgniteAbstractTest {
storageUpdateHandler,
txStateStorage.getOrCreatePartitionStorage(PARTITION_ID.partitionId()),
new SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE),
- new PendingComparableValuesTracker<>(0L),
catalogService,
schemaRegistry,
indexMetaStorage,
@@ -410,7 +408,6 @@ class ItZonePartitionRaftListenerRecoveryTest extends
IgniteAbstractTest {
executor,
placementDriver,
clockService,
- new SystemPropertiesNodeProperties(),
new ZonePartitionId(PARTITION_ID.zoneId(),
PARTITION_ID.partitionId())
);
}
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
index 41658a40548..ad4cc391390 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
@@ -48,7 +48,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.internal.catalog.CatalogService;
-import org.apache.ignite.internal.components.SystemPropertiesNodeProperties;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
@@ -571,7 +570,6 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
mock(StorageUpdateHandler.class),
txStatePartitionStorage,
new SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE),
- new PendingComparableValuesTracker<>(0L),
mock(CatalogService.class),
mock(SchemaRegistry.class),
mock(IndexMetaStorage.class),
@@ -580,7 +578,6 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
mock(Executor.class),
placementDriver,
clockService,
- new SystemPropertiesNodeProperties(),
new ZonePartitionId(ZONE_ID, PARTITION_ID)
);
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index e2f06383bd9..96549b88238 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -337,7 +337,6 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
mock(StreamerReceiverRunner.class),
() -> 10_000L,
() -> 10_000L,
- true,
new TableMetricSource(QualifiedName.fromSimple("test"))
);
this.dataAmount = dataAmount;
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
index 7e6acb5442d..dfea39f71d0 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
@@ -45,7 +45,6 @@ import java.util.stream.Stream;
import org.apache.ignite.internal.TestHybridClock;
import org.apache.ignite.internal.catalog.CatalogService;
import
org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationConfiguration;
-import org.apache.ignite.internal.components.SystemPropertiesNodeProperties;
import org.apache.ignite.internal.configuration.ComponentWorkingDir;
import org.apache.ignite.internal.configuration.SystemLocalConfiguration;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -76,7 +75,7 @@ import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
-import org.apache.ignite.internal.replicator.TestReplicationGroupId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
@@ -87,7 +86,6 @@ import
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionC
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
-import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.util.SafeTimeValuesTracker;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.conf.Configuration;
@@ -114,7 +112,7 @@ public class ReplicasSafeTimePropagationTest extends
IgniteAbstractTest {
private static final int BASE_PORT = 1234;
- private static final TestReplicationGroupId GROUP_ID = new
TestReplicationGroupId("group_1");
+ private static final ZonePartitionId GROUP_ID = new ZonePartitionId(1, 0);
private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new
ReplicaMessagesFactory();
@@ -395,7 +393,6 @@ public class ReplicasSafeTimePropagationTest extends
IgniteAbstractTest {
mock(StorageUpdateHandler.class),
mock(TxStatePartitionStorage.class),
safeTs,
- mock(PendingComparableValuesTracker.class),
mock(CatalogService.class),
mock(SchemaRegistry.class),
mock(IndexMetaStorage.class),
@@ -404,7 +401,6 @@ public class ReplicasSafeTimePropagationTest extends
IgniteAbstractTest {
mock(Executor.class),
placementDriver,
clockService,
- new SystemPropertiesNodeProperties(),
GROUP_ID
) {
@Override
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 086d608930a..336df84b531 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -367,7 +367,6 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
mock(StreamerReceiverRunner.class),
() -> 10_000L,
() -> 10_000L,
- true,
new TableMetricSource(QualifiedName.fromSimple("TEST"))
);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 22d40705cb8..c9c3730e7e6 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -30,10 +30,8 @@ import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.causality.IncrementalVersionedValue.dependingOn;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stableAssignmentsGetLocally;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
import static org.apache.ignite.internal.event.EventListener.fromConsumer;
-import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED;
import static
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED;
import static
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED;
@@ -66,7 +64,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -136,15 +133,11 @@ import
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartiti
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import
org.apache.ignite.internal.partition.replicator.schema.CatalogValidationSchemasSource;
import
org.apache.ignite.internal.partition.replicator.schema.ExecutorInclinedSchemaSyncService;
-import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
-import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
-import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import
org.apache.ignite.internal.placementdriver.wrappers.ExecutorInclinedPlacementDriver;
import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
import org.apache.ignite.internal.raft.service.RaftCommandRunner;
import org.apache.ignite.internal.replicator.ReplicaManager;
-import
org.apache.ignite.internal.replicator.ReplicaManager.WeakReplicaStopReason;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -385,7 +378,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
private final EventListener<CatalogEventParameters> onTableAlterListener =
this::onTableAlter;
private final EventListener<ChangeLowWatermarkEventParameters>
onLowWatermarkChangedListener = this::onLwmChanged;
- private final EventListener<PrimaryReplicaEventParameters>
onPrimaryReplicaExpiredListener = this::onTablePrimaryReplicaExpired;
private final MetricManager metricManager;
private final PartitionModificationCounterFactory
partitionModificationCounterFactory;
@@ -580,10 +572,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
partitionReplicatorNodeRecovery.start();
- if (!nodeProperties.colocationEnabled()) {
-
executorInclinedPlacementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
onPrimaryReplicaExpiredListener);
- }
-
CompletableFuture<Revisions> recoveryFinishFuture =
metaStorageMgr.recoveryFinishedFuture();
assert recoveryFinishFuture.isDone();
@@ -594,15 +582,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
});
}
- private CompletableFuture<Void> waitForMetadataCompleteness(long ts) {
- return
executorInclinedSchemaSyncService.waitForMetadataCompleteness(hybridTimestamp(ts));
- }
-
private CompletableFuture<Boolean>
beforeZoneReplicaStarted(LocalPartitionReplicaEventParameters parameters) {
- if (!nodeProperties.colocationEnabled()) {
- return falseCompletedFuture();
- }
-
return inBusyLockAsync(busyLock, () -> readyToProcessReplicaStarts
.thenCompose(v -> beforeZoneReplicaStartedImpl(parameters))
.thenApply(unused -> false)
@@ -681,10 +661,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
}
private CompletableFuture<Boolean>
onZoneReplicaStopped(LocalPartitionReplicaEventParameters parameters) {
- if (!nodeProperties.colocationEnabled()) {
- return falseCompletedFuture();
- }
-
ZonePartitionId zonePartitionId = parameters.zonePartitionId();
NaiveAsyncReadWriteLock zoneLock = tablesPerZoneLocks.computeIfAbsent(
@@ -709,10 +685,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
}
private CompletableFuture<Boolean>
onZoneReplicaDestroyed(LocalPartitionReplicaEventParameters parameters) {
- if (!nodeProperties.colocationEnabled()) {
- return falseCompletedFuture();
- }
-
ZonePartitionId zonePartitionId = parameters.zonePartitionId();
NaiveAsyncReadWriteLock zoneLock = tablesPerZoneLocks.computeIfAbsent(
@@ -894,22 +866,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
var safeTimeTracker = new
SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE);
- // TODO https://issues.apache.org/jira/browse/IGNITE-22522 After
switching to the colocation track, the storageIndexTracker
- // will no longer need to be transferred to the table listeners.
- var storageIndexTracker = new PendingComparableValuesTracker<Long,
Void>(0L) {
- @Override
- public void update(Long newValue, @Nullable Void futureResult) {
- throw new UnsupportedOperationException("It's not expected
that in case of enabled colocation table storageIndexTracker"
- + " will be updated.");
- }
-
- @Override
- public CompletableFuture<Void> waitFor(Long valueToWait) {
- throw new UnsupportedOperationException("It's not expected
that in case of enabled colocation table storageIndexTracker"
- + " will be updated.");
- }
- };
-
PartitionStorages partitionStorages;
try {
partitionStorages = getPartitionStorages(table, partId);
@@ -932,7 +888,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
replicationConfiguration
);
- internalTbl.updatePartitionTrackers(partId, safeTimeTracker,
storageIndexTracker);
+ internalTbl.updatePartitionTrackers(partId, safeTimeTracker);
mvGc.addStorage(tablePartitionId,
partitionUpdateHandlers.gcUpdateHandler);
@@ -954,7 +910,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
partitionUpdateHandlers.storageUpdateHandler,
partitionStorages.getTxStateStorage(),
safeTimeTracker,
- storageIndexTracker,
catalogService,
table.schemaView(),
indexMetaStorage,
@@ -963,7 +918,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
partitionOperationsExecutor,
executorInclinedPlacementDriver,
clockService,
- nodeProperties,
zonePartitionId
);
@@ -988,25 +942,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
);
}
- private CompletableFuture<Boolean>
onTablePrimaryReplicaExpired(PrimaryReplicaEventParameters parameters) {
- if (thisNodeHoldsLease(parameters.leaseholderId())) {
- TablePartitionId groupId = (TablePartitionId) parameters.groupId();
-
- // We do not wait future in order not to block meta storage
updates.
- replicaMgr.weakStopReplica(
- groupId,
- WeakReplicaStopReason.PRIMARY_EXPIRED,
- () -> stopAndDestroyTablePartition(groupId,
tablesVv.latestCausalityToken())
- );
- }
-
- return falseCompletedFuture();
- }
-
- private boolean thisNodeHoldsLease(@Nullable UUID leaseholderId) {
- return localNode().id().equals(leaseholderId);
- }
-
private void onTableDrop(DropTableEventParameters parameters) {
inBusyLock(busyLock, () -> {
unregisterMetricsSource(startedTables.get(parameters.tableId()));
@@ -1141,10 +1076,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
busyLock.block();
- if (!nodeProperties.colocationEnabled()) {
-
executorInclinedPlacementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
onPrimaryReplicaExpiredListener);
- }
-
lowWatermark.removeListener(LowWatermarkEvent.LOW_WATERMARK_CHANGED,
onLowWatermarkChangedListener);
catalogService.removeListener(CatalogEvent.TABLE_CREATE,
onTableCreateWithColocationListener);
@@ -1169,11 +1100,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
try {
closeAllManually(
- () -> {
- if (nodeProperties.colocationEnabled()) {
-
closeAllManually(tables.values().stream().map(table -> () ->
closeTable(table)));
- }
- },
+ () -> closeAllManually(tables.values().stream().map(table
-> () -> closeTable(table))),
mvGc,
fullStateTransferIndexChooser,
() -> shutdownAndAwaitTermination(scanRequestExecutor,
shutdownTimeoutSeconds, TimeUnit.SECONDS),
@@ -1263,7 +1190,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
Objects.requireNonNull(streamerReceiverRunner),
() -> txCfg.value().readWriteTimeoutMillis(),
() -> txCfg.value().readOnlyTimeoutMillis(),
- nodeProperties.colocationEnabled(),
createAndRegisterMetricsSource(tableStorage.getTableDescriptor(), tableName)
);
@@ -1582,35 +1508,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return IgniteUtils.getInterruptibly(future);
}
- private CompletableFuture<Void> createPartitionAndStartClient(
- TablePartitionId replicaGrpId,
- TableViewInternal tbl,
- long assignmentsTimestamp
- ) {
- int partitionId = replicaGrpId.partitionId();
-
- PartitionSet singlePartitionIdSet = PartitionSet.of(partitionId);
-
- // TODO https://issues.apache.org/jira/browse/IGNITE-20957 Revisit
this code
- return inBusyLock(
- busyLock,
- () -> getOrCreatePartitionStorages(tbl, singlePartitionIdSet)
- .thenRun(() -> localPartsByTableId.compute(
- replicaGrpId.tableId(),
- (tableId, oldPartitionSet) ->
extendPartitionSet(oldPartitionSet, partitionId)
- ))
- // If the table is already closed, it's not a problem
(probably the node is stopping).
- .exceptionally(ignoreTableClosedException())
- ).thenComposeAsync(unused -> inBusyLock(busyLock, () -> {
- lowWatermark.getLowWatermarkSafe(lwm ->
- registerIndexesToTable(tbl, catalogService,
singlePartitionIdSet, tbl.schemaView(), lwm)
- );
-
- // TODO IGNITE-22522 Consider removing, not sure though.
- return waitForMetadataCompleteness(assignmentsTimestamp);
- }), ioExecutor);
- }
-
private static PartitionSet extendPartitionSet(@Nullable PartitionSet
oldPartitionSet, int partitionId) {
PartitionSet newPartitionSet =
Objects.requireNonNullElseGet(oldPartitionSet, BitSetPartitionSet::new);
newPartitionSet.set(partitionId);
@@ -1761,38 +1658,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
.thenComposeAsync(v ->
destroyPartitionStorages(tablePartitionId, table, destroyingWholeTable),
ioExecutor);
}
- /**
- * Stops all resources associated with a given partition, like replicas
and partition trackers. Calls
- * {@link ReplicaManager#weakStopReplica} in order to change the replica
state.
- *
- * @param tablePartitionId Partition ID.
- * @param table Table which this partition belongs to.
- * @return Future that will be completed after all resources have been
closed.
- */
- private CompletableFuture<Void> stopPartitionForRestart(TablePartitionId
tablePartitionId, TableViewInternal table) {
- return replicaMgr.weakStopReplica(
- tablePartitionId,
- WeakReplicaStopReason.RESTART,
- () -> stopTablePartition(tablePartitionId, table)
- );
- }
-
- /**
- * Stops all resources associated with a given partition, like replicas
and partition trackers. Calls
- * {@link ReplicaManager#weakStopReplica} in order to change the replica
state.
- *
- * @param tablePartitionId Partition ID.
- * @param table Table which this partition belongs to.
- * @return Future that will be completed after all resources have been
closed.
- */
- private CompletableFuture<Void>
stopPartitionAndDestroyForRestart(TablePartitionId tablePartitionId,
TableViewInternal table) {
- return replicaMgr.weakStopReplica(
- tablePartitionId,
- WeakReplicaStopReason.RESTART,
- () -> stopAndDestroyTablePartition(tablePartitionId, table,
false)
- );
- }
-
/**
* Stops all resources associated with a given partition, like replicas
and partition trackers.
*
@@ -2221,57 +2086,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}
}
- /**
- * Restarts the table partition including the replica and raft node.
- *
- * @param tablePartitionId Table partition that needs to be restarted.
- * @param revision Metastore revision.
- * @return Operation future.
- */
- // TODO IGNITE-22522 Consider removing, not sure though.
- public CompletableFuture<Void> restartPartition(TablePartitionId
tablePartitionId, long revision, long assignmentsTimestamp) {
- return inBusyLockAsync(busyLock, () ->
tablesVv.get(revision).thenComposeAsync(unused -> inBusyLockAsync(busyLock, ()
-> {
- TableViewInternal table = tables.get(tablePartitionId.tableId());
- assert table != null : tablePartitionId;
-
- return stopPartitionForRestart(tablePartitionId,
table).thenComposeAsync(unused1 -> {
- Assignments stableAssignments =
stableAssignmentsGetLocally(metaStorageMgr, tablePartitionId, revision);
-
- assert stableAssignments != null : "tablePartitionId=" +
tablePartitionId + ", revision=" + revision;
-
- // TODO IGNITE-22522 Consider removing, not sure though.
- return waitForMetadataCompleteness(assignmentsTimestamp);
- }, ioExecutor);
- }), ioExecutor));
- }
-
- /**
- * Restarts the table partition including the replica and raft node.
- *
- * @param tablePartitionId Table partition that needs to be restarted.
- * @param revision Metastore revision.
- * @param assignmentsTimestamp Assignments timestamp.
- * @return Operation future.
- */
- public CompletableFuture<Void> restartPartitionWithCleanUp(
- TablePartitionId tablePartitionId,
- long revision,
- long assignmentsTimestamp
- ) {
- return tableAsync(tablePartitionId.tableId()).thenComposeAsync(table
-> inBusyLockAsync(busyLock, () -> {
- assert table != null : tablePartitionId;
-
- return stopPartitionAndDestroyForRestart(tablePartitionId,
table).thenComposeAsync(unused1 ->
- createPartitionAndStartClient(
- tablePartitionId,
- table,
- assignmentsTimestamp
- ),
- ioExecutor
- );
- }), ioExecutor);
- }
-
@Override
public void setStreamerReceiverRunner(StreamerReceiverRunner runner) {
this.streamerReceiverRunner = runner;
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
index 8bc008ba83a..d0a1f3f66a4 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
@@ -19,9 +19,7 @@ package org.apache.ignite.internal.table.distributed.disaster;
import static java.util.Collections.emptySet;
import static java.util.concurrent.CompletableFuture.allOf;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableStableAssignments;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zoneStableAssignments;
-import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager.tableState;
import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager.zoneState;
import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryRequestType.MULTI_NODE;
import static
org.apache.ignite.internal.table.distributed.disaster.GroupUpdateRequestHandler.getAliveNodesWithData;
@@ -157,17 +155,19 @@ class ManualGroupRestartRequest implements
DisasterRecoveryRequest {
?
Arrays.stream(AssignmentUtil.partitionIds(zoneDescriptor.partitions())).boxed().collect(Collectors.toSet())
: partitionIds;
- if (replicationGroupId instanceof TablePartitionId) {
- TablePartitionId groupId = (TablePartitionId) replicationGroupId;
+ assert !(replicationGroupId instanceof TablePartitionId) :
+ "Unexpected type of replication group identifier [class=" +
replicationGroupId.getClass().getSimpleName()
+ + ", value=" + replicationGroupId
+ + ", requiredType = ZonePartitionId].";
- return groupId.tableId() == tableId &&
partitionIdsToCheck.contains(groupId.partitionId());
- } else if (replicationGroupId instanceof ZonePartitionId) {
+ // Besides ZonePartitionId we may also retrieve CmgGroupId or
MetastorageGroupId
+ if (replicationGroupId instanceof ZonePartitionId) {
ZonePartitionId groupId = (ZonePartitionId) replicationGroupId;
return groupId.zoneId() == zoneId &&
partitionIdsToCheck.contains(groupId.partitionId());
+ } else {
+ return false;
}
-
- return false;
}
private CompletableFuture<?> createRestartFuture(
@@ -175,20 +175,16 @@ class ManualGroupRestartRequest implements
DisasterRecoveryRequest {
ReplicationGroupId replicationGroupId,
long revision
) {
- if (replicationGroupId instanceof TablePartitionId) {
- return disasterRecoveryManager.tableManager.restartPartition(
- (TablePartitionId) replicationGroupId,
- revision,
- assignmentsTimestamp
- );
- } else if (replicationGroupId instanceof ZonePartitionId) {
- return
disasterRecoveryManager.partitionReplicaLifecycleManager.restartPartition(
- (ZonePartitionId) replicationGroupId,
- revision,
- assignmentsTimestamp
- );
- }
- throw new IllegalStateException("Unexpected replication group id: " +
replicationGroupId);
+ assert replicationGroupId instanceof ZonePartitionId :
+ "Unexpected type of replication group identifier [class=" +
replicationGroupId.getClass().getSimpleName()
+ + ", value=" + replicationGroupId
+ + ", requiredType = ZonePartitionId].";
+
+ return
disasterRecoveryManager.partitionReplicaLifecycleManager.restartPartition(
+ (ZonePartitionId) replicationGroupId,
+ revision,
+ assignmentsTimestamp
+ );
}
private CompletableFuture<?> createCleanupRestartFuture(
@@ -196,20 +192,16 @@ class ManualGroupRestartRequest implements
DisasterRecoveryRequest {
ReplicationGroupId replicationGroupId,
long revision
) {
- if (replicationGroupId instanceof TablePartitionId) {
- return
disasterRecoveryManager.tableManager.restartPartitionWithCleanUp(
- (TablePartitionId) replicationGroupId,
- revision,
- assignmentsTimestamp
- );
- } else if (replicationGroupId instanceof ZonePartitionId) {
- return
disasterRecoveryManager.partitionReplicaLifecycleManager.restartPartitionWithCleanUp(
- (ZonePartitionId) replicationGroupId,
- revision,
- assignmentsTimestamp
- );
- }
- throw new IllegalStateException("Unexpected replication group id: " +
replicationGroupId);
+ assert replicationGroupId instanceof ZonePartitionId :
+ "Unexpected type of replication group identifier [class=" +
replicationGroupId.getClass().getSimpleName()
+ + ", value=" + replicationGroupId
+ + ", requiredType = ZonePartitionId].";
+
+ return
disasterRecoveryManager.partitionReplicaLifecycleManager.restartPartitionWithCleanUp(
+ (ZonePartitionId) replicationGroupId,
+ revision,
+ assignmentsTimestamp
+ );
}
private CompletableFuture<?> createRestartWithCleanupFuture(
@@ -259,41 +251,26 @@ class ManualGroupRestartRequest implements
DisasterRecoveryRequest {
CatalogZoneDescriptor zoneDescriptor,
Catalog catalog
) {
- if (replicationGroupId instanceof TablePartitionId) {
- TablePartitionId tablePartitionId = (TablePartitionId)
replicationGroupId;
-
- return checkPartitionAliveNodes(
- disasterRecoveryManager,
- tablePartitionId,
- zoneDescriptor,
- catalog,
- msRevision,
- tableState(),
- tableStableAssignments(
- disasterRecoveryManager.metaStorageManager,
- tablePartitionId.tableId(),
- new int[]{tablePartitionId.partitionId()}
- )
- );
- } else if (replicationGroupId instanceof ZonePartitionId) {
- ZonePartitionId zonePartitionId = (ZonePartitionId)
replicationGroupId;
-
- return checkPartitionAliveNodes(
- disasterRecoveryManager,
- zonePartitionId,
- zoneDescriptor,
- catalog,
- msRevision,
- zoneState(),
- zoneStableAssignments(
- disasterRecoveryManager.metaStorageManager,
- zonePartitionId.zoneId(),
- new int[]{zonePartitionId.partitionId()}
- )
- );
- } else {
- throw new IllegalArgumentException("Unsupported replication group
type: " + replicationGroupId.getClass());
- }
+ assert replicationGroupId instanceof ZonePartitionId :
+ "Unexpected type of replication group identifier [class=" +
replicationGroupId.getClass().getSimpleName()
+ + ", value=" + replicationGroupId
+ + ", requiredType = ZonePartitionId].";
+
+ ZonePartitionId zonePartitionId = (ZonePartitionId) replicationGroupId;
+
+ return checkPartitionAliveNodes(
+ disasterRecoveryManager,
+ zonePartitionId,
+ zoneDescriptor,
+ catalog,
+ msRevision,
+ zoneState(),
+ zoneStableAssignments(
+ disasterRecoveryManager.metaStorageManager,
+ zonePartitionId.zoneId(),
+ new int[]{zonePartitionId.partitionId()}
+ )
+ );
}
private static <T extends PartitionGroupId> CompletableFuture<Boolean>
checkPartitionAliveNodes(
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index be4fc258a68..08bbda3d2e2 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -22,8 +22,6 @@ import static
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTA
import static
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands.BUILD_INDEX_V1;
import static
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands.BUILD_INDEX_V2;
import static
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands.BUILD_INDEX_V3;
-import static
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands.FINISH_TX_V1;
-import static
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands.FINISH_TX_V2;
import static
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands.UPDATE_MINIMUM_ACTIVE_TX_TIME_COMMAND;
import static
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.GROUP_TYPE;
import static
org.apache.ignite.internal.partition.replicator.raft.CommandResult.EMPTY_APPLIED_RESULT;
@@ -32,7 +30,6 @@ import static
org.apache.ignite.internal.table.distributed.TableUtils.indexIdsAt
import static
org.apache.ignite.internal.table.distributed.TableUtils.indexIdsAtRwTxBeginTsOrNull;
import static org.apache.ignite.internal.tx.TxState.COMMITTED;
import static org.apache.ignite.internal.tx.TxState.PENDING;
-import static
org.apache.ignite.internal.tx.message.TxMessageGroup.VACUUM_TX_STATE_COMMAND;
import java.nio.file.Path;
import java.util.HashSet;
@@ -44,7 +41,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import org.apache.ignite.internal.catalog.CatalogService;
-import org.apache.ignite.internal.components.NodeProperties;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -58,23 +54,19 @@ import
org.apache.ignite.internal.partition.replicator.raft.CommandResult;
import
org.apache.ignite.internal.partition.replicator.raft.OnSnapshotSaveHandler;
import
org.apache.ignite.internal.partition.replicator.raft.PartitionSnapshotInfo;
import org.apache.ignite.internal.partition.replicator.raft.RaftTableProcessor;
-import org.apache.ignite.internal.partition.replicator.raft.RaftTxFinishMarker;
import
org.apache.ignite.internal.partition.replicator.raft.handlers.AbstractCommandHandler;
import
org.apache.ignite.internal.partition.replicator.raft.handlers.CommandHandlers;
-import
org.apache.ignite.internal.partition.replicator.raft.handlers.FinishTxCommandHandler;
-import
org.apache.ignite.internal.partition.replicator.raft.handlers.VacuumTxStatesCommandHandler;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
-import org.apache.ignite.internal.raft.RaftGroupConfigurationSerializer;
import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
import
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
import
org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
@@ -88,12 +80,10 @@ import
org.apache.ignite.internal.table.distributed.raft.handlers.MinimumActiveT
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.UpdateCommandResult;
-import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.util.SafeTimeValuesTracker;
import org.apache.ignite.internal.util.TrackerClosedException;
-import org.apache.ignite.internal.versioned.VersionedSerialization;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -119,9 +109,6 @@ public class PartitionListener implements
RaftGroupListener, RaftTableProcessor
/** Safe time tracker. */
private final SafeTimeValuesTracker safeTimeTracker;
- /** Storage index tracker. */
- private final PendingComparableValuesTracker<Long, Void>
storageIndexTracker;
-
private final CatalogService catalogService;
private final UUID localNodeId;
@@ -130,8 +117,6 @@ public class PartitionListener implements
RaftGroupListener, RaftTableProcessor
private final OnSnapshotSaveHandler onSnapshotSaveHandler;
- private final RaftTxFinishMarker txFinishMarker;
-
// Raft command handlers.
private final CommandHandlers commandHandlers;
@@ -139,14 +124,10 @@ public class PartitionListener implements
RaftGroupListener, RaftTableProcessor
private final ClockService clockService;
- private final NodeProperties nodeProperties;
-
/**
* Partition group ID that is actually used for replication.
- *
- * <p>It is a zone partition ID when colocation is enabled, and table
partition ID otherwise.
*/
- private final ReplicationGroupId realReplicationGroupId;
+ private final ZonePartitionId realReplicationGroupId;
private ReplicaMeta lastKnownLease;
@@ -157,7 +138,6 @@ public class PartitionListener implements
RaftGroupListener, RaftTableProcessor
StorageUpdateHandler storageUpdateHandler,
TxStatePartitionStorage txStatePartitionStorage,
SafeTimeValuesTracker safeTimeTracker,
- PendingComparableValuesTracker<Long, Void> storageIndexTracker,
CatalogService catalogService,
SchemaRegistry schemaRegistry,
IndexMetaStorage indexMetaStorage,
@@ -166,27 +146,23 @@ public class PartitionListener implements
RaftGroupListener, RaftTableProcessor
Executor partitionOperationsExecutor,
LeasePlacementDriver placementDriver,
ClockService clockService,
- NodeProperties nodeProperties,
- ReplicationGroupId realReplicationGroupId
+ ZonePartitionId realReplicationGroupId
) {
this.txManager = txManager;
this.storage = partitionDataStorage;
this.storageUpdateHandler = storageUpdateHandler;
this.txStatePartitionStorage = txStatePartitionStorage;
this.safeTimeTracker = safeTimeTracker;
- this.storageIndexTracker = storageIndexTracker;
this.catalogService = catalogService;
this.localNodeId = localNodeId;
this.placementDriver = placementDriver;
this.clockService = clockService;
- this.nodeProperties = nodeProperties;
this.realReplicationGroupId = realReplicationGroupId;
onSnapshotSaveHandler = new
OnSnapshotSaveHandler(txStatePartitionStorage, partitionOperationsExecutor);
// RAFT command handlers initialization.
TablePartitionId tablePartitionId = new
TablePartitionId(storage.tableId(), storage.partitionId());
- txFinishMarker = new RaftTxFinishMarker(txManager);
CommandHandlers.Builder commandHandlersBuilder = new
CommandHandlers.Builder();
commandHandlersBuilder.addHandler(GROUP_TYPE,
UPDATE_MINIMUM_ACTIVE_TX_TIME_COMMAND, new MinimumActiveTxTimeCommandHandler(
@@ -205,25 +181,6 @@ public class PartitionListener implements
RaftGroupListener, RaftTableProcessor
commandHandlersBuilder.addHandler(GROUP_TYPE, BUILD_INDEX_V2,
buildIndexCommandHandler);
commandHandlersBuilder.addHandler(GROUP_TYPE, BUILD_INDEX_V3,
buildIndexCommandHandler);
- if (!nodeProperties.colocationEnabled()) {
- commandHandlersBuilder.addHandler(
- GROUP_TYPE,
- FINISH_TX_V1,
- new FinishTxCommandHandler(txStatePartitionStorage,
tablePartitionId, txManager)
- );
-
- commandHandlersBuilder.addHandler(
- GROUP_TYPE,
- FINISH_TX_V2,
- new FinishTxCommandHandler(txStatePartitionStorage,
tablePartitionId, txManager)
- );
-
- commandHandlersBuilder.addHandler(
- TxMessageGroup.GROUP_TYPE,
- VACUUM_TX_STATE_COMMAND,
- new VacuumTxStatesCommandHandler(txStatePartitionStorage));
- }
-
this.commandHandlers = commandHandlersBuilder.build();
RaftGroupConfiguration committedGroupConfiguration =
storage.committedGroupConfiguration();
@@ -348,10 +305,6 @@ public class PartitionListener implements
RaftGroupListener, RaftTableProcessor
if (safeTimestamp != null) {
updateTrackerIgnoringTrackerClosedException(safeTimeTracker,
safeTimestamp);
}
-
- if (!nodeProperties.colocationEnabled()) {
-
updateTrackerIgnoringTrackerClosedException(storageIndexTracker, commandIndex);
- }
}
return result;
@@ -555,12 +508,6 @@ public class PartitionListener implements
RaftGroupListener, RaftTableProcessor
UUID txId = cmd.txId();
- if (!nodeProperties.colocationEnabled()) {
- // When colocation feature is enabled, this object merely serves
as a table processor invoked by zone-aware raft listener,
- // which has already marked the transaction finished.
- txFinishMarker.markFinished(txId, cmd.commit(),
cmd.commitTimestamp(), null);
- }
-
storageUpdateHandler.switchWriteIntents(
txId,
cmd.commit(),
@@ -625,14 +572,6 @@ public class PartitionListener implements
RaftGroupListener, RaftTableProcessor
return null;
});
-
- if (!nodeProperties.colocationEnabled()) {
-
updateTrackerIgnoringTrackerClosedException(storageIndexTracker,
config.index());
-
- byte[] configBytes = VersionedSerialization.toBytes(config,
RaftGroupConfigurationSerializer.INSTANCE);
-
-
txStatePartitionStorage.committedGroupConfiguration(configBytes,
lastAppliedIndex, lastAppliedTerm);
- }
} finally {
storage.releasePartitionSnapshotsReadLock();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index ebe7f92bb23..586d6a4d610 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -102,7 +102,6 @@ import
org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
import org.apache.ignite.internal.replicator.exception.ReplicationException;
@@ -214,8 +213,6 @@ public class InternalTableImpl implements InternalTable {
/** Default read-only transaction timeout. */
private final Supplier<Long> defaultReadTxTimeout;
- private final boolean colocationEnabled;
-
private final TableMetricSource metrics;
/**
@@ -256,7 +253,6 @@ public class InternalTableImpl implements InternalTable {
StreamerReceiverRunner streamerReceiverRunner,
Supplier<Long> defaultRwTxTimeout,
Supplier<Long> defaultReadTxTimeout,
- boolean colocationEnabled,
TableMetricSource metrics
) {
this.tableName = tableName;
@@ -276,7 +272,6 @@ public class InternalTableImpl implements InternalTable {
this.streamerReceiverRunner = streamerReceiverRunner;
this.defaultRwTxTimeout = defaultRwTxTimeout;
this.defaultReadTxTimeout = defaultReadTxTimeout;
- this.colocationEnabled = colocationEnabled;
this.metrics = metrics;
}
@@ -2226,11 +2221,7 @@ public class InternalTableImpl implements InternalTable {
@Override
public final ReplicationGroupId targetReplicationGroupId(int
partitionIndex) {
- if (colocationEnabled) {
- return new ZonePartitionId(zoneId, partitionIndex);
- } else {
- return new TablePartitionId(tableId, partitionIndex);
- }
+ return new ZonePartitionId(zoneId, partitionIndex);
}
private static ZonePartitionIdMessage
serializeReplicationGroupId(ReplicationGroupId replicationGroupId) {
@@ -2297,15 +2288,12 @@ public class InternalTableImpl implements InternalTable
{
*
* @param partitionId Partition ID.
* @param newSafeTimeTracker New partition safe time tracker.
- * @param newStorageIndexTracker New partition storage index tracker.
*/
public void updatePartitionTrackers(
int partitionId,
- PendingComparableValuesTracker<HybridTimestamp, Void>
newSafeTimeTracker,
- PendingComparableValuesTracker<Long, Void> newStorageIndexTracker
+ PendingComparableValuesTracker<HybridTimestamp, Void>
newSafeTimeTracker
) {
PendingComparableValuesTracker<HybridTimestamp, Void>
previousSafeTimeTracker;
- PendingComparableValuesTracker<Long, Void> previousStorageIndexTracker;
synchronized (updatePartitionMapsMux) {
Int2ObjectMap<PendingComparableValuesTracker<HybridTimestamp,
Void>> newSafeTimeTrackerMap =
@@ -2316,7 +2304,6 @@ public class InternalTableImpl implements InternalTable {
newStorageIndexTrackerMap.putAll(storageIndexTrackerByPartitionId);
previousSafeTimeTracker = newSafeTimeTrackerMap.put(partitionId,
newSafeTimeTracker);
- previousStorageIndexTracker =
newStorageIndexTrackerMap.put(partitionId, newStorageIndexTracker);
safeTimeTrackerByPartitionId = newSafeTimeTrackerMap;
storageIndexTrackerByPartitionId = newStorageIndexTrackerMap;
@@ -2325,10 +2312,6 @@ public class InternalTableImpl implements InternalTable {
if (previousSafeTimeTracker != null) {
previousSafeTimeTracker.close();
}
-
- if (previousStorageIndexTracker != null) {
- previousStorageIndexTracker.close();
- }
}
private ReplicaRequest upsertAllInternal(
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index 0c2f18feab9..915c1f761d4 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -22,9 +22,6 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static
org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsQueueKey;
-import static
org.apache.ignite.internal.lang.IgniteSystemProperties.COLOCATION_FEATURE_FLAG;
-import static
org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
import static
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignments;
import static
org.apache.ignite.internal.partitiondistribution.PendingAssignmentsCalculator.pendingAssignmentsCalculator;
import static
org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
@@ -135,8 +132,6 @@ import
org.apache.ignite.internal.raft.service.RaftGroupService;
import
org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactoryCreator;
import org.apache.ignite.internal.replicator.Replica;
import org.apache.ignite.internal.replicator.ReplicaManager;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
import org.apache.ignite.internal.schema.AlwaysSyncedSchemaSyncService;
@@ -159,7 +154,6 @@ import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeColl
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.InjectExecutorService;
-import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
@@ -328,7 +322,6 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
verify(mvTableStorage, timeout(WAIT_TIMEOUT)).destroy();
}
- @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "true")
@Test
public void raftListenersAreRecoveredOnRecovery() throws Exception {
DistributionZonesTestUtil.createDefaultZone(catalogManager);
@@ -354,9 +347,6 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
public void testResetPeersRetry() {
createSimpleTable(catalogManager, TABLE_NAME);
- int tableId =
catalogManager.activeCatalog(clock.nowLong()).table(DEFAULT_SCHEMA_NAME,
TABLE_NAME).id();
- TablePartitionId tablePartitionId = new TablePartitionId(tableId, 0);
-
int zoneId =
catalogManager.activeCatalog(clock.nowLong()).table(DEFAULT_SCHEMA_NAME,
TABLE_NAME).zoneId();
ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 0);
@@ -399,19 +389,11 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
}
});
- if (colocationEnabled()) {
- CompletableFuture<Void> putReset = metaStorageManager.put(
-
ZoneRebalanceUtil.pendingPartAssignmentsQueueKey(zonePartitionId),
- assignmentsQueue.toBytes()
- );
- assertThat(putReset, willCompleteSuccessfully());
- } else {
- CompletableFuture<Void> putReset = metaStorageManager.put(
- pendingPartAssignmentsQueueKey(tablePartitionId),
- assignmentsQueue.toBytes()
- );
- assertThat(putReset, willCompleteSuccessfully());
- }
+ CompletableFuture<Void> putReset = metaStorageManager.put(
+
ZoneRebalanceUtil.pendingPartAssignmentsQueueKey(zonePartitionId),
+ assignmentsQueue.toBytes()
+ );
+ assertThat(putReset, willCompleteSuccessfully());
assertThat(assignmentsHandled, willCompleteSuccessfully());
@@ -419,7 +401,7 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
}
private static void captureSequenceToken(InvocationOnMock invocation,
AtomicLong resetPeersCallCount) {
- long resetSequenceToken = ((Long)
invocation.getArgument(2)).longValue();
+ long resetSequenceToken = invocation.getArgument(2);
resetPeersCallCount.updateAndGet(existing -> Math.max(existing,
resetSequenceToken));
}
@@ -492,7 +474,7 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
.complete(zonePartitionReplicaListener);
return completedFuture(replica);
- }).when(replicaMgr).startReplica(any(ReplicationGroupId.class), any(),
any(), any(), any(), any(), anyBoolean(), any(), any());
+ }).when(replicaMgr).startReplica(any(ZonePartitionId.class), any(),
any(), any(), any(), any(), anyBoolean(), any(), any());
doReturn(trueCompletedFuture()).when(replicaMgr).stopReplica(any());
doAnswer(invocation -> {
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 22e9f60993c..7b5bf6c0c31 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.table.distributed.raft;
import static java.util.Collections.singletonMap;
-import static
org.apache.ignite.internal.lang.IgniteSystemProperties.COLOCATION_FEATURE_FLAG;
import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
import static
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.BUILDING;
import static
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.REGISTERED;
@@ -26,8 +25,6 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.deriveUui
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.ArrayUtils.asList;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -73,7 +70,6 @@ import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
-import org.apache.ignite.internal.components.SystemPropertiesNodeProperties;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.ClockService;
@@ -81,15 +77,11 @@ import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.network.ClusterService;
-import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
-import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand;
-import
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.TimedBinaryRowMessage;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
-import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandV2;
import
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
import
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
@@ -100,6 +92,7 @@ import
org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
import
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
import
org.apache.ignite.internal.replicator.command.SafeTimeSyncCommandBuilder;
@@ -116,7 +109,6 @@ import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
@@ -136,19 +128,15 @@ import
org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.InjectExecutorService;
-import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.tx.TxMeta;
-import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.UpdateCommandResult;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
import
org.apache.ignite.internal.tx.storage.state.test.TestTxStatePartitionStorage;
import org.apache.ignite.internal.tx.test.TestTransactionIds;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.Cursor;
-import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.util.SafeTimeValuesTracker;
import org.apache.ignite.network.NetworkAddress;
import org.jetbrains.annotations.Nullable;
@@ -174,6 +162,8 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
private static final int PARTITION_ID = 0;
+ private static final int ZONE_ID = 2;
+
private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
1,
new Column[]{new Column("key", NativeTypes.INT32, false)},
@@ -308,7 +298,6 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
storageUpdateHandler,
txStatePartitionStorage,
safeTimeTracker,
- new PendingComparableValuesTracker<>(0L),
catalogService,
SCHEMA_REGISTRY,
indexMetaStorage,
@@ -317,8 +306,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
mock(Executor.class),
placementDriver,
clockService,
- new SystemPropertiesNodeProperties(),
- new TablePartitionId(TABLE_ID, PARTITION_ID)
+ new ZonePartitionId(ZONE_ID, PARTITION_ID)
);
// Update(All)Command handling requires both information about raft
group topology and the primary replica,
@@ -420,57 +408,6 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
readAndCheck(false);
}
- @Test
- @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false")
- // TODO https://issues.apache.org/jira/browse/IGNITE-22522 Remove this
test when zone colocation will be the only implementation.
- void testSkipWriteCommandByAppliedIndex() {
- mvPartitionStorage.lastApplied(10L, 1L);
-
- UpdateCommandV2 updateCommand = mock(UpdateCommandV2.class);
- WriteIntentSwitchCommand writeIntentSwitchCommand =
mock(WriteIntentSwitchCommand.class);
- SafeTimeSyncCommand safeTimeSyncCommand =
mock(SafeTimeSyncCommand.class);
- FinishTxCommand finishTxCommand = mock(FinishTxCommand.class);
-
when(finishTxCommand.groupType()).thenReturn(PartitionReplicationMessageGroup.GROUP_TYPE);
- when(finishTxCommand.messageType()).thenReturn(Commands.FINISH_TX_V2);
-
- PrimaryReplicaChangeCommand primaryReplicaChangeCommand =
mock(PrimaryReplicaChangeCommand.class);
-
- // Checks for MvPartitionStorage.
- commandListener.onWrite(List.of(
- writeCommandCommandClosure(3, 1, updateCommand,
updateCommandClosureResultCaptor, hybridClock.now()),
- writeCommandCommandClosure(10, 1, updateCommand,
updateCommandClosureResultCaptor, hybridClock.now()),
- writeCommandCommandClosure(4, 1, writeIntentSwitchCommand,
commandClosureResultCaptor, hybridClock.now()),
- writeCommandCommandClosure(5, 1, safeTimeSyncCommand,
commandClosureResultCaptor, hybridClock.now()),
- writeCommandCommandClosure(6, 1, primaryReplicaChangeCommand,
commandClosureResultCaptor, null)
- ).iterator());
-
- // Two storage runConsistently runs are expected: one for
configuration application and another for primaryReplicaChangeCommand
- // handling. Both comes from initial configuration preparation in
@BeforeEach
- verify(mvPartitionStorage,
times(2)).runConsistently(any(WriteClosure.class));
- verify(mvPartitionStorage, times(3)).lastApplied(anyLong(), anyLong());
-
- List<UpdateCommandResult> allValues =
updateCommandClosureResultCaptor.getAllValues();
- assertThat(allValues, containsInAnyOrder(new Throwable[]{null, null}));
- assertThat(commandClosureResultCaptor.getAllValues(),
containsInAnyOrder(new Throwable[]{null, null, null}));
-
- // Checks for TxStateStorage.
- mvPartitionStorage.lastApplied(1L, 1L);
- txStatePartitionStorage.lastApplied(10L, 2L);
-
- commandClosureResultCaptor = ArgumentCaptor.forClass(Throwable.class);
-
- commandListener.onWrite(List.of(
- writeCommandCommandClosure(2, 1, finishTxCommand,
commandClosureResultCaptor, hybridClock.now()),
- writeCommandCommandClosure(10, 1, finishTxCommand,
commandClosureResultCaptor, hybridClock.now())
- ).iterator());
-
- verify(txStatePartitionStorage, never())
- .compareAndSet(any(UUID.class), any(TxState.class),
any(TxMeta.class), anyLong(), anyLong());
- verify(txStatePartitionStorage, times(1)).lastApplied(anyLong(),
anyLong());
-
- assertThat(commandClosureResultCaptor.getAllValues(),
containsInAnyOrder(new Throwable[]{null, null}));
- }
-
private CommandClosure<WriteCommand> writeCommandCommandClosure(
long index,
long term,
@@ -541,7 +478,6 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
storageUpdateHandler,
txStatePartitionStorage,
safeTimeTracker,
- new PendingComparableValuesTracker<>(0L),
catalogService,
SCHEMA_REGISTRY,
indexMetaStorage,
@@ -550,8 +486,7 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
executor,
placementDriver,
clockService,
- new SystemPropertiesNodeProperties(),
- new TablePartitionId(TABLE_ID, PARTITION_ID)
+ new ZonePartitionId(ZONE_ID, PARTITION_ID)
);
txStatePartitionStorage.lastApplied(3L, 1L);
@@ -636,26 +571,6 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
verify(mvPartitionStorage).lastApplied(3, 2);
}
- @Test
- @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false")
- // TODO https://issues.apache.org/jira/browse/IGNITE-22522 Remove this
test when zone colocation will be the only implementation.
- void updatesLastAppliedForFinishTxCommands() {
- safeTimeTracker.update(hybridClock.now(), null);
-
- FinishTxCommand command =
PARTITION_REPLICATION_MESSAGES_FACTORY.finishTxCommandV2()
- .txId(TestTransactionIds.newTransactionId())
- .initiatorTime(hybridClock.now())
- .partitions(List.of())
- .build();
-
- commandListener.onWrite(List.of(
- writeCommandCommandClosure(3, 2, command)
- ).iterator());
-
- assertThat(txStatePartitionStorage.lastAppliedIndex(), is(3L));
- assertThat(txStatePartitionStorage.lastAppliedTerm(), is(2L));
- }
-
@Test
void locksOnCommandApplication() {
SafeTimeSyncCommandBuilder safeTimeSyncCommand = new
ReplicaMessagesFactory()
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
index 9478ef03177..d73481b09da 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
@@ -236,7 +236,6 @@ public class InternalTableEstimatedSizeTest extends
BaseIgniteAbstractTest {
mock(StreamerReceiverRunner.class),
() -> 10_000L,
() -> 10_000L,
- colocationEnabled(),
new TableMetricSource(QualifiedName.fromSimple(TABLE_NAME))
);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index f065f652747..bed39739520 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -208,27 +208,20 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
// Let's check the first insert.
PendingComparableValuesTracker<HybridTimestamp, Void> safeTime0 =
mock(PendingComparableValuesTracker.class);
- PendingComparableValuesTracker<Long, Void> storageIndex0 =
mock(PendingComparableValuesTracker.class);
- internalTable.updatePartitionTrackers(0, safeTime0, storageIndex0);
+ internalTable.updatePartitionTrackers(0, safeTime0);
assertSame(safeTime0, internalTable.getPartitionSafeTimeTracker(0));
- assertSame(storageIndex0,
internalTable.getPartitionStorageIndexTracker(0));
-
verify(safeTime0, never()).close();
- verify(storageIndex0, never()).close();
// Let's check the new insert.
PendingComparableValuesTracker<HybridTimestamp, Void> safeTime1 =
mock(PendingComparableValuesTracker.class);
- PendingComparableValuesTracker<Long, Void> storageIndex1 =
mock(PendingComparableValuesTracker.class);
- internalTable.updatePartitionTrackers(0, safeTime1, storageIndex1);
+ internalTable.updatePartitionTrackers(0, safeTime1);
assertSame(safeTime1, internalTable.getPartitionSafeTimeTracker(0));
- assertSame(storageIndex1,
internalTable.getPartitionStorageIndexTracker(0));
verify(safeTime0).close();
- verify(storageIndex0).close();
}
private InternalTableImpl newInternalTable(int tableId, int
partitionCount) {
@@ -251,7 +244,6 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
mock(StreamerReceiverRunner.class),
() -> 10_000L,
() -> 10_000L,
- colocationEnabled(),
new TableMetricSource(QualifiedName.fromSimple("test"))
);
}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index e1412e88f86..d9247e862fd 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -692,7 +692,6 @@ public class ItTxTestCluster {
mock(StreamerReceiverRunner.class),
() -> 10_000L,
() -> 10_000L,
- colocationEnabled(),
new TableMetricSource(QualifiedName.fromSimple(tableName))
);
@@ -881,67 +880,44 @@ public class ItTxTestCluster {
CatalogService catalogService,
SchemaRegistry schemaRegistry
) {
- if (colocationEnabled()) {
- ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId,
partId);
+ ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, partId);
- var nodeSpecificZonePartitionRaftGroupListeners =
zonePartitionRaftGroupListeners.computeIfAbsent(assignment,
- k -> new HashMap<>());
+ var nodeSpecificZonePartitionRaftGroupListeners =
zonePartitionRaftGroupListeners.computeIfAbsent(assignment,
+ k -> new HashMap<>());
- ZonePartitionRaftListener zonePartitionRaftListener =
nodeSpecificZonePartitionRaftGroupListeners.computeIfAbsent(
- zonePartitionId,
- k -> new ZonePartitionRaftListener(
- zonePartitionId,
- txStateStorage,
- txManagers.get(assignment),
- safeTimeTracker,
- storageIndexTracker,
- mock(PartitionsSnapshots.class,
RETURNS_DEEP_STUBS),
- partitionOperationsExecutor
- )
- );
+ ZonePartitionRaftListener zonePartitionRaftListener =
nodeSpecificZonePartitionRaftGroupListeners.computeIfAbsent(
+ zonePartitionId,
+ k -> new ZonePartitionRaftListener(
+ zonePartitionId,
+ txStateStorage,
+ txManagers.get(assignment),
+ safeTimeTracker,
+ storageIndexTracker,
+ mock(PartitionsSnapshots.class, RETURNS_DEEP_STUBS),
+ partitionOperationsExecutor
+ )
+ );
- PartitionListener tablePartitionRaftListener = new
PartitionListener(
- txManagers.get(assignment),
- partitionDataStorage,
- storageUpdateHandler,
- txStateStorage,
- safeTimeTracker,
- storageIndexTracker,
- catalogService,
- schemaRegistry,
- mock(IndexMetaStorage.class),
-
clusterServices.get(assignment).topologyService().getByConsistentId(assignment).id(),
- mock(MinimumRequiredTimeCollectorService.class),
- partitionOperationsExecutor,
- placementDriver,
- clockServices.get(assignment),
- new SystemPropertiesNodeProperties(),
- zonePartitionId
- );
+ PartitionListener tablePartitionRaftListener = new PartitionListener(
+ txManagers.get(assignment),
+ partitionDataStorage,
+ storageUpdateHandler,
+ txStateStorage,
+ safeTimeTracker,
+ catalogService,
+ schemaRegistry,
+ mock(IndexMetaStorage.class),
+
clusterServices.get(assignment).topologyService().getByConsistentId(assignment).id(),
+ mock(MinimumRequiredTimeCollectorService.class),
+ partitionOperationsExecutor,
+ placementDriver,
+ clockServices.get(assignment),
+ zonePartitionId
+ );
- zonePartitionRaftListener.addTableProcessor(tableId,
tablePartitionRaftListener);
+ zonePartitionRaftListener.addTableProcessor(tableId,
tablePartitionRaftListener);
- return zonePartitionRaftListener;
- } else {
- return new PartitionListener(
- txManagers.get(assignment),
- partitionDataStorage,
- storageUpdateHandler,
- txStateStorage,
- safeTimeTracker,
- storageIndexTracker,
- catalogService,
- schemaRegistry,
- mock(IndexMetaStorage.class),
-
clusterServices.get(assignment).topologyService().getByConsistentId(assignment).id(),
- mock(MinimumRequiredTimeCollectorService.class),
- partitionOperationsExecutor,
- placementDriver,
- clockServices.get(assignment),
- new SystemPropertiesNodeProperties(),
- new TablePartitionId(tableId, partId)
- );
- }
+ return zonePartitionRaftListener;
}
/**
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index acd954069a0..b3f3961c328 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -50,7 +50,6 @@ import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
-import org.apache.ignite.internal.components.SystemPropertiesNodeProperties;
import org.apache.ignite.internal.configuration.SystemDistributedConfiguration;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.ClockService;
@@ -311,7 +310,6 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
mock(StreamerReceiverRunner.class),
() -> 10_000L,
() -> 10_000L,
- colocationEnabled(),
new TableMetricSource(QualifiedName.fromSimple("test"))
);
@@ -536,7 +534,6 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
storageUpdateHandler,
txStateStorage().getOrCreatePartitionStorage(PART_ID),
safeTime,
- storageIndexTracker,
catalogService,
schemaManager,
mock(IndexMetaStorage.class),
@@ -545,8 +542,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
mock(Executor.class),
placementDriver,
clockService,
- new SystemPropertiesNodeProperties(),
- enabledColocation ? zonePartitionId : tablePartitionId
+ zonePartitionId
);
if (enabledColocation) {
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
index 951eee8a736..bba833c7b9c 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
@@ -82,6 +82,7 @@ import org.apache.ignite.internal.wrapper.Wrapper;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionException;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
@@ -669,6 +670,7 @@ public class ItDisasterRecoveryManagerTest extends
ClusterPerTestIntegrationTest
}
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-27268")
void testRestartPartitionsWithCleanUpConcurrentRebalance() throws
Exception {
IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());