This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 84a65363c6 IGNITE-18633 Storage cleanup integration if one of them has 
not finished rebalancing with TableManager (#1586)
84a65363c6 is described below

commit 84a65363c690071e4d84af346672d35648b7ac88
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Jan 27 15:06:54 2023 +0300

    IGNITE-18633 Storage cleanup integration if one of them has not finished 
rebalancing with TableManager (#1586)
---
 .../internal/table/distributed/TableManager.java   | 106 +++++++++------------
 .../distributed/storage/PartitionStorages.java     |  55 +++++++++++
 2 files changed, 102 insertions(+), 59 deletions(-)

diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index c2296a8e2a..91e38e7649 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -131,6 +131,7 @@ import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaL
 import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
 import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.table.distributed.storage.PartitionStorages;
 import org.apache.ignite.internal.table.event.TableEvent;
 import org.apache.ignite.internal.table.event.TableEventParameters;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -699,10 +700,11 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                 PendingComparableValuesTracker<HybridTimestamp> safeTime = new 
PendingComparableValuesTracker<>(clock.now());
 
-                CompletableFuture<MvPartitionStorage> mvPartitionStorageFut = 
getOrCreateMvPartition(internalTbl.storage(), partId);
+                CompletableFuture<PartitionStorages> partitionStoragesFut = 
getOrCreatePartitionStorages(table, partId);
 
-                CompletableFuture<PartitionDataStorage> 
partitionDataStorageFut = mvPartitionStorageFut
-                        .thenApply(mvPartitionStorage -> 
partitionDataStorage(mvPartitionStorage, internalTbl, partId));
+                CompletableFuture<PartitionDataStorage> 
partitionDataStorageFut = partitionStoragesFut
+                        .thenApply(partitionStorages -> 
partitionDataStorage(partitionStorages.getMvPartitionStorage(),
+                                internalTbl, partId));
 
                 CompletableFuture<StorageUpdateHandler> 
storageUpdateHandlerFut = partitionDataStorageFut
                         .thenApply(storage -> new StorageUpdateHandler(partId, 
storage, table.indexStorageAdapters(partId)));
@@ -711,7 +713,9 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                 // start new nodes, only if it is table creation, other cases 
will be covered by rebalance logic
                 if (oldPartAssignment.isEmpty() && localMemberAssignment != 
null) {
-                    startGroupFut = 
mvPartitionStorageFut.thenComposeAsync(mvPartitionStorage -> {
+                    startGroupFut = 
partitionStoragesFut.thenComposeAsync(partitionStorages -> {
+                        MvPartitionStorage mvPartitionStorage = 
partitionStorages.getMvPartitionStorage();
+
                         boolean hasData = 
mvPartitionStorage.lastAppliedIndex() > 0;
 
                         CompletableFuture<Boolean> fut;
@@ -752,8 +756,9 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                             return partitionDataStorageFut
                                     .thenCompose(s -> storageUpdateHandlerFut)
-                                    .thenCompose(s -> 
getOrCreateTxStateStorage(internalTbl.txStateStorage(), partId))
-                                    .thenAcceptAsync(txStatePartitionStorage 
-> {
+                                    .thenAcceptAsync(storageUpdateHandler -> {
+                                        TxStateStorage txStatePartitionStorage 
= partitionStorages.getTxStateStorage();
+
                                         RaftGroupOptions groupOptions = 
groupOptionsForPartition(
                                                 internalTbl.storage(),
                                                 internalTbl.txStateStorage(),
@@ -766,7 +771,6 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                         var raftNodeId = new 
RaftNodeId(replicaGrpId, serverPeer);
 
                                         PartitionDataStorage 
partitionDataStorage = partitionDataStorageFut.join();
-                                        StorageUpdateHandler 
storageUpdateHandler = storageUpdateHandlerFut.join();
 
                                         try {
                                             // TODO: use RaftManager 
interface, see https://issues.apache.org/jira/browse/IGNITE-18273
@@ -817,12 +821,12 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                 return completedFuture(null);
                             }
 
-                            CompletableFuture<TxStateStorage> 
txStateStorageFuture =
-                                    
getOrCreateTxStateStorage(internalTbl.txStateStorage(), partId);
-
                             StorageUpdateHandler storageUpdateHandler = 
storageUpdateHandlerFut.join();
 
-                            return 
mvPartitionStorageFut.thenAcceptBoth(txStateStorageFuture, (partitionStorage, 
txStateStorage) -> {
+                            return 
partitionStoragesFut.thenAccept(partitionStorages -> {
+                                MvPartitionStorage partitionStorage = 
partitionStorages.getMvPartitionStorage();
+                                TxStateStorage txStateStorage = 
partitionStorages.getTxStateStorage();
+
                                 try {
                                     replicaMgr.startReplica(replicaGrpId,
                                             new PartitionReplicaListener(
@@ -1842,13 +1846,15 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                             pendingAssignmentsWatchEvent.key(), partId, 
tbl.name(), localMember.address());
 
                     if (shouldStartLocalServices) {
-                        MvPartitionStorage mvPartitionStorage = 
getOrCreateMvPartition(internalTable.storage(), partId).join();
+                        PartitionStorages partitionStorages = 
getOrCreatePartitionStorages(tbl, partId).join();
+
+                        MvPartitionStorage mvPartitionStorage = 
partitionStorages.getMvPartitionStorage();
+                        TxStateStorage txStatePartitionStorage = 
partitionStorages.getTxStateStorage();
+
                         PartitionDataStorage partitionDataStorage = 
partitionDataStorage(mvPartitionStorage, internalTable, partId);
                         StorageUpdateHandler storageUpdateHandler =
                                 new StorageUpdateHandler(partId, 
partitionDataStorage, tbl.indexStorageAdapters(partId));
 
-                        TxStateStorage txStatePartitionStorage = 
getOrCreateTxStateStorage(internalTable.txStateStorage(), partId).join();
-
                         RaftGroupOptions groupOptions = 
groupOptionsForPartition(
                                 internalTable.storage(),
                                 internalTable.txStateStorage(),
@@ -2046,51 +2052,6 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         return getMetadataLocallyOnly ? property : 
ConfigurationUtil.directProxy(property);
     }
 
-    /**
-     * Returns or creates multi-versioned partition storage.
-     *
-     * <p>If a full rebalance has not been completed for a partition, it will 
be recreated to remove any garbage that might have been left
-     * in when the rebalance was interrupted.
-     *
-     * @param mvTableStorage Multi-versioned table storage.
-     * @param partitionId Partition ID.
-     * @return Future that will complete when the operation completes.
-     */
-    private CompletableFuture<MvPartitionStorage> 
getOrCreateMvPartition(MvTableStorage mvTableStorage, int partitionId) {
-        // TODO: IGNITE-18633 Should clean both MvPartitionStorage and 
TxStateStorage if the rebalance for one of them has not ended
-        // TODO: IGNITE-18633 Also think about waiting for index stores for a 
partition, see PartitionAccessImpl.startRebalance
-        return CompletableFuture.supplyAsync(() -> 
mvTableStorage.getOrCreateMvPartition(partitionId), ioExecutor)
-                .thenCompose(mvPartitionStorage -> {
-                    if (mvPartitionStorage.persistedIndex() == 
MvPartitionStorage.REBALANCE_IN_PROGRESS) {
-                        return 
mvTableStorage.clearPartition(partitionId).thenApply(unused -> 
mvPartitionStorage);
-                    } else {
-                        return completedFuture(mvPartitionStorage);
-                    }
-                });
-    }
-
-    /**
-     * Returns or creates transaction state storage for a partition.
-     *
-     * <p>If a full rebalance has not been completed for a partition, it will 
be recreated to remove any garbage that might have been left
-     * in when the rebalance was interrupted.
-     *
-     * @param txStateTableStorage Transaction state storage for a table.
-     * @param partitionId Partition ID.
-     * @return Future that will complete when the operation completes.
-     */
-    private CompletableFuture<TxStateStorage> 
getOrCreateTxStateStorage(TxStateTableStorage txStateTableStorage, int 
partitionId) {
-        // TODO: IGNITE-18633 Should clean both MvPartitionStorage and 
TxStateStorage if the rebalance for one of them has not ended
-        return CompletableFuture.supplyAsync(() -> 
txStateTableStorage.getOrCreateTxStateStorage(partitionId), ioExecutor)
-                .thenCompose(txStateStorage -> {
-                    if (txStateStorage.persistedIndex() == 
TxStateStorage.REBALANCE_IN_PROGRESS) {
-                        return txStateStorage.clear().thenApply(unused -> 
txStateStorage);
-                    } else {
-                        return completedFuture(txStateStorage);
-                    }
-                });
-    }
-
     private static PeersAndLearners 
configurationFromAssignments(Collection<Assignment> assignments) {
         var peers = new HashSet<String>();
         var learners = new HashSet<String>();
@@ -2105,4 +2066,31 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
         return PeersAndLearners.fromConsistentIds(peers, learners);
     }
+
+    /**
+     * Creates or gets partition stores. If one of the storages has not 
completed the rebalance, then the storages are cleared.
+     *
+     * @param table Table.
+     * @param partitionId Partition ID.
+     * @return Future of creating or getting partition stores.
+     */
+    // TODO: IGNITE-18619 Maybe we should wait here to create indexes, if you 
add now, then the tests start to hang
+    private CompletableFuture<PartitionStorages> 
getOrCreatePartitionStorages(TableImpl table, int partitionId) {
+        return CompletableFuture
+                .supplyAsync(() -> {
+                    MvPartitionStorage mvPartitionStorage = 
table.internalTable().storage().getOrCreateMvPartition(partitionId);
+                    TxStateStorage txStateStorage = 
table.internalTable().txStateStorage().getOrCreateTxStateStorage(partitionId);
+
+                    if (mvPartitionStorage.persistedIndex() == 
MvPartitionStorage.REBALANCE_IN_PROGRESS
+                            || txStateStorage.persistedIndex() == 
TxStateStorage.REBALANCE_IN_PROGRESS) {
+                        return CompletableFuture.allOf(
+                                
table.internalTable().storage().clearPartition(partitionId),
+                                txStateStorage.clear()
+                        ).thenApply(unused -> new 
PartitionStorages(mvPartitionStorage, txStateStorage));
+                    } else {
+                        return completedFuture(new 
PartitionStorages(mvPartitionStorage, txStateStorage));
+                    }
+                }, ioExecutor)
+                .thenCompose(Function.identity());
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/PartitionStorages.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/PartitionStorages.java
new file mode 100644
index 0000000000..cb8251c0ed
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/PartitionStorages.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.storage;
+
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+
+/**
+ * Partition storages holder.
+ */
+public class PartitionStorages {
+    private final MvPartitionStorage mvPartitionStorage;
+
+    private final TxStateStorage txStateStorage;
+
+    /**
+     * Constructor.
+     *
+     * @param mvPartitionStorage Multi-versioned storage.
+     * @param txStateStorage Transaction state storage.
+     */
+    public PartitionStorages(MvPartitionStorage mvPartitionStorage, 
TxStateStorage txStateStorage) {
+        this.mvPartitionStorage = mvPartitionStorage;
+        this.txStateStorage = txStateStorage;
+    }
+
+    /**
+     * Returns multi-versioned storage.
+     */
+    public MvPartitionStorage getMvPartitionStorage() {
+        return mvPartitionStorage;
+    }
+
+    /**
+     * Returns transaction state storage.
+     */
+    public TxStateStorage getTxStateStorage() {
+        return txStateStorage;
+    }
+}

Reply via email to