This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3004974109 IGNITE-17899 Properly stopping resources in TableManager. 
Fixes #1282
3004974109 is described below

commit 300497410957aa43aad308642f5ecf42fbc297fa
Author: Sergey Uttsel <[email protected]>
AuthorDate: Mon Nov 28 09:32:27 2022 +0200

    IGNITE-17899 Properly stopping resources in TableManager. Fixes #1282
    
    Signed-off-by: Slava Koptilin <[email protected]>
---
 .../apache/ignite/internal/util/IgniteUtils.java   | 38 ++++++++++
 .../internal/storage/engine/MvTableStorage.java    |  3 +-
 .../internal/storage/impl/TestMvTableStorage.java  |  5 ++
 .../pagememory/AbstractPageMemoryTableStorage.java |  5 ++
 .../storage/rocksdb/RocksDbTableStorage.java       |  5 ++
 .../internal/table/distributed/TableManager.java   | 66 ++++++++++++++---
 .../table/distributed/TableManagerTest.java        | 86 +++++++++++++++++++++-
 7 files changed, 194 insertions(+), 14 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 93d032a976..efbad3636c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -54,6 +54,7 @@ import java.util.function.Supplier;
 import java.util.stream.Stream;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
+import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.util.worker.IgniteWorker;
 import org.apache.ignite.lang.IgniteInternalException;
@@ -633,6 +634,43 @@ public class IgniteUtils {
         closeAll(Arrays.stream(closeables));
     }
 
+    /**
+     * Closes all provided objects. If any of the {@link 
ManuallyCloseable#close} methods throw an exception,
+     * only the first thrown exception will be propagated to the caller, after 
all other objects are closed,
+     * similar to the try-with-resources block.
+     *
+     * @param closeables Stream of objects to close.
+     * @throws Exception If failed to close.
+     */
+    public static void closeAllManually(Stream<? extends ManuallyCloseable> 
closeables) throws Exception {
+        AtomicReference<Exception> ex = new AtomicReference<>();
+
+        closeables.filter(Objects::nonNull).forEach(closeable -> {
+            try {
+                closeable.close();
+            } catch (Exception e) {
+                if (!ex.compareAndSet(null, e)) {
+                    ex.get().addSuppressed(e);
+                }
+            }
+        });
+
+        if (ex.get() != null) {
+            throw ex.get();
+        }
+    }
+
+    /**
+     * Closes all provided objects.
+     *
+     * @param closeables Array of closeable objects to close.
+     * @throws Exception If failed to close.
+     * @see #closeAllManually(Stream)
+     */
+    public static void closeAllManually(ManuallyCloseable... closeables) 
throws Exception {
+        closeAllManually(Arrays.stream(closeables));
+    }
+
     /**
      * Short date format pattern for log messages in "quiet" mode. Only time 
is included since we don't expect "quiet" mode to be used for
      * longer runs.
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
index fad6d93356..57a404ae5c 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
@@ -22,6 +22,7 @@ import static 
org.apache.ignite.internal.schema.configuration.index.TableIndexCo
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
 import org.apache.ignite.internal.schema.configuration.TableConfiguration;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
@@ -36,7 +37,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Table storage that contains meta, partitions and SQL indexes.
  */
-public interface MvTableStorage {
+public interface MvTableStorage extends ManuallyCloseable {
     /**
      * Retrieves or creates a partition for the current table. Not expected to 
be called concurrently with the same Partition ID.
      *
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
index 68858f8513..a5c77df175 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
@@ -175,6 +175,11 @@ public class TestMvTableStorage implements MvTableStorage {
     public void stop() throws StorageException {
     }
 
+    @Override
+    public void close() throws StorageException {
+        stop();
+    }
+
     @Override
     public void destroy() throws StorageException {
     }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index 4e5f545021..cb674a1929 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -170,6 +170,11 @@ public abstract class AbstractPageMemoryTableStorage 
implements MvTableStorage {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
+    @Override
+    public void close() throws StorageException {
+        stop();
+    }
+
     /**
      * Closes all {@link #mvPartitions}.
      *
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index a02f6f19ff..2e8c0d4d3e 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -346,6 +346,11 @@ public class RocksDbTableStorage implements MvTableStorage 
{
         }
     }
 
+    @Override
+    public void close() throws StorageException {
+        stop();
+    }
+
     @Override
     public void destroy() throws StorageException {
         stop();
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 2839dcaea2..9c5b2ecde6 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -62,6 +62,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.IntSupplier;
@@ -134,6 +135,7 @@ import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbTableSt
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.IgniteNameUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.utils.RebalanceUtil;
@@ -971,20 +973,64 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
      */
     private void cleanUpTablesResources(Map<UUID, TableImpl> tables) {
         for (TableImpl table : tables.values()) {
-            try {
-                for (int p = 0; p < table.internalTable().partitions(); p++) {
-                    TablePartitionId replicationGroupId = new 
TablePartitionId(table.tableId(), p);
+            List<Runnable> stopping = new ArrayList<>();
 
-                    raftMgr.stopRaftGroup(replicationGroupId);
+            AtomicReference<Exception> exception = new AtomicReference<>();
 
-                    replicaMgr.stopReplica(replicationGroupId);
-                }
+            AtomicBoolean nodeStoppingEx = new AtomicBoolean();
+
+            for (int p = 0; p < table.internalTable().partitions(); p++) {
+                TablePartitionId replicationGroupId = new 
TablePartitionId(table.tableId(), p);
+
+                stopping.add(() -> {
+                    try {
+                        raftMgr.stopRaftGroup(replicationGroupId);
+                    } catch (Exception e) {
+                        if (!exception.compareAndSet(null, e)) {
+                            if (!(e instanceof NodeStoppingException) || 
!nodeStoppingEx.get()) {
+                                exception.get().addSuppressed(e);
+                            }
+                        }
+
+                        if (e instanceof NodeStoppingException) {
+                            nodeStoppingEx.set(true);
+                        }
+                    }
+                });
+
+                stopping.add(() -> {
+                    try {
+                        replicaMgr.stopReplica(replicationGroupId);
+                    } catch (Exception e) {
+                        if (!exception.compareAndSet(null, e)) {
+                            if (!(e instanceof NodeStoppingException) || 
!nodeStoppingEx.get()) {
+                                exception.get().addSuppressed(e);
+                            }
+                        }
+
+                        if (e instanceof NodeStoppingException) {
+                            nodeStoppingEx.set(true);
+                        }
+                    }
+                });
+            }
 
-                table.internalTable().storage().stop();
-                table.internalTable().txStateStorage().stop();
-                table.internalTable().close();
+            stopping.forEach(Runnable::run);
+
+            try {
+                IgniteUtils.closeAllManually(
+                        table.internalTable().storage(),
+                        table.internalTable().txStateStorage(),
+                        table.internalTable()
+                );
             } catch (Exception e) {
-                LOG.info("Unable to stop table [name={}]", e, table.name());
+                if (!exception.compareAndSet(null, e)) {
+                    exception.get().addSuppressed(e);
+                }
+            }
+
+            if (exception.get() != null) {
+                LOG.info("Unable to stop table [name={}, tableId={}]", 
exception.get(), table.name(), table.tableId());
             }
         }
     }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 7ec8b19e0e..6d5487c3e8 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -31,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.times;
@@ -96,7 +97,9 @@ import 
org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.MessagingService;
@@ -413,27 +416,104 @@ public class TableManagerTest extends IgniteAbstractTest 
{
     }
 
     /**
-     * Cheks that the all RAFT nodes will be stopped when Table manager is 
stopping.
+     * Checks that all RAFT nodes will be stopped when Table manager is 
stopping and
+     * an exception that was thrown by one of the components will not prevent 
stopping other components.
      *
      * @throws Exception If failed.
      */
     @Test
-    public void tableManagerStopTest() throws Exception {
+    public void tableManagerStopTest1() throws Exception {
+        IgniteBiTuple<TableImpl, TableManager> tblAndMnr = 
startTableManagerStopTest();
+
+        endTableManagerStopTest(tblAndMnr.get1(), tblAndMnr.get2(),
+                () -> {
+                    try {
+                        doThrow(new 
NodeStoppingException()).when(rm).stopRaftGroup(any());
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+    }
+
+    /**
+     * Checks that all RAFT nodes will be stopped when Table manager is 
stopping and
+     * an exception that was thrown by one of the components will not prevent 
stopping other components.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void tableManagerStopTest2() throws Exception {
+        IgniteBiTuple<TableImpl, TableManager> tblAndMnr = 
startTableManagerStopTest();
+
+        endTableManagerStopTest(tblAndMnr.get1(), tblAndMnr.get2(),
+                () -> {
+                    try {
+                        doThrow(new 
NodeStoppingException()).when(replicaMgr).stopReplica(any());
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+    }
+
+    /**
+     * Checks that all RAFT nodes will be stopped when Table manager is 
stopping and
+     * an exception that was thrown by one of the components will not prevent 
stopping other components.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void tableManagerStopTest3() throws Exception {
+        IgniteBiTuple<TableImpl, TableManager> tblAndMnr = 
startTableManagerStopTest();
+
+        endTableManagerStopTest(tblAndMnr.get1(), tblAndMnr.get2(),
+                () -> {
+                    try {
+                        doThrow(new 
RuntimeException()).when(tblAndMnr.get1().internalTable().storage()).close();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+    }
+
+    /**
+     * Checks that all RAFT nodes will be stopped when Table manager is 
stopping and
+     * an exception that was thrown by one of the components will not prevent 
stopping other components.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void tableManagerStopTest4() throws Exception {
+        IgniteBiTuple<TableImpl, TableManager> tblAndMnr = 
startTableManagerStopTest();
+
+        endTableManagerStopTest(tblAndMnr.get1(), tblAndMnr.get2(),
+                () -> doThrow(new 
RuntimeException()).when(tblAndMnr.get1().internalTable().txStateStorage()).close());
+    }
+
+    private IgniteBiTuple<TableImpl, TableManager> startTableManagerStopTest() 
throws Exception {
         TableDefinition scmTbl = SchemaBuilders.tableBuilder("PUBLIC", 
DYNAMIC_TABLE_FOR_DROP_NAME).columns(
                 SchemaBuilders.column("key", ColumnType.INT64).build(),
                 SchemaBuilders.column("val", 
ColumnType.INT64).asNullable(true).build()
         ).withPrimaryKey("key").build();
 
-        mockManagersAndCreateTable(scmTbl, tblManagerFut);
+        TableImpl table = mockManagersAndCreateTable(scmTbl, tblManagerFut);
 
         verify(rm, times(PARTITIONS)).startRaftGroupService(any(), any());
 
         TableManager tableManager = tblManagerFut.join();
 
+        return new IgniteBiTuple<>(table, tableManager);
+    }
+
+    private void endTableManagerStopTest(TableImpl table, TableManager 
tableManager, Runnable mockDoThrow) throws Exception {
+        mockDoThrow.run();
+
         tableManager.stop();
 
         verify(rm, times(PARTITIONS)).stopRaftGroup(any());
         verify(replicaMgr, times(PARTITIONS)).stopReplica(any());
+
+        verify(table.internalTable().storage()).close();
+        verify(table.internalTable().txStateStorage()).close();
     }
 
     /**

Reply via email to