This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d3814b6ea1 IGNITE-19925 Reduce the number of NodeStoppingExceptions in logs on node stop (#2792)
d3814b6ea1 is described below

commit d3814b6ea1e0f2372aea6523496aaddb343c8c14
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Mon Nov 6 13:16:44 2023 +0200

    IGNITE-19925 Reduce the number of NodeStoppingExceptions in logs on node stop (#2792)
---
 .../negotiation/LeaseNegotiator.java               |  12 +-
 .../ignite/internal/replicator/ReplicaManager.java |   7 +-
 .../runner/app/ItIgniteStopLogMessagesTest.java    | 103 +++++++++++++++
 .../internal/table/distributed/TableManager.java   | 145 ++++++++-------------
 .../table/distributed/TableManagerTest.java        |   9 +-
 5 files changed, 171 insertions(+), 105 deletions(-)

diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
index ffcd828fa8..8fe9ae38b1 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
@@ -18,10 +18,12 @@
 package org.apache.ignite.internal.placementdriver.negotiation;
 
 import static 
org.apache.ignite.internal.placementdriver.negotiation.LeaseAgreement.UNDEFINED_AGREEMENT;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.placementdriver.LeaseUpdater;
@@ -82,12 +84,12 @@ public class LeaseNegotiator {
                                 .force(force)
                                 .build(),
                         leaseInterval)
-                .handle((msg, throwable) -> {
-                    if (throwable != null) {
-                        LOG.warn("Lease was not negotiated due to exception 
[lease={}]", throwable, lease);
-                    } else {
+                .whenComplete((msg, throwable) -> {
+                    if (throwable == null) {
                         assert msg instanceof LeaseGrantedMessageResponse : 
"Message type is unexpected [type="
                                 + msg.getClass().getSimpleName() + ']';
+                    } else if (!(unwrapCause(throwable) instanceof 
NodeStoppingException)) {
+                        LOG.warn("Lease was not negotiated due to exception 
[lease={}]", throwable, lease);
                     }
 
                     LeaseGrantedMessageResponse response = 
(LeaseGrantedMessageResponse) msg;
@@ -95,8 +97,6 @@ public class LeaseNegotiator {
                     fut.complete(response);
 
                     triggerToRenewLeases();
-
-                    return msg;
                 });
     }
 
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 01e6d13e5c..b3677517ea 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.replicator;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.Kludges.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 
 import java.io.IOException;
@@ -377,14 +378,12 @@ public class ReplicaManager implements IgniteComponent {
 
             replicaFut
                     .thenCompose(replica -> 
replica.processPlacementDriverMessage(msg))
-                    .handle((response, ex) -> {
+                    .whenComplete((response, ex) -> {
                         if (ex == null) {
                             
clusterNetSvc.messagingService().respond(senderConsistentId, response, 
correlationId);
-                        } else {
+                        } else if (!(unwrapCause(ex) instanceof 
NodeStoppingException)) {
                             LOG.error("Failed to process placement driver 
message [msg={}]", ex, msg);
                         }
-
-                        return null;
                     });
         } finally {
             busyLock.leaveBusy();
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteStopLogMessagesTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteStopLogMessagesTest.java
new file mode 100644
index 0000000000..e75a69f831
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteStopLogMessagesTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app;
+
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.placementdriver.negotiation.LeaseNegotiator;
+import org.apache.ignite.internal.replicator.ReplicaManager;
+import org.apache.ignite.internal.testframework.log4j2.LogInspector;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Class that checks that no excessive error messages are being printed on node stop.
+ */
+public class ItIgniteStopLogMessagesTest extends ClusterPerTestIntegrationTest 
{
+    private static class FailureMessageInspector {
+        private final LogInspector logInspector;
+
+        private final String expectedMessage;
+
+        private final AtomicInteger msgCount = new AtomicInteger();
+
+        FailureMessageInspector(Class<?> loggerClass, String expectedMessage) {
+            this.logInspector = LogInspector.create(loggerClass);
+            this.expectedMessage = expectedMessage;
+
+            logInspector.addHandler(
+                    logEvent -> {
+                        Throwable throwable = logEvent.getThrown();
+
+                        return throwable != null
+                                && unwrapCause(throwable) instanceof 
NodeStoppingException
+                                && 
logEvent.getMessage().getFormattedMessage().contains(this.expectedMessage);
+                    },
+                    msgCount::incrementAndGet
+            );
+        }
+
+        void start() {
+            logInspector.start();
+        }
+
+        void stop() {
+            logInspector.stop();
+        }
+
+        void assertNoMessages() {
+            assertThat(String.format("Error message '%s' is present in the 
log", expectedMessage), msgCount.get(), is(0));
+        }
+    }
+
+    private final List<FailureMessageInspector> logInspectors = List.of(
+            new FailureMessageInspector(ReplicaManager.class, "Failed to stop 
replica"),
+            new FailureMessageInspector(ReplicaManager.class, "Failed to 
process placement driver message"),
+            new FailureMessageInspector(LeaseNegotiator.class, "Lease was not 
negotiated due to exception")
+    );
+
+    @BeforeEach
+    void startLogInspectors() {
+        logInspectors.forEach(FailureMessageInspector::start);
+    }
+
+    @AfterEach
+    @Override
+    public void tearDown() {
+        super.tearDown();
+
+        logInspectors.forEach(FailureMessageInspector::stop);
+
+        logInspectors.forEach(FailureMessageInspector::assertNoMessages);
+    }
+
+    /**
+     * Modifies the state of the cluster, actual assertions happen in the {@link #tearDown} method.
+     */
+    @Test
+    void testNoErrorMessagesOnStop() {
+        executeSql("CREATE TABLE TEST (key INT PRIMARY KEY)");
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index e97e97875a..fe1f5e0802 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -66,8 +66,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.IntSupplier;
@@ -88,6 +88,7 @@ import 
org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
 import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
 import org.apache.ignite.internal.causality.CompletionListener;
 import org.apache.ignite.internal.causality.IncrementalVersionedValue;
+import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -269,6 +270,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
     /** Prevents double stopping the component. */
+    private final AtomicBoolean beforeStopGuard = new AtomicBoolean();
+
     private final AtomicBoolean stopGuard = new AtomicBoolean();
 
     /** Schema manager. */
@@ -952,8 +955,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
     }
 
     @Override
-    public void stop() {
-        if (!stopGuard.compareAndSet(false, true)) {
+    public void beforeNodeStop() {
+        if (!beforeStopGuard.compareAndSet(false, true)) {
             return;
         }
 
@@ -965,23 +968,32 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         metaStorageMgr.unregisterWatch(stableAssignmentsRebalanceListener);
         metaStorageMgr.unregisterWatch(assignmentsSwitchRebalanceListener);
 
-        Map<Integer, TableImpl> tablesToStop = 
Stream.concat(latestTablesById().entrySet().stream(), 
pendingTables.entrySet().stream())
-                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue, (v1, v2) -> v1));
+        var tablesToStop = new HashMap<Integer, TableImpl>();
+
+        tablesToStop.putAll(latestTablesById());
+        tablesToStop.putAll(pendingTables);
 
         cleanUpTablesResources(tablesToStop);
+    }
 
-        try {
-            IgniteUtils.closeAllManually(lowWatermark, mvGc);
-        } catch (Throwable t) {
-            LOG.error("Failed to close internal components", t);
+    @Override
+    public void stop() throws Exception {
+        assert beforeStopGuard.get() : "'stop' called before 'beforeNodeStop'";
+
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
         }
 
-        shutdownAndAwaitTermination(rebalanceScheduler, 10, TimeUnit.SECONDS);
-        shutdownAndAwaitTermination(ioExecutor, 10, TimeUnit.SECONDS);
-        shutdownAndAwaitTermination(txStateStoragePool, 10, TimeUnit.SECONDS);
-        shutdownAndAwaitTermination(txStateStorageScheduledPool, 10, 
TimeUnit.SECONDS);
-        shutdownAndAwaitTermination(scanRequestExecutor, 10, TimeUnit.SECONDS);
-        shutdownAndAwaitTermination(incomingSnapshotsExecutor, 10, 
TimeUnit.SECONDS);
+        IgniteUtils.closeAllManually(
+                lowWatermark,
+                mvGc,
+                () -> shutdownAndAwaitTermination(rebalanceScheduler, 10, 
TimeUnit.SECONDS),
+                () -> shutdownAndAwaitTermination(ioExecutor, 10, 
TimeUnit.SECONDS),
+                () -> shutdownAndAwaitTermination(txStateStoragePool, 10, 
TimeUnit.SECONDS),
+                () -> shutdownAndAwaitTermination(txStateStorageScheduledPool, 
10, TimeUnit.SECONDS),
+                () -> shutdownAndAwaitTermination(scanRequestExecutor, 10, 
TimeUnit.SECONDS),
+                () -> shutdownAndAwaitTermination(incomingSnapshotsExecutor, 
10, TimeUnit.SECONDS)
+        );
     }
 
     /**
@@ -990,73 +1002,44 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
      * @param tables Tables to stop.
      */
     private void cleanUpTablesResources(Map<Integer, TableImpl> tables) {
-        for (TableImpl table : tables.values()) {
-            table.beforeClose();
+        var futures = new ArrayList<CompletableFuture<Void>>(tables.size());
 
-            List<Runnable> stopping = new ArrayList<>();
-
-            AtomicReference<Throwable> throwable = new AtomicReference<>();
-
-            AtomicBoolean nodeStoppingEx = new AtomicBoolean();
+        for (TableImpl table : tables.values()) {
+            futures.add(runAsync(() -> {
+                Stream.Builder<ManuallyCloseable> stopping = Stream.builder();
 
-            InternalTable internalTable = table.internalTable();
+                stopping.add(table::beforeClose);
 
-            for (int p = 0; p < internalTable.partitions(); p++) {
-                int partitionId = p;
+                InternalTable internalTable = table.internalTable();
 
-                TablePartitionId replicationGroupId = new 
TablePartitionId(table.tableId(), p);
+                for (int p = 0; p < internalTable.partitions(); p++) {
+                    TablePartitionId replicationGroupId = new 
TablePartitionId(table.tableId(), p);
 
-                stopping.add(() -> {
-                    try {
-                        raftMgr.stopRaftNodes(replicationGroupId);
-                    } catch (Throwable t) {
-                        handleExceptionOnCleanUpTablesResources(t, throwable, 
nodeStoppingEx);
-                    }
-                });
+                    stopping.add(() -> closePartitionTrackers(internalTable, 
replicationGroupId.partitionId()));
 
-                stopping.add(() -> {
-                    try {
-                        replicaMgr.stopReplica(replicationGroupId).join();
-                    } catch (Throwable t) {
-                        handleExceptionOnCleanUpTablesResources(t, throwable, 
nodeStoppingEx);
-                    }
-                });
+                    stopping.add(() -> 
replicaMgr.stopReplica(replicationGroupId).get(10, TimeUnit.SECONDS));
 
-                CompletableFuture<Void> removeFromGcFuture = 
mvGc.removeStorage(replicationGroupId);
+                    stopping.add(() -> 
raftMgr.stopRaftNodes(replicationGroupId));
 
-                stopping.add(() -> {
-                    try {
-                        closePartitionTrackers(internalTable, partitionId);
-                    } catch (Throwable t) {
-                        handleExceptionOnCleanUpTablesResources(t, throwable, 
nodeStoppingEx);
-                    }
-                });
-
-                stopping.add(() -> {
-                    try {
-                        // Should be done fairly quickly.
-                        removeFromGcFuture.join();
-                    } catch (Throwable t) {
-                        handleExceptionOnCleanUpTablesResources(t, throwable, 
nodeStoppingEx);
-                    }
-                });
-            }
+                    stopping.add(() -> 
mvGc.removeStorage(replicationGroupId).get(10, TimeUnit.SECONDS));
+                }
 
-            stopping.forEach(Runnable::run);
+                stopping.add(internalTable.storage());
+                stopping.add(internalTable.txStateStorage());
+                stopping.add(internalTable);
 
-            try {
-                IgniteUtils.closeAllManually(
-                        internalTable.storage(),
-                        internalTable.txStateStorage(),
-                        internalTable
-                );
-            } catch (Throwable t) {
-                handleExceptionOnCleanUpTablesResources(t, throwable, 
nodeStoppingEx);
-            }
+                try {
+                    IgniteUtils.closeAllManually(stopping.build());
+                } catch (Throwable t) {
+                    LOG.error("Unable to stop table [name={}, tableId={}]", t, 
table.name(), table.tableId());
+                }
+            }, ioExecutor));
+        }
 
-            if (throwable.get() != null) {
-                LOG.error("Unable to stop table [name={}, tableId={}]", 
throwable.get(), table.name(), table.tableId());
-            }
+        try {
+            allOf(futures.toArray(CompletableFuture[]::new)).get(30, 
TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
+            LOG.error("Unable to clean table resources", e);
         }
     }
 
@@ -2142,26 +2125,6 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 .thenCompose(Function.identity());
     }
 
-    private static void handleExceptionOnCleanUpTablesResources(
-            Throwable t,
-            AtomicReference<Throwable> throwable,
-            AtomicBoolean nodeStoppingEx
-    ) {
-        if (t instanceof CompletionException || t instanceof 
ExecutionException) {
-            t = t.getCause();
-        }
-
-        if (!throwable.compareAndSet(null, t)) {
-            if (!(t instanceof NodeStoppingException) || 
!nodeStoppingEx.get()) {
-                throwable.get().addSuppressed(t);
-            }
-        }
-
-        if (t instanceof NodeStoppingException) {
-            nodeStoppingEx.set(true);
-        }
-    }
-
     private int[] collectTableIndexIds(int tableId, int catalogVersion) {
         return catalogService.indexes(catalogVersion).stream()
                 .filter(indexDescriptor -> indexDescriptor.tableId() == 
tableId)
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index e5f109659e..6f1cefff12 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -338,7 +338,7 @@ public class TableManagerTest extends IgniteAbstractTest {
      * Tests a work of the public API for Table manager {@see 
org.apache.ignite.table.manager.IgniteTables} when the manager is stopping.
      */
     @Test
-    public void testApiTableManagerOnStop() {
+    public void testApiTableManagerOnStop() throws Exception {
         createTableManager(tblManagerFut);
 
         TableManager tableManager = tblManagerFut.join();
@@ -358,7 +358,7 @@ public class TableManagerTest extends IgniteAbstractTest {
      * stopping.
      */
     @Test
-    public void testInternalApiTableManagerOnStop() {
+    public void testInternalApiTableManagerOnStop() throws Exception {
         createTableManager(tblManagerFut);
 
         TableManager tableManager = tblManagerFut.join();
@@ -385,7 +385,7 @@ public class TableManagerTest extends IgniteAbstractTest {
         endTableManagerStopTest(tblAndMnr.get1(), tblAndMnr.get2(),
                 () -> {
                     try {
-                        doThrow(new 
NodeStoppingException()).when(rm).stopRaftNodes(any());
+                        
when(rm.stopRaftNodes(any())).thenThrow(NodeStoppingException.class);
                     } catch (Exception e) {
                         throw new RuntimeException(e);
                     }
@@ -405,7 +405,7 @@ public class TableManagerTest extends IgniteAbstractTest {
         endTableManagerStopTest(tblAndMnr.get1(), tblAndMnr.get2(),
                 () -> {
                     try {
-                        doThrow(new 
NodeStoppingException()).when(replicaMgr).stopReplica(any());
+                        
when(replicaMgr.stopReplica(any())).thenThrow(NodeStoppingException.class);
                     } catch (Exception e) {
                         throw new RuntimeException(e);
                     }
@@ -459,6 +459,7 @@ public class TableManagerTest extends IgniteAbstractTest {
     private void endTableManagerStopTest(TableViewInternal table, TableManager 
tableManager, Runnable mockDoThrow) throws Exception {
         mockDoThrow.run();
 
+        tableManager.beforeNodeStop();
         tableManager.stop();
 
         verify(rm, times(PARTITIONS)).stopRaftNodes(any());

Reply via email to