This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3004974109 IGNITE-17899 Properly stopping resources in TableManager.
Fixes #1282
3004974109 is described below
commit 300497410957aa43aad308642f5ecf42fbc297fa
Author: Sergey Uttsel <[email protected]>
AuthorDate: Mon Nov 28 09:32:27 2022 +0200
IGNITE-17899 Properly stopping resources in TableManager. Fixes #1282
Signed-off-by: Slava Koptilin <[email protected]>
---
.../apache/ignite/internal/util/IgniteUtils.java | 38 ++++++++++
.../internal/storage/engine/MvTableStorage.java | 3 +-
.../internal/storage/impl/TestMvTableStorage.java | 5 ++
.../pagememory/AbstractPageMemoryTableStorage.java | 5 ++
.../storage/rocksdb/RocksDbTableStorage.java | 5 ++
.../internal/table/distributed/TableManager.java | 66 ++++++++++++++---
.../table/distributed/TableManagerTest.java | 86 +++++++++++++++++++++-
7 files changed, 194 insertions(+), 14 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 93d032a976..efbad3636c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -54,6 +54,7 @@ import java.util.function.Supplier;
import java.util.stream.Stream;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
+import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.util.worker.IgniteWorker;
import org.apache.ignite.lang.IgniteInternalException;
@@ -633,6 +634,43 @@ public class IgniteUtils {
closeAll(Arrays.stream(closeables));
}
+ /**
+ * Closes all provided objects. If any of the {@link
ManuallyCloseable#close} methods throw an exception,
+ * only the first thrown exception will be propagated to the caller, after
all other objects are closed,
+ * similar to the try-with-resources block.
+ *
+ * @param closeables Stream of objects to close.
+ * @throws Exception If failed to close.
+ */
+ public static void closeAllManually(Stream<? extends ManuallyCloseable>
closeables) throws Exception {
+ AtomicReference<Exception> ex = new AtomicReference<>();
+
+ closeables.filter(Objects::nonNull).forEach(closeable -> {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ if (!ex.compareAndSet(null, e)) {
+ ex.get().addSuppressed(e);
+ }
+ }
+ });
+
+ if (ex.get() != null) {
+ throw ex.get();
+ }
+ }
+
+ /**
+ * Closes all provided objects.
+ *
+ * @param closeables Array of closeable objects to close.
+ * @throws Exception If failed to close.
+ * @see #closeAllManually(Stream)
+ */
+ public static void closeAllManually(ManuallyCloseable... closeables)
throws Exception {
+ closeAllManually(Arrays.stream(closeables));
+ }
+
/**
* Short date format pattern for log messages in "quiet" mode. Only time
is included since we don't expect "quiet" mode to be used for
* longer runs.
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
index fad6d93356..57a404ae5c 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.schema.configuration.index.TableIndexCo
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
import org.apache.ignite.internal.schema.configuration.TableConfiguration;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
@@ -36,7 +37,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Table storage that contains meta, partitions and SQL indexes.
*/
-public interface MvTableStorage {
+public interface MvTableStorage extends ManuallyCloseable {
/**
* Retrieves or creates a partition for the current table. Not expected to
be called concurrently with the same Partition ID.
*
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
index 68858f8513..a5c77df175 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
@@ -175,6 +175,11 @@ public class TestMvTableStorage implements MvTableStorage {
public void stop() throws StorageException {
}
+ @Override
+ public void close() throws StorageException {
+ stop();
+ }
+
@Override
public void destroy() throws StorageException {
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index 4e5f545021..cb674a1929 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -170,6 +170,11 @@ public abstract class AbstractPageMemoryTableStorage
implements MvTableStorage {
throw new UnsupportedOperationException("Not implemented yet");
}
+ @Override
+ public void close() throws StorageException {
+ stop();
+ }
+
/**
* Closes all {@link #mvPartitions}.
*
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index a02f6f19ff..2e8c0d4d3e 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -346,6 +346,11 @@ public class RocksDbTableStorage implements MvTableStorage
{
}
}
+ @Override
+ public void close() throws StorageException {
+ stop();
+ }
+
@Override
public void destroy() throws StorageException {
stop();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 2839dcaea2..9c5b2ecde6 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -62,6 +62,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntSupplier;
@@ -134,6 +135,7 @@ import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbTableSt
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.IgniteNameUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.utils.RebalanceUtil;
@@ -971,20 +973,64 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
*/
private void cleanUpTablesResources(Map<UUID, TableImpl> tables) {
for (TableImpl table : tables.values()) {
- try {
- for (int p = 0; p < table.internalTable().partitions(); p++) {
- TablePartitionId replicationGroupId = new
TablePartitionId(table.tableId(), p);
+ List<Runnable> stopping = new ArrayList<>();
- raftMgr.stopRaftGroup(replicationGroupId);
+ AtomicReference<Exception> exception = new AtomicReference<>();
- replicaMgr.stopReplica(replicationGroupId);
- }
+ AtomicBoolean nodeStoppingEx = new AtomicBoolean();
+
+ for (int p = 0; p < table.internalTable().partitions(); p++) {
+ TablePartitionId replicationGroupId = new
TablePartitionId(table.tableId(), p);
+
+ stopping.add(() -> {
+ try {
+ raftMgr.stopRaftGroup(replicationGroupId);
+ } catch (Exception e) {
+ if (!exception.compareAndSet(null, e)) {
+ if (!(e instanceof NodeStoppingException) ||
!nodeStoppingEx.get()) {
+ exception.get().addSuppressed(e);
+ }
+ }
+
+ if (e instanceof NodeStoppingException) {
+ nodeStoppingEx.set(true);
+ }
+ }
+ });
+
+ stopping.add(() -> {
+ try {
+ replicaMgr.stopReplica(replicationGroupId);
+ } catch (Exception e) {
+ if (!exception.compareAndSet(null, e)) {
+ if (!(e instanceof NodeStoppingException) ||
!nodeStoppingEx.get()) {
+ exception.get().addSuppressed(e);
+ }
+ }
+
+ if (e instanceof NodeStoppingException) {
+ nodeStoppingEx.set(true);
+ }
+ }
+ });
+ }
- table.internalTable().storage().stop();
- table.internalTable().txStateStorage().stop();
- table.internalTable().close();
+ stopping.forEach(Runnable::run);
+
+ try {
+ IgniteUtils.closeAllManually(
+ table.internalTable().storage(),
+ table.internalTable().txStateStorage(),
+ table.internalTable()
+ );
} catch (Exception e) {
- LOG.info("Unable to stop table [name={}]", e, table.name());
+ if (!exception.compareAndSet(null, e)) {
+ exception.get().addSuppressed(e);
+ }
+ }
+
+ if (exception.get() != null) {
+ LOG.info("Unable to stop table [name={}, tableId={}]",
exception.get(), table.name(), table.tableId());
}
}
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 7ec8b19e0e..6d5487c3e8 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -31,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
@@ -96,7 +97,9 @@ import
org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
@@ -413,27 +416,104 @@ public class TableManagerTest extends IgniteAbstractTest
{
}
/**
- * Cheks that the all RAFT nodes will be stopped when Table manager is
stopping.
+ * Checks that all RAFT nodes will be stopped when Table manager is
stopping and
+ * an exception that was thrown by one of the components will not prevent
stopping other components.
*
* @throws Exception If failed.
*/
@Test
- public void tableManagerStopTest() throws Exception {
+ public void tableManagerStopTest1() throws Exception {
+ IgniteBiTuple<TableImpl, TableManager> tblAndMnr =
startTableManagerStopTest();
+
+ endTableManagerStopTest(tblAndMnr.get1(), tblAndMnr.get2(),
+ () -> {
+ try {
+ doThrow(new
NodeStoppingException()).when(rm).stopRaftGroup(any());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ /**
+ * Checks that all RAFT nodes will be stopped when Table manager is
stopping and
+ * an exception that was thrown by one of the components will not prevent
stopping other components.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void tableManagerStopTest2() throws Exception {
+ IgniteBiTuple<TableImpl, TableManager> tblAndMnr =
startTableManagerStopTest();
+
+ endTableManagerStopTest(tblAndMnr.get1(), tblAndMnr.get2(),
+ () -> {
+ try {
+ doThrow(new
NodeStoppingException()).when(replicaMgr).stopReplica(any());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ /**
+ * Checks that all RAFT nodes will be stopped when Table manager is
stopping and
+ * an exception that was thrown by one of the components will not prevent
stopping other components.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void tableManagerStopTest3() throws Exception {
+ IgniteBiTuple<TableImpl, TableManager> tblAndMnr =
startTableManagerStopTest();
+
+ endTableManagerStopTest(tblAndMnr.get1(), tblAndMnr.get2(),
+ () -> {
+ try {
+ doThrow(new
RuntimeException()).when(tblAndMnr.get1().internalTable().storage()).close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ /**
+ * Checks that all RAFT nodes will be stopped when Table manager is
stopping and
+ * an exception that was thrown by one of the components will not prevent
stopping other components.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void tableManagerStopTest4() throws Exception {
+ IgniteBiTuple<TableImpl, TableManager> tblAndMnr =
startTableManagerStopTest();
+
+ endTableManagerStopTest(tblAndMnr.get1(), tblAndMnr.get2(),
+ () -> doThrow(new
RuntimeException()).when(tblAndMnr.get1().internalTable().txStateStorage()).close());
+ }
+
+ private IgniteBiTuple<TableImpl, TableManager> startTableManagerStopTest()
throws Exception {
TableDefinition scmTbl = SchemaBuilders.tableBuilder("PUBLIC",
DYNAMIC_TABLE_FOR_DROP_NAME).columns(
SchemaBuilders.column("key", ColumnType.INT64).build(),
SchemaBuilders.column("val",
ColumnType.INT64).asNullable(true).build()
).withPrimaryKey("key").build();
- mockManagersAndCreateTable(scmTbl, tblManagerFut);
+ TableImpl table = mockManagersAndCreateTable(scmTbl, tblManagerFut);
verify(rm, times(PARTITIONS)).startRaftGroupService(any(), any());
TableManager tableManager = tblManagerFut.join();
+ return new IgniteBiTuple<>(table, tableManager);
+ }
+
+ private void endTableManagerStopTest(TableImpl table, TableManager
tableManager, Runnable mockDoThrow) throws Exception {
+ mockDoThrow.run();
+
tableManager.stop();
verify(rm, times(PARTITIONS)).stopRaftGroup(any());
verify(replicaMgr, times(PARTITIONS)).stopReplica(any());
+
+ verify(table.internalTable().storage()).close();
+ verify(table.internalTable().txStateStorage()).close();
}
/**