This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 876a202f88 IGNITE-22415 Search for table in older versions of catalog
(#3885)
876a202f88 is described below
commit 876a202f887ede527138f30460849396cd31d8b7
Author: Cyrill <[email protected]>
AuthorDate: Mon Jul 8 17:19:39 2024 +0300
IGNITE-22415 Search for table in older versions of catalog (#3885)
---
.../distributionzones/DistributionZoneManager.java | 18 +++----
.../rebalance/DistributionZoneRebalanceEngine.java | 59 ++++++++++++++++------
.../DistributionZoneRebalanceEngineV2.java | 2 +-
.../RebalanceRaftGroupEventsListener.java | 7 ++-
.../distributionzones/rebalance/RebalanceUtil.java | 14 +++++
.../ZoneRebalanceRaftGroupEventsListener.java | 11 ++--
.../DistributionZoneRebalanceEngineTest.java | 22 +++++---
.../RebalanceUtilUpdateAssignmentsTest.java | 15 +++++-
.../index/IndexAvailabilityController.java | 2 +
.../PartitionReplicaLifecycleManager.java | 1 +
.../ignite/internal/rebalance/ItRebalanceTest.java | 57 ++++++++++++++++++++-
.../internal/table/distributed/TableManager.java | 39 ++++++++++----
12 files changed, 195 insertions(+), 52 deletions(-)
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 9a42ab8b92..646ed9234d 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -269,10 +269,13 @@ public class DistributionZoneManager implements
IgniteComponent {
restoreGlobalStateFromLocalMetastorage(recoveryRevision);
+ // TODO: IGNITE-22679 CatalogManagerImpl initializes versions in a
separate thread, not safe to make this call directly.
+ int catalogVersion = catalogManager.latestCatalogVersion();
+
return allOf(
- createOrRestoreZonesStates(recoveryRevision),
-
restoreLogicalTopologyChangeEventAndStartTimers(recoveryRevision)
- ).thenComposeAsync((notUsed) -> rebalanceEngine.start(),
componentContext.executor());
+ createOrRestoreZonesStates(recoveryRevision,
catalogVersion),
+
restoreLogicalTopologyChangeEventAndStartTimers(recoveryRevision,
catalogVersion)
+ ).thenComposeAsync((notUsed) ->
rebalanceEngine.startAsync(catalogVersion), componentContext.executor());
});
}
@@ -1400,9 +1403,7 @@ public class DistributionZoneManager implements
IgniteComponent {
catalogManager.listen(ZONE_ALTER, new
ManagerCatalogAlterZoneEventListener());
}
- private CompletableFuture<Void> createOrRestoreZonesStates(long
recoveryRevision) {
- int catalogVersion = catalogManager.latestCatalogVersion();
-
+ private CompletableFuture<Void> createOrRestoreZonesStates(long
recoveryRevision, int catalogVersion) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
// TODO: IGNITE-20287 Clean up abandoned resources for dropped tables
from vault and metastore
@@ -1420,7 +1421,7 @@ public class DistributionZoneManager implements
IgniteComponent {
* @param recoveryRevision Revision of the Meta Storage after its recovery.
* @return Future that represents the pending completion of the operations.
*/
- private CompletableFuture<Void>
restoreLogicalTopologyChangeEventAndStartTimers(long recoveryRevision) {
+ private CompletableFuture<Void>
restoreLogicalTopologyChangeEventAndStartTimers(long recoveryRevision, int
catalogVersion) {
Entry topologyEntry =
metaStorageManager.getLocally(zonesLogicalTopologyKey(), recoveryRevision);
if (topologyEntry.value() != null) {
@@ -1428,9 +1429,6 @@ public class DistributionZoneManager implements
IgniteComponent {
long topologyRevision = topologyEntry.revision();
- // It is safe to get the latest version of the catalog as we are
in the starting process.
- int catalogVersion = catalogManager.latestCatalogVersion();
-
Entry lastUpdateRevisionEntry =
metaStorageManager.getLocally(zonesRecoverableStateRevision(),
recoveryRevision);
if (lastUpdateRevisionEntry.value() == null || topologyRevision >
bytesToLongKeepingOrder(lastUpdateRevisionEntry.value())) {
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
index f8309f7fa0..896252f7c5 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
@@ -26,10 +26,12 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.parseDataNodes;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceRaftGroupEventsListener.doStableKeySwitch;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.catalogVersionKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractPartitionNumber;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractZoneId;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.raftConfigurationAppliedKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tablesCounterPrefixKey;
+import static org.apache.ignite.internal.util.ByteUtils.bytesToInt;
import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -40,6 +42,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -138,7 +141,7 @@ public class DistributionZoneRebalanceEngine {
/**
* Starts the rebalance engine by registering corresponding meta storage
and configuration listeners.
*/
- public CompletableFuture<Void> start() {
+ public CompletableFuture<Void> startAsync(int catalogVersion) {
return IgniteUtils.inBusyLockAsync(busyLock, () -> {
catalogService.listen(ZONE_ALTER, new
CatalogAlterZoneEventListener(catalogService) {
@Override
@@ -159,7 +162,8 @@ public class DistributionZoneRebalanceEngine {
long recoveryRevision = recoveryFinishFuture.join();
- return rebalanceTriggersRecovery(recoveryRevision).thenCompose(v
-> distributionZoneRebalanceEngineV2.start());
+ return rebalanceTriggersRecovery(recoveryRevision, catalogVersion)
+ .thenCompose(v ->
distributionZoneRebalanceEngineV2.startAsync());
});
}
@@ -171,15 +175,15 @@ public class DistributionZoneRebalanceEngine {
// TODO: https://issues.apache.org/jira/browse/IGNITE-21058 At the moment
this method produce many metastore multi-invokes
// TODO: which can be avoided by the local logic, which mirror the logic
of metastore invokes.
// TODO: And then run the remote invoke, only if needed.
- private CompletableFuture<Void> rebalanceTriggersRecovery(long
recoveryRevision) {
+ private CompletableFuture<Void> rebalanceTriggersRecovery(long
recoveryRevision, int catalogVersion) {
if (recoveryRevision > 0) {
- List<CompletableFuture<Void>> zonesRecoveryFutures =
catalogService.zones(catalogService.latestCatalogVersion())
+ List<CompletableFuture<Void>> zonesRecoveryFutures =
catalogService.zones(catalogVersion)
.stream()
.map(zoneDesc ->
recalculateAssignmentsAndScheduleRebalance(
zoneDesc,
recoveryRevision,
- catalogService.latestCatalogVersion()
+ catalogVersion
)
)
.collect(Collectors.toUnmodifiableList());
@@ -216,6 +220,8 @@ public class DistributionZoneRebalanceEngine {
int zoneId =
extractZoneId(evt.entryEvent().newEntry().key(),
DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX);
// It is safe to get the latest version of the catalog as
we are in the metastore thread.
+ // TODO: IGNITE-22661 Potentially unsafe to use the latest
catalog version, as the tables might not already be present
+ // in the catalog. Better to store this version when
writing datanodes.
int catalogVersion = catalogService.latestCatalogVersion();
CatalogZoneDescriptor zoneDescriptor =
catalogService.zone(zoneId, catalogVersion);
@@ -274,16 +280,28 @@ public class DistributionZoneRebalanceEngine {
return nullCompletedFuture();
}
- int zoneId =
RebalanceUtil.extractZoneIdFromTablesCounter(event.entryEvent().newEntry().key());
-
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-21254 tables here must be the same
as they were on rebalance start
- List<CatalogTableDescriptor> tables =
findTablesByZoneId(zoneId, catalogService.latestCatalogVersion(),
catalogService);
-
rebalanceScheduler.schedule(() -> {
if (!busyLock.enterBusy()) {
return;
}
+ int zoneId =
RebalanceUtil.extractZoneIdFromTablesCounter(event.entryEvent().newEntry().key());
+
+ int partId =
extractPartitionNumber(event.entryEvent().newEntry().key());
+
+ int catalogVersion;
+
+ try {
+ catalogVersion =
getCatalogVersionForCounter(zoneId, partId, event.revision());
+ } catch (ExecutionException | InterruptedException e) {
+ LOG.error("Failed to get catalog version for
[zoneId={}, partitionId={}]", e, zoneId, partId);
+
+ busyLock.leaveBusy();
+ return;
+ }
+
+ List<CatalogTableDescriptor> tables =
findTablesByZoneId(zoneId, catalogVersion, catalogService);
+
LOG.debug("Started to update stable keys for tables
from the zone [zoneId = {}, tables = [{}]]",
zoneId,
tables.stream().map(CatalogObjectDescriptor::name).collect(Collectors.toSet())
@@ -292,8 +310,6 @@ public class DistributionZoneRebalanceEngine {
try {
Map<ByteArray, TablePartitionId>
partitionTablesKeys = new HashMap<>();
- int partId =
extractPartitionNumber(event.entryEvent().newEntry().key());
-
for (CatalogTableDescriptor table : tables) {
TablePartitionId replicaGrpId = new
TablePartitionId(table.id(), partId);
partitionTablesKeys.put(raftConfigurationAppliedKey(replicaGrpId),
replicaGrpId);
@@ -334,6 +350,20 @@ public class DistributionZoneRebalanceEngine {
};
}
+ @Deprecated // Will be removed when IGNITE-22115 is merged.
+ private int getCatalogVersionForCounter(int zoneId, int partId, long
revision) throws ExecutionException, InterruptedException {
+ Entry entry = metaStorageManager.getLocally(catalogVersionKey(zoneId,
partId), revision);
+
+ assert entry.value() != null : "Failed to find catalog version for
table counters.";
+
+ int storedCatalogVersion = bytesToInt(entry.value());
+
+ // Wait for the catalog to catch up.
+ catalogService.catalogReadyFuture(storedCatalogVersion).get();
+
+ return storedCatalogVersion;
+ }
+
private CompletableFuture<Void> onUpdateReplicas(AlterZoneEventParameters
parameters) {
return recalculateAssignmentsAndScheduleRebalance(
parameters.zoneDescriptor(),
@@ -345,10 +375,9 @@ public class DistributionZoneRebalanceEngine {
static CompletableFuture<Set<Assignment>> calculateAssignments(
TablePartitionId tablePartitionId,
CatalogService catalogService,
- DistributionZoneManager distributionZoneManager
+ DistributionZoneManager distributionZoneManager,
+ int catalogVersion
) {
- int catalogVersion = catalogService.latestCatalogVersion();
-
CatalogTableDescriptor tableDescriptor =
catalogService.table(tablePartitionId.tableId(), catalogVersion);
CatalogZoneDescriptor zoneDescriptor =
catalogService.zone(tableDescriptor.zoneId(), catalogVersion);
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
index a1b684de29..f409e9b316 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
@@ -96,7 +96,7 @@ public class DistributionZoneRebalanceEngineV2 {
/**
* Starts the rebalance engine by registering corresponding meta storage
and catalog listeners.
*/
- public CompletableFuture<Void> start() {
+ public CompletableFuture<Void> startAsync() {
return IgniteUtils.inBusyLockAsync(busyLock, () -> {
catalogService.listen(ZONE_ALTER, new
CatalogAlterZoneEventListener(catalogService) {
@Override
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
index 39373ef464..ed074f5a38 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
@@ -407,7 +407,12 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
)
).get();
- Set<Assignment> calculatedAssignments =
calculateAssignments(tablePartitionId, catalogService,
distributionZoneManager).get();
+ // TODO: IGNITE-22661 Potentially unsafe to use the latest catalog
version, as the tables might not already be present
+ // in the catalog. Better to take the version from Assignments.
+ int catalogVersion = catalogService.latestCatalogVersion();
+
+ Set<Assignment> calculatedAssignments =
+ calculateAssignments(tablePartitionId, catalogService,
distributionZoneManager, catalogVersion).get();
Entry stableEntry = values.get(stablePartAssignmentsKey);
Entry pendingEntry = values.get(pendingPartAssignmentsKey);
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index 3d77134d01..0bf3ecff4a 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -327,6 +327,9 @@ public class RebalanceUtil {
/** Key prefix for counter of rebalances of tables from a zone that are
associated with the specified partition. */
private static final String TABLES_COUNTER_PREFIX = "tables.counter.";
+ /** Key prefix for catalog version of tables at the time of setting the
counter. */
+ private static final String TABLE_CATALOG_PREFIX =
"tables.catalog.version.";
+
/** Key prefix for a raft configuration that was applied during rebalance
of the specified partition form a table. */
private static final String RAFT_CONF_APPLIED_PREFIX =
"assignments.raft.conf.applied.";
@@ -418,6 +421,17 @@ public class RebalanceUtil {
return new ByteArray(TABLES_COUNTER_PREFIX + zoneId + "_part_" +
partId);
}
+ /**
+ * ByteArray key for catalog version of a table.
+ *
+ * @param zoneId Identifier of a zone.
+ * @param partId Unique identifier of a partition.
+ * @return Key for a partition.
+ */
+ public static ByteArray catalogVersionKey(int zoneId, int partId) {
+ return new ByteArray(TABLE_CATALOG_PREFIX + zoneId + "_part_" +
partId);
+ }
+
/**
* ByteArray prefix for counter of rebalances of tables from a zone that
are associated with the specified partition.
*
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
index 951101a3c7..2ae5ddc0d0 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
@@ -331,10 +331,14 @@ public class ZoneRebalanceRaftGroupEventsListener
implements RaftGroupEventsList
)
).get();
+ // TODO: IGNITE-22680 Find a better way to retrieve the catalog
version.
+ int catalogVersion = catalogService.latestCatalogVersion();
+
Set<Assignment> calculatedAssignments = calculateZoneAssignments(
zonePartitionId,
catalogService,
- distributionZoneManager
+ distributionZoneManager,
+ catalogVersion
).get();
Entry stableEntry = values.get(stablePartAssignmentsKey);
@@ -616,10 +620,9 @@ public class ZoneRebalanceRaftGroupEventsListener
implements RaftGroupEventsList
private static CompletableFuture<Set<Assignment>> calculateZoneAssignments(
ZonePartitionId zonePartitionId,
CatalogService catalogService,
- DistributionZoneManager distributionZoneManager
+ DistributionZoneManager distributionZoneManager,
+ int catalogVersion
) {
- int catalogVersion = catalogService.latestCatalogVersion();
-
CatalogZoneDescriptor zoneDescriptor =
catalogService.zone(zonePartitionId.zoneId(), catalogVersion);
int zoneId = zonePartitionId.zoneId();
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
index cc301bce76..1fd8f549b6 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
@@ -189,9 +189,11 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
keyValueStorage = spy(new SimpleInMemoryKeyValueStorage(nodeName));
+ ClusterTimeImpl clusterTime = new ClusterTimeImpl("node", new
IgniteSpinBusyLock(), clock);
+
MetaStorageListener metaStorageListener = new MetaStorageListener(
keyValueStorage,
- mock(ClusterTimeImpl.class),
+ clusterTime,
raftConfiguration.retryTimeout(),
completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
);
@@ -249,7 +251,11 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
lenient().doAnswer(invocationClose -> {
Iif iif = invocationClose.getArgument(0);
- MultiInvokeCommand multiInvokeCommand =
commandsFactory.multiInvokeCommand().iif(iif).id(commandIdGenerator.newId()).build();
+ MultiInvokeCommand multiInvokeCommand =
commandsFactory.multiInvokeCommand()
+ .iif(iif)
+ .id(commandIdGenerator.newId())
+ .initiatorTimeLong(clusterTime.nowLong())
+ .build();
return metaStorageService.run(multiInvokeCommand);
}).when(metaStorageManager).invoke(any());
@@ -301,7 +307,7 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
createRebalanceEngine();
- rebalanceEngine.start();
+ rebalanceEngine.startAsync(catalogManager.latestCatalogVersion());
Set<String> nodes = Set.of("node0", "node1", "node2");
@@ -324,7 +330,7 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
createRebalanceEngine();
- rebalanceEngine.start();
+ rebalanceEngine.startAsync(catalogManager.latestCatalogVersion());
Set<String> nodes = Set.of("node0", "node1", "node2");
@@ -358,7 +364,7 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
createRebalanceEngine();
- rebalanceEngine.start();
+ rebalanceEngine.startAsync(catalogManager.latestCatalogVersion());
int zoneId = getZoneId(ZONE_NAME_0);
@@ -394,7 +400,7 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
createRebalanceEngine();
- rebalanceEngine.start();
+ rebalanceEngine.startAsync(catalogManager.latestCatalogVersion());
Set<String> nodes = Set.of("node0", "node1", "node2");
@@ -443,7 +449,7 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
try {
createRebalanceEngine(realMetaStorageManager);
- rebalanceEngine.start();
+ rebalanceEngine.startAsync(catalogManager.latestCatalogVersion());
alterZone(ZONE_NAME_0, 2);
@@ -475,7 +481,7 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
try {
createRebalanceEngine(realMetaStorageManager);
- rebalanceEngine.start();
+ rebalanceEngine.startAsync(catalogManager.latestCatalogVersion());
alterZone(getDefaultZone(catalogManager, clock.nowLong()).name(),
2);
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
index af146edfcc..ad0e0a8fef 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
@@ -46,6 +46,8 @@ import
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescript
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -68,6 +70,7 @@ import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.sql.ColumnType;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
@@ -104,6 +107,8 @@ public class RebalanceUtilUpdateAssignmentsTest extends
IgniteAbstractTest {
DEFAULT_STORAGE_PROFILE
);
+ private final HybridClock clock = new HybridClockImpl();
+
@InjectConfiguration
private RaftConfiguration raftConfiguration;
@@ -130,9 +135,11 @@ public class RebalanceUtilUpdateAssignmentsTest extends
IgniteAbstractTest {
keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test"));
+ ClusterTimeImpl clusterTime = new ClusterTimeImpl("node", new
IgniteSpinBusyLock(), clock);
+
MetaStorageListener metaStorageListener = new MetaStorageListener(
keyValueStorage,
- mock(ClusterTimeImpl.class),
+ clusterTime,
raftConfiguration.retryTimeout(),
completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
);
@@ -190,7 +197,11 @@ public class RebalanceUtilUpdateAssignmentsTest extends
IgniteAbstractTest {
lenient().doAnswer(invocationClose -> {
Iif iif = invocationClose.getArgument(0);
- MultiInvokeCommand multiInvokeCommand =
commandsFactory.multiInvokeCommand().iif(iif).id(commandIdGenerator.newId()).build();
+ MultiInvokeCommand multiInvokeCommand =
commandsFactory.multiInvokeCommand()
+ .iif(iif)
+ .id(commandIdGenerator.newId())
+ .initiatorTimeLong(clusterTime.nowLong())
+ .build();
return metaStorageService.run(multiInvokeCommand);
}).when(metaStorageManager).invoke(any());
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
index 5989c9de08..17c68308b7 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
@@ -161,6 +161,8 @@ class IndexAvailabilityController implements
ManuallyCloseable {
public void recover(long recoveryRevision) {
inBusyLock(busyLock, () -> {
// It is expected that the method will only be called on recovery,
when the deploy of metastore watches has not yet occurred.
+ // TODO: IGNITE-22656 Potentially dangerous to take the latest
version as the tables and indexes might no longer be present
+ // in the catalog.
int catalogVersion = catalogManager.latestCatalogVersion();
List<CompletableFuture<?>> futures =
catalogManager.indexes(catalogVersion).stream()
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index c55c5d0410..8f7cd70732 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -526,6 +526,7 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
ZonePartitionId replicaGrpId = new ZonePartitionId(zoneId,
partitionId);
// It is safe to get the latest version of the catalog as
we are in the metastore thread.
+ // TODO: IGNITE-22661 Potentially unsafe to use the latest
catalog version. Better to take the version from Assignments.
int catalogVersion = catalogMgr.latestCatalogVersion();
CatalogZoneDescriptor zoneDescriptor =
catalogMgr.zone(zoneId, catalogVersion);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
index 2b25d6a5b6..992dd604be 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
@@ -33,6 +33,7 @@ import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.file.Path;
+import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -57,6 +58,7 @@ import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAsyncRequest;
import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.TransactionOptions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
@@ -210,6 +212,44 @@ public class ItRebalanceTest extends
BaseIgniteAbstractTest {
waitForTablesCounterInMetastore(0, zoneId, 2);
}
+ @Test
+ void testRebalanceTablesCounterForZonePrevCatalogVersion() throws
Exception {
+ cluster.startAndInit(3);
+
+ String zoneName = "ZONE";
+
+ createZone(zoneName, 3, 3);
+
+ Set<Integer> tableIds = new HashSet<>();
+
+ tableIds.add(createTestTable("TEST1", zoneName));
+
+ Set<String> allNodes =
cluster.runningNodes().map(IgniteImpl::name).collect(Collectors.toSet());
+
+ for (Integer tableId : tableIds) {
+ waitForStableAssignmentsInMetastore(allNodes, tableId);
+ }
+
+ // Block low watermark change with an open ro tx.
+ cluster.aliveNode().transactions().begin(new
TransactionOptions().readOnly(true));
+
+ alterZone(zoneName, 2);
+
+ dropTestTable("TEST1");
+
+ CatalogManager catalogManager = cluster.aliveNode().catalogManager();
+
+ int zoneId =
catalogManager.catalog(catalogManager.latestCatalogVersion()).zone(zoneName).id();
+
+ for (Integer tableId : tableIds) {
+ waitForStableAssignmentsInMetastore(2, tableId);
+ }
+
+ waitForTablesCounterInMetastore(0, zoneId, 0);
+ waitForTablesCounterInMetastore(0, zoneId, 1);
+ waitForTablesCounterInMetastore(0, zoneId, 2);
+ }
+
private static Row marshalTuple(TableViewInternal table, Tuple tuple) {
SchemaRegistry schemaReg = table.schemaView();
var marshaller = new TupleMarshallerImpl(schemaReg.lastKnownSchema());
@@ -246,7 +286,7 @@ public class ItRebalanceTest extends BaseIgniteAbstractTest
{
lastAssignmentsHolderForLog[0] = assignments;
return assignments.size() == expectedNodesNumber;
- }, 30000), "Expected nodes: " + expectedNodesNumber + ", actual nodes
size: " + lastAssignmentsHolderForLog[0].size());
+ }, 30000), "Expected nodes: " + expectedNodesNumber + ", actual nodes
size: " + actualSize(lastAssignmentsHolderForLog));
}
private void waitForTablesCounterInMetastore(int expectedTablesNumber, int
zoneId, int partitionNumber) throws InterruptedException {
@@ -259,7 +299,12 @@ public class ItRebalanceTest extends
BaseIgniteAbstractTest {
return tablesCounter != null && tablesCounter.size() ==
expectedTablesNumber;
- }, 30000), "Expected tables number: " + expectedTablesNumber + ",
actual tables number: " + lastAssignmentsHolderForLog[0].size());
+ }, 30000),
+ "Expected tables number: " + expectedTablesNumber + ", actual
tables number: " + actualSize(lastAssignmentsHolderForLog));
+ }
+
+ private static String actualSize(Collection<?>[] collection) {
+ return collection[0] == null ? "null" :
Integer.toString(collection[0].size());
}
private static CompletableFuture<Set<Integer>> tablesCounter(
@@ -313,4 +358,12 @@ public class ItRebalanceTest extends
BaseIgniteAbstractTest {
.filter(t -> t.name().equals(tableName))
.findFirst().get().id();
}
+
+ private void dropTestTable(String tableName) {
+ String sql2 = "drop table if exists " + tableName;
+
+ cluster.doInSession(1, session -> {
+ executeUpdate(sql2, session);
+ });
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index c2e6b230e4..13406d7f77 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -34,6 +34,7 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.PENDING_ASSIGNMENTS_PREFIX;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.catalogVersionKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractPartitionNumber;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractTableId;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.intersect;
@@ -57,6 +58,7 @@ import static
org.apache.ignite.internal.table.distributed.TableUtils.droppedTab
import static
org.apache.ignite.internal.table.distributed.index.IndexUtils.registerIndexesToTable;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
+import static org.apache.ignite.internal.util.ByteUtils.intToBytes;
import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
@@ -1787,14 +1789,18 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
stringKey, partId, table.name(),
localNode().address(), pendingAssignments, revision);
}
- return
setTablesPartitionCountersForRebalance(replicaGrpId, revision,
pendingAssignments.force())
+ // TODO: IGNITE-22661 The catalog version should come from the
assignments: the version that was valid at the time of assignment creation.
+ int catalogVersion =
catalogService.latestCatalogVersion();
+
+ return
setTablesPartitionCountersForRebalance(replicaGrpId, revision,
pendingAssignments.force(), catalogVersion)
.thenCompose(r ->
handleChangePendingAssignmentEvent(
replicaGrpId,
table,
stableAssignments,
pendingAssignments,
revision,
- isRecovery
+ isRecovery,
+ catalogVersion
))
.thenCompose(v ->
changePeersOnRebalance(table, replicaGrpId, pendingAssignments.nodes(),
revision));
} finally {
@@ -1810,7 +1816,8 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
@Nullable Assignments stableAssignments,
Assignments pendingAssignments,
long revision,
- boolean isRecovery
+ boolean isRecovery,
+ int catalogVersion
) {
boolean pendingAssignmentsAreForced = pendingAssignments.force();
Set<Assignment> pendingAssignmentsNodes = pendingAssignments.nodes();
@@ -1866,7 +1873,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
);
}
- int zoneId = getTableDescriptor(tbl.tableId(),
catalogService.latestCatalogVersion()).zoneId();
+ int zoneId = getTableDescriptor(tbl.tableId(),
catalogVersion).zoneId();
return startPartitionAndStartClient(
tbl,
@@ -1899,9 +1906,12 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}, ioExecutor);
}
- private CompletableFuture<Void>
setTablesPartitionCountersForRebalance(TablePartitionId replicaGrpId, long
revision, boolean force) {
- int catalogVersion = catalogService.latestCatalogVersion();
-
+ private CompletableFuture<Void> setTablesPartitionCountersForRebalance(
+ TablePartitionId replicaGrpId,
+ long revision,
+ boolean force,
+ int catalogVersion
+ ) {
int tableId = replicaGrpId.tableId();
CatalogZoneDescriptor zoneDescriptor =
getZoneDescriptor(getTableDescriptor(tableId, catalogVersion), catalogVersion);
@@ -1924,9 +1934,14 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
byte[] countersValue = toBytes(tablesInZone);
+ // The collected tables are valid for the current catalog version but
may be removed in future versions.
+ // Therefore, we need to store the `catalogVersion` alongside the
counter to ensure we read the correct catalog version later.
return metaStorageMgr.invoke(iif(
condition,
- ops(put(tablesCounterKey(zoneId, partId),
countersValue)).yield(true),
+ ops(
+ put(tablesCounterKey(zoneId, partId), countersValue),
+ put(catalogVersionKey(zoneId, partId),
intToBytes(catalogVersion))
+ ).yield(true),
ops().yield(false)
)).whenComplete((res, e) -> {
if (e != null) {
@@ -2070,6 +2085,8 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
TablePartitionId replicaGrpId = new
TablePartitionId(tableId, partitionId);
// It is safe to get the latest version of the catalog as
we are in the metastore thread.
+ // TODO: IGNITE-22661 Potentially unsafe to use the latest
catalog version, as the tables might no longer be present
+ // in the catalog. Better to take the version from
Assignments.
int catalogVersion = catalogService.latestCatalogVersion();
return tablesById(evt.revision())
@@ -2543,7 +2560,11 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
assert stableAssignments != null : "tablePartitionId=" +
tablePartitionId + ", revision=" + revision;
- int zoneId = getTableDescriptor(tablePartitionId.tableId(),
catalogService.latestCatalogVersion()).zoneId();
+ // TODO: IGNITE-22661 Potentially unsafe to use the latest
catalog version, as the tables might no longer be present
+ // in the catalog. Better to store this version in
ManualGroupRestartRequest.
+ int catalogVersion = catalogService.latestCatalogVersion();
+
+ int zoneId = getTableDescriptor(tablePartitionId.tableId(),
catalogVersion).zoneId();
return startPartitionAndStartClient(
table,