This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 876a202f88 IGNITE-22415 Search for table in older versions of catalog 
(#3885)
876a202f88 is described below

commit 876a202f887ede527138f30460849396cd31d8b7
Author: Cyrill <[email protected]>
AuthorDate: Mon Jul 8 17:19:39 2024 +0300

    IGNITE-22415 Search for table in older versions of catalog (#3885)
---
 .../distributionzones/DistributionZoneManager.java | 18 +++----
 .../rebalance/DistributionZoneRebalanceEngine.java | 59 ++++++++++++++++------
 .../DistributionZoneRebalanceEngineV2.java         |  2 +-
 .../RebalanceRaftGroupEventsListener.java          |  7 ++-
 .../distributionzones/rebalance/RebalanceUtil.java | 14 +++++
 .../ZoneRebalanceRaftGroupEventsListener.java      | 11 ++--
 .../DistributionZoneRebalanceEngineTest.java       | 22 +++++---
 .../RebalanceUtilUpdateAssignmentsTest.java        | 15 +++++-
 .../index/IndexAvailabilityController.java         |  2 +
 .../PartitionReplicaLifecycleManager.java          |  1 +
 .../ignite/internal/rebalance/ItRebalanceTest.java | 57 ++++++++++++++++++++-
 .../internal/table/distributed/TableManager.java   | 39 ++++++++++----
 12 files changed, 195 insertions(+), 52 deletions(-)

diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 9a42ab8b92..646ed9234d 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -269,10 +269,13 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
             restoreGlobalStateFromLocalMetastorage(recoveryRevision);
 
+            // TODO: IGNITE-22679 CatalogManagerImpl initializes versions in a 
separate thread, not safe to make this call directly.
+            int catalogVersion = catalogManager.latestCatalogVersion();
+
             return allOf(
-                    createOrRestoreZonesStates(recoveryRevision),
-                    
restoreLogicalTopologyChangeEventAndStartTimers(recoveryRevision)
-            ).thenComposeAsync((notUsed) -> rebalanceEngine.start(), 
componentContext.executor());
+                    createOrRestoreZonesStates(recoveryRevision, 
catalogVersion),
+                    
restoreLogicalTopologyChangeEventAndStartTimers(recoveryRevision, 
catalogVersion)
+            ).thenComposeAsync((notUsed) -> 
rebalanceEngine.startAsync(catalogVersion), componentContext.executor());
         });
     }
 
@@ -1400,9 +1403,7 @@ public class DistributionZoneManager implements 
IgniteComponent {
         catalogManager.listen(ZONE_ALTER, new 
ManagerCatalogAlterZoneEventListener());
     }
 
-    private CompletableFuture<Void> createOrRestoreZonesStates(long 
recoveryRevision) {
-        int catalogVersion = catalogManager.latestCatalogVersion();
-
+    private CompletableFuture<Void> createOrRestoreZonesStates(long 
recoveryRevision, int catalogVersion) {
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
         // TODO: IGNITE-20287 Clean up abandoned resources for dropped tables 
from vault and metastore
@@ -1420,7 +1421,7 @@ public class DistributionZoneManager implements 
IgniteComponent {
      * @param recoveryRevision Revision of the Meta Storage after its recovery.
      * @return Future that represents the pending completion of the operations.
      */
-    private CompletableFuture<Void> 
restoreLogicalTopologyChangeEventAndStartTimers(long recoveryRevision) {
+    private CompletableFuture<Void> 
restoreLogicalTopologyChangeEventAndStartTimers(long recoveryRevision, int 
catalogVersion) {
         Entry topologyEntry = 
metaStorageManager.getLocally(zonesLogicalTopologyKey(), recoveryRevision);
 
         if (topologyEntry.value() != null) {
@@ -1428,9 +1429,6 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
             long topologyRevision = topologyEntry.revision();
 
-            // It is safe to get the latest version of the catalog as we are 
in the starting process.
-            int catalogVersion = catalogManager.latestCatalogVersion();
-
             Entry lastUpdateRevisionEntry = 
metaStorageManager.getLocally(zonesRecoverableStateRevision(), 
recoveryRevision);
 
             if (lastUpdateRevisionEntry.value() == null || topologyRevision > 
bytesToLongKeepingOrder(lastUpdateRevisionEntry.value())) {
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
index f8309f7fa0..896252f7c5 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
@@ -26,10 +26,12 @@ import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.parseDataNodes;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceRaftGroupEventsListener.doStableKeySwitch;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.catalogVersionKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractPartitionNumber;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractZoneId;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.raftConfigurationAppliedKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tablesCounterPrefixKey;
+import static org.apache.ignite.internal.util.ByteUtils.bytesToInt;
 import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
@@ -40,6 +42,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -138,7 +141,7 @@ public class DistributionZoneRebalanceEngine {
     /**
      * Starts the rebalance engine by registering corresponding meta storage 
and configuration listeners.
      */
-    public CompletableFuture<Void> start() {
+    public CompletableFuture<Void> startAsync(int catalogVersion) {
         return IgniteUtils.inBusyLockAsync(busyLock, () -> {
             catalogService.listen(ZONE_ALTER, new 
CatalogAlterZoneEventListener(catalogService) {
                 @Override
@@ -159,7 +162,8 @@ public class DistributionZoneRebalanceEngine {
 
             long recoveryRevision = recoveryFinishFuture.join();
 
-            return rebalanceTriggersRecovery(recoveryRevision).thenCompose(v 
-> distributionZoneRebalanceEngineV2.start());
+            return rebalanceTriggersRecovery(recoveryRevision, catalogVersion)
+                    .thenCompose(v -> 
distributionZoneRebalanceEngineV2.startAsync());
         });
     }
 
@@ -171,15 +175,15 @@ public class DistributionZoneRebalanceEngine {
     // TODO: https://issues.apache.org/jira/browse/IGNITE-21058 At the moment 
this method produce many metastore multi-invokes
     // TODO: which can be avoided by the local logic, which mirror the logic 
of metastore invokes.
     // TODO: And then run the remote invoke, only if needed.
-    private CompletableFuture<Void> rebalanceTriggersRecovery(long 
recoveryRevision) {
+    private CompletableFuture<Void> rebalanceTriggersRecovery(long 
recoveryRevision, int catalogVersion) {
         if (recoveryRevision > 0) {
-            List<CompletableFuture<Void>> zonesRecoveryFutures = 
catalogService.zones(catalogService.latestCatalogVersion())
+            List<CompletableFuture<Void>> zonesRecoveryFutures = 
catalogService.zones(catalogVersion)
                     .stream()
                     .map(zoneDesc ->
                             recalculateAssignmentsAndScheduleRebalance(
                                     zoneDesc,
                                     recoveryRevision,
-                                    catalogService.latestCatalogVersion()
+                                    catalogVersion
                             )
                     )
                     .collect(Collectors.toUnmodifiableList());
@@ -216,6 +220,8 @@ public class DistributionZoneRebalanceEngine {
                     int zoneId = 
extractZoneId(evt.entryEvent().newEntry().key(), 
DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX);
 
                     // It is safe to get the latest version of the catalog as 
we are in the metastore thread.
+                    // TODO: IGNITE-22661 Potentially unsafe to use the latest 
catalog version, as the tables might not already present
+                    //  in the catalog. Better to store this version when 
writing datanodes.
                     int catalogVersion = catalogService.latestCatalogVersion();
 
                     CatalogZoneDescriptor zoneDescriptor = 
catalogService.zone(zoneId, catalogVersion);
@@ -274,16 +280,28 @@ public class DistributionZoneRebalanceEngine {
                         return nullCompletedFuture();
                     }
 
-                    int zoneId = 
RebalanceUtil.extractZoneIdFromTablesCounter(event.entryEvent().newEntry().key());
-
-                    // TODO: 
https://issues.apache.org/jira/browse/IGNITE-21254 tables here must be the same 
as they were on rebalance start
-                    List<CatalogTableDescriptor> tables = 
findTablesByZoneId(zoneId, catalogService.latestCatalogVersion(), 
catalogService);
-
                     rebalanceScheduler.schedule(() -> {
                         if (!busyLock.enterBusy()) {
                             return;
                         }
 
+                        int zoneId = 
RebalanceUtil.extractZoneIdFromTablesCounter(event.entryEvent().newEntry().key());
+
+                        int partId = 
extractPartitionNumber(event.entryEvent().newEntry().key());
+
+                        int catalogVersion;
+
+                        try {
+                            catalogVersion = 
getCatalogVersionForCounter(zoneId, partId, event.revision());
+                        } catch (ExecutionException | InterruptedException e) {
+                            LOG.error("Failed to get catalog version for 
[zoneId={}, partitionId={}]", e, zoneId, partId);
+
+                            busyLock.leaveBusy();
+                            return;
+                        }
+
+                        List<CatalogTableDescriptor> tables = 
findTablesByZoneId(zoneId, catalogVersion, catalogService);
+
                         LOG.debug("Started to update stable keys for tables 
from the zone [zoneId = {}, tables = [{}]]",
                                 zoneId,
                                 
tables.stream().map(CatalogObjectDescriptor::name).collect(Collectors.toSet())
@@ -292,8 +310,6 @@ public class DistributionZoneRebalanceEngine {
                         try {
                             Map<ByteArray, TablePartitionId> 
partitionTablesKeys = new HashMap<>();
 
-                            int partId = 
extractPartitionNumber(event.entryEvent().newEntry().key());
-
                             for (CatalogTableDescriptor table : tables) {
                                 TablePartitionId replicaGrpId = new 
TablePartitionId(table.id(), partId);
                                 
partitionTablesKeys.put(raftConfigurationAppliedKey(replicaGrpId), 
replicaGrpId);
@@ -334,6 +350,20 @@ public class DistributionZoneRebalanceEngine {
         };
     }
 
+    @Deprecated // Will be removed when IGNITE-22115 is merged.
+    private int getCatalogVersionForCounter(int zoneId, int partId, long 
revision) throws ExecutionException, InterruptedException {
+        Entry entry = metaStorageManager.getLocally(catalogVersionKey(zoneId, 
partId), revision);
+
+        assert entry.value() != null : "Failed to find catalog version for 
table counters.";
+
+        int storedCatalogVersion = bytesToInt(entry.value());
+
+        // Wait for the catalog to catch up.
+        catalogService.catalogReadyFuture(storedCatalogVersion).get();
+
+        return storedCatalogVersion;
+    }
+
     private CompletableFuture<Void> onUpdateReplicas(AlterZoneEventParameters 
parameters) {
         return recalculateAssignmentsAndScheduleRebalance(
                 parameters.zoneDescriptor(),
@@ -345,10 +375,9 @@ public class DistributionZoneRebalanceEngine {
     static CompletableFuture<Set<Assignment>> calculateAssignments(
             TablePartitionId tablePartitionId,
             CatalogService catalogService,
-            DistributionZoneManager distributionZoneManager
+            DistributionZoneManager distributionZoneManager,
+            int catalogVersion
     ) {
-        int catalogVersion = catalogService.latestCatalogVersion();
-
         CatalogTableDescriptor tableDescriptor = 
catalogService.table(tablePartitionId.tableId(), catalogVersion);
 
         CatalogZoneDescriptor zoneDescriptor = 
catalogService.zone(tableDescriptor.zoneId(), catalogVersion);
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
index a1b684de29..f409e9b316 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
@@ -96,7 +96,7 @@ public class DistributionZoneRebalanceEngineV2 {
     /**
      * Starts the rebalance engine by registering corresponding meta storage 
and catalog listeners.
      */
-    public CompletableFuture<Void> start() {
+    public CompletableFuture<Void> startAsync() {
         return IgniteUtils.inBusyLockAsync(busyLock, () -> {
             catalogService.listen(ZONE_ALTER, new 
CatalogAlterZoneEventListener(catalogService) {
                 @Override
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
index 39373ef464..ed074f5a38 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
@@ -407,7 +407,12 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
                     )
             ).get();
 
-            Set<Assignment> calculatedAssignments = 
calculateAssignments(tablePartitionId, catalogService, 
distributionZoneManager).get();
+            // TODO: IGNITE-22661 Potentially unsafe to use the latest catalog 
version, as the tables might not already present
+            //  in the catalog. Better to take the version from Assignments.
+            int catalogVersion = catalogService.latestCatalogVersion();
+
+            Set<Assignment> calculatedAssignments =
+                    calculateAssignments(tablePartitionId, catalogService, 
distributionZoneManager, catalogVersion).get();
 
             Entry stableEntry = values.get(stablePartAssignmentsKey);
             Entry pendingEntry = values.get(pendingPartAssignmentsKey);
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index 3d77134d01..0bf3ecff4a 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -327,6 +327,9 @@ public class RebalanceUtil {
     /** Key prefix for counter of rebalances of tables from a zone that are 
associated with the specified partition. */
     private static final String TABLES_COUNTER_PREFIX = "tables.counter.";
 
+    /** Key prefix for catalog version of tables at the time of setting the 
counter. */
+    private static final String TABLE_CATALOG_PREFIX = 
"tables.catalog.version.";
+
     /** Key prefix for a raft configuration that was applied during rebalance 
of the specified partition form a table. */
     private static final String RAFT_CONF_APPLIED_PREFIX = 
"assignments.raft.conf.applied.";
 
@@ -418,6 +421,17 @@ public class RebalanceUtil {
         return new ByteArray(TABLES_COUNTER_PREFIX + zoneId + "_part_" + 
partId);
     }
 
+    /**
+     * ByteArray key for catalog version of a table .
+     *
+     * @param zoneId Identifier of a zone.
+     * @param partId Unique identifier of a partition.
+     * @return Key for a partition.
+     */
+    public static ByteArray catalogVersionKey(int zoneId, int partId) {
+        return new ByteArray(TABLE_CATALOG_PREFIX + zoneId + "_part_" + 
partId);
+    }
+
     /**
      * ByteArray prefix for counter of rebalances of tables from a zone that 
are associated with the specified partition.
      *
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
index 951101a3c7..2ae5ddc0d0 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
@@ -331,10 +331,14 @@ public class ZoneRebalanceRaftGroupEventsListener 
implements RaftGroupEventsList
                     )
             ).get();
 
+            // TODO: IGNITE-22680 Find a better way to retrieve the catalog 
version.
+            int catalogVersion = catalogService.latestCatalogVersion();
+
             Set<Assignment> calculatedAssignments = calculateZoneAssignments(
                     zonePartitionId,
                     catalogService,
-                    distributionZoneManager
+                    distributionZoneManager,
+                    catalogVersion
             ).get();
 
             Entry stableEntry = values.get(stablePartAssignmentsKey);
@@ -616,10 +620,9 @@ public class ZoneRebalanceRaftGroupEventsListener 
implements RaftGroupEventsList
     private static CompletableFuture<Set<Assignment>> calculateZoneAssignments(
             ZonePartitionId zonePartitionId,
             CatalogService catalogService,
-            DistributionZoneManager distributionZoneManager
+            DistributionZoneManager distributionZoneManager,
+            int catalogVersion
     ) {
-        int catalogVersion = catalogService.latestCatalogVersion();
-
         CatalogZoneDescriptor zoneDescriptor = 
catalogService.zone(zonePartitionId.zoneId(), catalogVersion);
 
         int zoneId = zonePartitionId.zoneId();
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
index cc301bce76..1fd8f549b6 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
@@ -189,9 +189,11 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
 
         keyValueStorage = spy(new SimpleInMemoryKeyValueStorage(nodeName));
 
+        ClusterTimeImpl clusterTime = new ClusterTimeImpl("node", new 
IgniteSpinBusyLock(), clock);
+
         MetaStorageListener metaStorageListener = new MetaStorageListener(
                 keyValueStorage,
-                mock(ClusterTimeImpl.class),
+                clusterTime,
                 raftConfiguration.retryTimeout(),
                 completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
         );
@@ -249,7 +251,11 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
         lenient().doAnswer(invocationClose -> {
             Iif iif = invocationClose.getArgument(0);
 
-            MultiInvokeCommand multiInvokeCommand = 
commandsFactory.multiInvokeCommand().iif(iif).id(commandIdGenerator.newId()).build();
+            MultiInvokeCommand multiInvokeCommand = 
commandsFactory.multiInvokeCommand()
+                    .iif(iif)
+                    .id(commandIdGenerator.newId())
+                    .initiatorTimeLong(clusterTime.nowLong())
+                    .build();
 
             return metaStorageService.run(multiInvokeCommand);
         }).when(metaStorageManager).invoke(any());
@@ -301,7 +307,7 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
 
         createRebalanceEngine();
 
-        rebalanceEngine.start();
+        rebalanceEngine.startAsync(catalogManager.latestCatalogVersion());
 
         Set<String> nodes = Set.of("node0", "node1", "node2");
 
@@ -324,7 +330,7 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
 
         createRebalanceEngine();
 
-        rebalanceEngine.start();
+        rebalanceEngine.startAsync(catalogManager.latestCatalogVersion());
 
         Set<String> nodes = Set.of("node0", "node1", "node2");
 
@@ -358,7 +364,7 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
 
         createRebalanceEngine();
 
-        rebalanceEngine.start();
+        rebalanceEngine.startAsync(catalogManager.latestCatalogVersion());
 
         int zoneId = getZoneId(ZONE_NAME_0);
 
@@ -394,7 +400,7 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
 
         createRebalanceEngine();
 
-        rebalanceEngine.start();
+        rebalanceEngine.startAsync(catalogManager.latestCatalogVersion());
 
         Set<String> nodes = Set.of("node0", "node1", "node2");
 
@@ -443,7 +449,7 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
         try {
             createRebalanceEngine(realMetaStorageManager);
 
-            rebalanceEngine.start();
+            rebalanceEngine.startAsync(catalogManager.latestCatalogVersion());
 
             alterZone(ZONE_NAME_0, 2);
 
@@ -475,7 +481,7 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
         try {
             createRebalanceEngine(realMetaStorageManager);
 
-            rebalanceEngine.start();
+            rebalanceEngine.startAsync(catalogManager.latestCatalogVersion());
 
             alterZone(getDefaultZone(catalogManager, clock.nowLong()).name(), 
2);
 
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
index af146edfcc..ad0e0a8fef 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
@@ -46,6 +46,8 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescript
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -68,6 +70,7 @@ import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.sql.ColumnType;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
@@ -104,6 +107,8 @@ public class RebalanceUtilUpdateAssignmentsTest extends 
IgniteAbstractTest {
             DEFAULT_STORAGE_PROFILE
     );
 
+    private final HybridClock clock = new HybridClockImpl();
+
     @InjectConfiguration
     private RaftConfiguration raftConfiguration;
 
@@ -130,9 +135,11 @@ public class RebalanceUtilUpdateAssignmentsTest extends 
IgniteAbstractTest {
 
         keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test"));
 
+        ClusterTimeImpl clusterTime = new ClusterTimeImpl("node", new 
IgniteSpinBusyLock(), clock);
+
         MetaStorageListener metaStorageListener = new MetaStorageListener(
                 keyValueStorage,
-                mock(ClusterTimeImpl.class),
+                clusterTime,
                 raftConfiguration.retryTimeout(),
                 completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
         );
@@ -190,7 +197,11 @@ public class RebalanceUtilUpdateAssignmentsTest extends 
IgniteAbstractTest {
         lenient().doAnswer(invocationClose -> {
             Iif iif = invocationClose.getArgument(0);
 
-            MultiInvokeCommand multiInvokeCommand = 
commandsFactory.multiInvokeCommand().iif(iif).id(commandIdGenerator.newId()).build();
+            MultiInvokeCommand multiInvokeCommand = 
commandsFactory.multiInvokeCommand()
+                    .iif(iif)
+                    .id(commandIdGenerator.newId())
+                    .initiatorTimeLong(clusterTime.nowLong())
+                    .build();
 
             return metaStorageService.run(multiInvokeCommand);
         }).when(metaStorageManager).invoke(any());
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
index 5989c9de08..17c68308b7 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
@@ -161,6 +161,8 @@ class IndexAvailabilityController implements 
ManuallyCloseable {
     public void recover(long recoveryRevision) {
         inBusyLock(busyLock, () -> {
             // It is expected that the method will only be called on recovery, 
when the deploy of metastore watches has not yet occurred.
+            // TODO: IGNITE-22656 Potentially dangerous to take the latest 
version as the tables and indexes might no longer present
+            //  in the catalog.
             int catalogVersion = catalogManager.latestCatalogVersion();
 
             List<CompletableFuture<?>> futures = 
catalogManager.indexes(catalogVersion).stream()
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index c55c5d0410..8f7cd70732 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -526,6 +526,7 @@ public class PartitionReplicaLifecycleManager implements 
IgniteComponent {
                     ZonePartitionId replicaGrpId = new ZonePartitionId(zoneId, 
partitionId);
 
                     // It is safe to get the latest version of the catalog as 
we are in the metastore thread.
+                    // TODO: IGNITE-22661 Potentially unsafe to use the latest 
catalog version. Better to take the version from Assignments.
                     int catalogVersion = catalogMgr.latestCatalogVersion();
 
                     CatalogZoneDescriptor zoneDescriptor = 
catalogMgr.zone(zoneId, catalogVersion);
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
index 2b25d6a5b6..992dd604be 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
@@ -33,6 +33,7 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.nio.file.Path;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -57,6 +58,7 @@ import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAsyncRequest;
 import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.TransactionOptions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
@@ -210,6 +212,44 @@ public class ItRebalanceTest extends 
BaseIgniteAbstractTest {
         waitForTablesCounterInMetastore(0, zoneId, 2);
     }
 
+    @Test
+    void testRebalanceTablesCounterForZonePrevCatalogVersion() throws 
Exception {
+        cluster.startAndInit(3);
+
+        String zoneName = "ZONE";
+
+        createZone(zoneName, 3, 3);
+
+        Set<Integer> tableIds = new HashSet<>();
+
+        tableIds.add(createTestTable("TEST1", zoneName));
+
+        Set<String> allNodes = 
cluster.runningNodes().map(IgniteImpl::name).collect(Collectors.toSet());
+
+        for (Integer tableId : tableIds) {
+            waitForStableAssignmentsInMetastore(allNodes, tableId);
+        }
+
+        // Block low watermark change with an open ro tx.
+        cluster.aliveNode().transactions().begin(new 
TransactionOptions().readOnly(true));
+
+        alterZone(zoneName, 2);
+
+        dropTestTable("TEST1");
+
+        CatalogManager catalogManager = cluster.aliveNode().catalogManager();
+
+        int zoneId = 
catalogManager.catalog(catalogManager.latestCatalogVersion()).zone(zoneName).id();
+
+        for (Integer tableId : tableIds) {
+            waitForStableAssignmentsInMetastore(2, tableId);
+        }
+
+        waitForTablesCounterInMetastore(0, zoneId, 0);
+        waitForTablesCounterInMetastore(0, zoneId, 1);
+        waitForTablesCounterInMetastore(0, zoneId, 2);
+    }
+
     private static Row marshalTuple(TableViewInternal table, Tuple tuple) {
         SchemaRegistry schemaReg = table.schemaView();
         var marshaller = new TupleMarshallerImpl(schemaReg.lastKnownSchema());
@@ -246,7 +286,7 @@ public class ItRebalanceTest extends BaseIgniteAbstractTest 
{
             lastAssignmentsHolderForLog[0] = assignments;
 
             return assignments.size() == expectedNodesNumber;
-        }, 30000), "Expected nodes: " + expectedNodesNumber + ", actual nodes 
size: " + lastAssignmentsHolderForLog[0].size());
+        }, 30000), "Expected nodes: " + expectedNodesNumber + ", actual nodes 
size: " + actualSize(lastAssignmentsHolderForLog));
     }
 
     private void waitForTablesCounterInMetastore(int expectedTablesNumber, int 
zoneId, int partitionNumber) throws InterruptedException {
@@ -259,7 +299,12 @@ public class ItRebalanceTest extends 
BaseIgniteAbstractTest {
 
             return tablesCounter != null && tablesCounter.size() == 
expectedTablesNumber;
 
-        }, 30000), "Expected tables number: " + expectedTablesNumber + ", 
actual tables number: " + lastAssignmentsHolderForLog[0].size());
+        }, 30000),
+                "Expected tables number: " + expectedTablesNumber + ", actual 
tables number: " + actualSize(lastAssignmentsHolderForLog));
+    }
+
+    private static String actualSize(Collection<?>[] collection) {
+        return collection[0] == null ? "null" : 
Integer.toString(collection[0].size());
     }
 
     private static CompletableFuture<Set<Integer>> tablesCounter(
@@ -313,4 +358,12 @@ public class ItRebalanceTest extends 
BaseIgniteAbstractTest {
                 .filter(t -> t.name().equals(tableName))
                 .findFirst().get().id();
     }
+
+    private void dropTestTable(String tableName) {
+        String sql2 = "drop table if exists " + tableName;
+
+        cluster.doInSession(1, session -> {
+            executeUpdate(sql2, session);
+        });
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index c2e6b230e4..13406d7f77 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -34,6 +34,7 @@ import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.PENDING_ASSIGNMENTS_PREFIX;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.catalogVersionKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractPartitionNumber;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractTableId;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.intersect;
@@ -57,6 +58,7 @@ import static 
org.apache.ignite.internal.table.distributed.TableUtils.droppedTab
 import static 
org.apache.ignite.internal.table.distributed.index.IndexUtils.registerIndexesToTable;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
+import static org.apache.ignite.internal.util.ByteUtils.intToBytes;
 import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
 import static org.apache.ignite.internal.util.ByteUtils.toBytes;
 import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
@@ -1787,14 +1789,18 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                                     stringKey, partId, table.name(), 
localNode().address(), pendingAssignments, revision);
                         }
 
-                        return 
setTablesPartitionCountersForRebalance(replicaGrpId, revision, 
pendingAssignments.force())
+                        // TODO: IGNITE-22661 should come from the 
assignments. The version valid at the time of assignment creation.
+                        int catalogVersion = 
catalogService.latestCatalogVersion();
+
+                        return 
setTablesPartitionCountersForRebalance(replicaGrpId, revision, 
pendingAssignments.force(), catalogVersion)
                                 .thenCompose(r -> 
handleChangePendingAssignmentEvent(
                                         replicaGrpId,
                                         table,
                                         stableAssignments,
                                         pendingAssignments,
                                         revision,
-                                        isRecovery
+                                        isRecovery,
+                                        catalogVersion
                                 ))
                                 .thenCompose(v -> 
changePeersOnRebalance(table, replicaGrpId, pendingAssignments.nodes(), 
revision));
                     } finally {
@@ -1810,7 +1816,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             @Nullable Assignments stableAssignments,
             Assignments pendingAssignments,
             long revision,
-            boolean isRecovery
+            boolean isRecovery,
+            int catalogVersion
     ) {
         boolean pendingAssignmentsAreForced = pendingAssignments.force();
         Set<Assignment> pendingAssignmentsNodes = pendingAssignments.nodes();
@@ -1866,7 +1873,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                             );
                         }
 
-                        int zoneId = getTableDescriptor(tbl.tableId(), 
catalogService.latestCatalogVersion()).zoneId();
+                        int zoneId = getTableDescriptor(tbl.tableId(), 
catalogVersion).zoneId();
 
                         return startPartitionAndStartClient(
                                 tbl,
@@ -1899,9 +1906,12 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         }, ioExecutor);
     }
 
-    private CompletableFuture<Void> 
setTablesPartitionCountersForRebalance(TablePartitionId replicaGrpId, long 
revision, boolean force) {
-        int catalogVersion = catalogService.latestCatalogVersion();
-
+    private CompletableFuture<Void> setTablesPartitionCountersForRebalance(
+            TablePartitionId replicaGrpId,
+            long revision,
+            boolean force,
+            int catalogVersion
+    ) {
         int tableId = replicaGrpId.tableId();
 
         CatalogZoneDescriptor zoneDescriptor = 
getZoneDescriptor(getTableDescriptor(tableId, catalogVersion), catalogVersion);
@@ -1924,9 +1934,14 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
         byte[] countersValue = toBytes(tablesInZone);
 
+        // The collected tables are valid for the current catalog version but 
may be removed in future versions.
+        // Therefore, we need to store the `catalogVersion` alongside the 
counter to ensure we read the correct catalog version later.
         return metaStorageMgr.invoke(iif(
                 condition,
-                ops(put(tablesCounterKey(zoneId, partId), 
countersValue)).yield(true),
+                ops(
+                        put(tablesCounterKey(zoneId, partId), countersValue),
+                        put(catalogVersionKey(zoneId, partId), 
intToBytes(catalogVersion))
+                ).yield(true),
                 ops().yield(false)
         )).whenComplete((res, e) -> {
             if (e != null) {
@@ -2070,6 +2085,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                     TablePartitionId replicaGrpId = new 
TablePartitionId(tableId, partitionId);
 
                     // It is safe to get the latest version of the catalog as 
we are in the metastore thread.
+                    // TODO: IGNITE-22661 Potentially unsafe to use the latest 
catalog version, as the tables might not already present
+                    //  in the catalog. Better to take the version from 
Assignments.
                     int catalogVersion = catalogService.latestCatalogVersion();
 
                     return tablesById(evt.revision())
@@ -2543,7 +2560,11 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
                 assert stableAssignments != null : "tablePartitionId=" + 
tablePartitionId + ", revision=" + revision;
 
-                int zoneId = getTableDescriptor(tablePartitionId.tableId(), 
catalogService.latestCatalogVersion()).zoneId();
+                // TODO: IGNITE-22661 Potentially unsafe to use the latest 
catalog version, as the tables might not already present
+                //  in the catalog. Better to store this version in 
ManualGroupRestartRequest.
+                int catalogVersion = catalogService.latestCatalogVersion();
+
+                int zoneId = getTableDescriptor(tablePartitionId.tableId(), 
catalogVersion).zoneId();
 
                 return startPartitionAndStartClient(
                         table,


Reply via email to