This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d3814b6ea1 IGNITE-19925 Reduce the number of NodeStoppingExceptions in
logs on node stop (#2792)
d3814b6ea1 is described below
commit d3814b6ea1e0f2372aea6523496aaddb343c8c14
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Mon Nov 6 13:16:44 2023 +0200
IGNITE-19925 Reduce the number of NodeStoppingExceptions in logs on node
stop (#2792)
---
.../negotiation/LeaseNegotiator.java | 12 +-
.../ignite/internal/replicator/ReplicaManager.java | 7 +-
.../runner/app/ItIgniteStopLogMessagesTest.java | 103 +++++++++++++++
.../internal/table/distributed/TableManager.java | 145 ++++++++-------------
.../table/distributed/TableManagerTest.java | 9 +-
5 files changed, 171 insertions(+), 105 deletions(-)
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
index ffcd828fa8..8fe9ae38b1 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java
@@ -18,10 +18,12 @@
package org.apache.ignite.internal.placementdriver.negotiation;
import static
org.apache.ignite.internal.placementdriver.negotiation.LeaseAgreement.UNDEFINED_AGREEMENT;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.placementdriver.LeaseUpdater;
@@ -82,12 +84,12 @@ public class LeaseNegotiator {
.force(force)
.build(),
leaseInterval)
- .handle((msg, throwable) -> {
- if (throwable != null) {
- LOG.warn("Lease was not negotiated due to exception
[lease={}]", throwable, lease);
- } else {
+ .whenComplete((msg, throwable) -> {
+ if (throwable == null) {
assert msg instanceof LeaseGrantedMessageResponse :
"Message type is unexpected [type="
+ msg.getClass().getSimpleName() + ']';
+ } else if (!(unwrapCause(throwable) instanceof
NodeStoppingException)) {
+ LOG.warn("Lease was not negotiated due to exception
[lease={}]", throwable, lease);
}
LeaseGrantedMessageResponse response =
(LeaseGrantedMessageResponse) msg;
@@ -95,8 +97,6 @@ public class LeaseNegotiator {
fut.complete(response);
triggerToRenewLeases();
-
- return msg;
});
}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 01e6d13e5c..b3677517ea 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.replicator;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.Kludges.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import java.io.IOException;
@@ -377,14 +378,12 @@ public class ReplicaManager implements IgniteComponent {
replicaFut
.thenCompose(replica ->
replica.processPlacementDriverMessage(msg))
- .handle((response, ex) -> {
+ .whenComplete((response, ex) -> {
if (ex == null) {
clusterNetSvc.messagingService().respond(senderConsistentId, response,
correlationId);
- } else {
+ } else if (!(unwrapCause(ex) instanceof
NodeStoppingException)) {
LOG.error("Failed to process placement driver
message [msg={}]", ex, msg);
}
-
- return null;
});
} finally {
busyLock.leaveBusy();
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteStopLogMessagesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteStopLogMessagesTest.java
new file mode 100644
index 0000000000..e75a69f831
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteStopLogMessagesTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app;
+
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.placementdriver.negotiation.LeaseNegotiator;
+import org.apache.ignite.internal.replicator.ReplicaManager;
+import org.apache.ignite.internal.testframework.log4j2.LogInspector;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Class that checks that no excessive error messages are being printed on
node stop.
+ */
+public class ItIgniteStopLogMessagesTest extends ClusterPerTestIntegrationTest
{
+ private static class FailureMessageInspector {
+ private final LogInspector logInspector;
+
+ private final String expectedMessage;
+
+ private final AtomicInteger msgCount = new AtomicInteger();
+
+ FailureMessageInspector(Class<?> loggerClass, String expectedMessage) {
+ this.logInspector = LogInspector.create(loggerClass);
+ this.expectedMessage = expectedMessage;
+
+ logInspector.addHandler(
+ logEvent -> {
+ Throwable throwable = logEvent.getThrown();
+
+ return throwable != null
+ && unwrapCause(throwable) instanceof
NodeStoppingException
+ &&
logEvent.getMessage().getFormattedMessage().contains(this.expectedMessage);
+ },
+ msgCount::incrementAndGet
+ );
+ }
+
+ void start() {
+ logInspector.start();
+ }
+
+ void stop() {
+ logInspector.stop();
+ }
+
+ void assertNoMessages() {
+ assertThat(String.format("Error message '%s' is present in the
log", expectedMessage), msgCount.get(), is(0));
+ }
+ }
+
+ private final List<FailureMessageInspector> logInspectors = List.of(
+ new FailureMessageInspector(ReplicaManager.class, "Failed to stop
replica"),
+ new FailureMessageInspector(ReplicaManager.class, "Failed to
process placement driver message"),
+ new FailureMessageInspector(LeaseNegotiator.class, "Lease was not
negotiated due to exception")
+ );
+
+ @BeforeEach
+ void startLogInspectors() {
+ logInspectors.forEach(FailureMessageInspector::start);
+ }
+
+ @AfterEach
+ @Override
+ public void tearDown() {
+ super.tearDown();
+
+ logInspectors.forEach(FailureMessageInspector::stop);
+
+ logInspectors.forEach(FailureMessageInspector::assertNoMessages);
+ }
+
+ /**
+ * Modifies the state of the cluster, actual assertions happen in the
{@link #tearDown} method.
+ */
+ @Test
+ void testNoErrorMessagesOnStop() {
+ executeSql("CREATE TABLE TEST (key INT PRIMARY KEY)");
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index e97e97875a..fe1f5e0802 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -66,8 +66,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntSupplier;
@@ -88,6 +88,7 @@ import
org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
import org.apache.ignite.internal.causality.CompletionListener;
import org.apache.ignite.internal.causality.IncrementalVersionedValue;
+import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -269,6 +270,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
/** Prevents double stopping the component. */
+ private final AtomicBoolean beforeStopGuard = new AtomicBoolean();
+
private final AtomicBoolean stopGuard = new AtomicBoolean();
/** Schema manager. */
@@ -952,8 +955,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
}
@Override
- public void stop() {
- if (!stopGuard.compareAndSet(false, true)) {
+ public void beforeNodeStop() {
+ if (!beforeStopGuard.compareAndSet(false, true)) {
return;
}
@@ -965,23 +968,32 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
metaStorageMgr.unregisterWatch(stableAssignmentsRebalanceListener);
metaStorageMgr.unregisterWatch(assignmentsSwitchRebalanceListener);
- Map<Integer, TableImpl> tablesToStop =
Stream.concat(latestTablesById().entrySet().stream(),
pendingTables.entrySet().stream())
- .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue, (v1, v2) -> v1));
+ var tablesToStop = new HashMap<Integer, TableImpl>();
+
+ tablesToStop.putAll(latestTablesById());
+ tablesToStop.putAll(pendingTables);
cleanUpTablesResources(tablesToStop);
+ }
- try {
- IgniteUtils.closeAllManually(lowWatermark, mvGc);
- } catch (Throwable t) {
- LOG.error("Failed to close internal components", t);
+ @Override
+ public void stop() throws Exception {
+ assert beforeStopGuard.get() : "'stop' called before 'beforeNodeStop'";
+
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
}
- shutdownAndAwaitTermination(rebalanceScheduler, 10, TimeUnit.SECONDS);
- shutdownAndAwaitTermination(ioExecutor, 10, TimeUnit.SECONDS);
- shutdownAndAwaitTermination(txStateStoragePool, 10, TimeUnit.SECONDS);
- shutdownAndAwaitTermination(txStateStorageScheduledPool, 10,
TimeUnit.SECONDS);
- shutdownAndAwaitTermination(scanRequestExecutor, 10, TimeUnit.SECONDS);
- shutdownAndAwaitTermination(incomingSnapshotsExecutor, 10,
TimeUnit.SECONDS);
+ IgniteUtils.closeAllManually(
+ lowWatermark,
+ mvGc,
+ () -> shutdownAndAwaitTermination(rebalanceScheduler, 10,
TimeUnit.SECONDS),
+ () -> shutdownAndAwaitTermination(ioExecutor, 10,
TimeUnit.SECONDS),
+ () -> shutdownAndAwaitTermination(txStateStoragePool, 10,
TimeUnit.SECONDS),
+ () -> shutdownAndAwaitTermination(txStateStorageScheduledPool,
10, TimeUnit.SECONDS),
+ () -> shutdownAndAwaitTermination(scanRequestExecutor, 10,
TimeUnit.SECONDS),
+ () -> shutdownAndAwaitTermination(incomingSnapshotsExecutor,
10, TimeUnit.SECONDS)
+ );
}
/**
@@ -990,73 +1002,44 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
* @param tables Tables to stop.
*/
private void cleanUpTablesResources(Map<Integer, TableImpl> tables) {
- for (TableImpl table : tables.values()) {
- table.beforeClose();
+ var futures = new ArrayList<CompletableFuture<Void>>(tables.size());
- List<Runnable> stopping = new ArrayList<>();
-
- AtomicReference<Throwable> throwable = new AtomicReference<>();
-
- AtomicBoolean nodeStoppingEx = new AtomicBoolean();
+ for (TableImpl table : tables.values()) {
+ futures.add(runAsync(() -> {
+ Stream.Builder<ManuallyCloseable> stopping = Stream.builder();
- InternalTable internalTable = table.internalTable();
+ stopping.add(table::beforeClose);
- for (int p = 0; p < internalTable.partitions(); p++) {
- int partitionId = p;
+ InternalTable internalTable = table.internalTable();
- TablePartitionId replicationGroupId = new
TablePartitionId(table.tableId(), p);
+ for (int p = 0; p < internalTable.partitions(); p++) {
+ TablePartitionId replicationGroupId = new
TablePartitionId(table.tableId(), p);
- stopping.add(() -> {
- try {
- raftMgr.stopRaftNodes(replicationGroupId);
- } catch (Throwable t) {
- handleExceptionOnCleanUpTablesResources(t, throwable,
nodeStoppingEx);
- }
- });
+ stopping.add(() -> closePartitionTrackers(internalTable,
replicationGroupId.partitionId()));
- stopping.add(() -> {
- try {
- replicaMgr.stopReplica(replicationGroupId).join();
- } catch (Throwable t) {
- handleExceptionOnCleanUpTablesResources(t, throwable,
nodeStoppingEx);
- }
- });
+ stopping.add(() ->
replicaMgr.stopReplica(replicationGroupId).get(10, TimeUnit.SECONDS));
- CompletableFuture<Void> removeFromGcFuture =
mvGc.removeStorage(replicationGroupId);
+ stopping.add(() ->
raftMgr.stopRaftNodes(replicationGroupId));
- stopping.add(() -> {
- try {
- closePartitionTrackers(internalTable, partitionId);
- } catch (Throwable t) {
- handleExceptionOnCleanUpTablesResources(t, throwable,
nodeStoppingEx);
- }
- });
-
- stopping.add(() -> {
- try {
- // Should be done fairly quickly.
- removeFromGcFuture.join();
- } catch (Throwable t) {
- handleExceptionOnCleanUpTablesResources(t, throwable,
nodeStoppingEx);
- }
- });
- }
+ stopping.add(() ->
mvGc.removeStorage(replicationGroupId).get(10, TimeUnit.SECONDS));
+ }
- stopping.forEach(Runnable::run);
+ stopping.add(internalTable.storage());
+ stopping.add(internalTable.txStateStorage());
+ stopping.add(internalTable);
- try {
- IgniteUtils.closeAllManually(
- internalTable.storage(),
- internalTable.txStateStorage(),
- internalTable
- );
- } catch (Throwable t) {
- handleExceptionOnCleanUpTablesResources(t, throwable,
nodeStoppingEx);
- }
+ try {
+ IgniteUtils.closeAllManually(stopping.build());
+ } catch (Throwable t) {
+ LOG.error("Unable to stop table [name={}, tableId={}]", t,
table.name(), table.tableId());
+ }
+ }, ioExecutor));
+ }
- if (throwable.get() != null) {
- LOG.error("Unable to stop table [name={}, tableId={}]",
throwable.get(), table.name(), table.tableId());
- }
+ try {
+ allOf(futures.toArray(CompletableFuture[]::new)).get(30,
TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException
e) {
+ LOG.error("Unable to clean table resources", e);
}
}
@@ -2142,26 +2125,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
.thenCompose(Function.identity());
}
- private static void handleExceptionOnCleanUpTablesResources(
- Throwable t,
- AtomicReference<Throwable> throwable,
- AtomicBoolean nodeStoppingEx
- ) {
- if (t instanceof CompletionException || t instanceof
ExecutionException) {
- t = t.getCause();
- }
-
- if (!throwable.compareAndSet(null, t)) {
- if (!(t instanceof NodeStoppingException) ||
!nodeStoppingEx.get()) {
- throwable.get().addSuppressed(t);
- }
- }
-
- if (t instanceof NodeStoppingException) {
- nodeStoppingEx.set(true);
- }
- }
-
private int[] collectTableIndexIds(int tableId, int catalogVersion) {
return catalogService.indexes(catalogVersion).stream()
.filter(indexDescriptor -> indexDescriptor.tableId() ==
tableId)
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index e5f109659e..6f1cefff12 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -338,7 +338,7 @@ public class TableManagerTest extends IgniteAbstractTest {
* Tests a work of the public API for Table manager {@see
org.apache.ignite.table.manager.IgniteTables} when the manager is stopping.
*/
@Test
- public void testApiTableManagerOnStop() {
+ public void testApiTableManagerOnStop() throws Exception {
createTableManager(tblManagerFut);
TableManager tableManager = tblManagerFut.join();
@@ -358,7 +358,7 @@ public class TableManagerTest extends IgniteAbstractTest {
* stopping.
*/
@Test
- public void testInternalApiTableManagerOnStop() {
+ public void testInternalApiTableManagerOnStop() throws Exception {
createTableManager(tblManagerFut);
TableManager tableManager = tblManagerFut.join();
@@ -385,7 +385,7 @@ public class TableManagerTest extends IgniteAbstractTest {
endTableManagerStopTest(tblAndMnr.get1(), tblAndMnr.get2(),
() -> {
try {
- doThrow(new
NodeStoppingException()).when(rm).stopRaftNodes(any());
+
when(rm.stopRaftNodes(any())).thenThrow(NodeStoppingException.class);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -405,7 +405,7 @@ public class TableManagerTest extends IgniteAbstractTest {
endTableManagerStopTest(tblAndMnr.get1(), tblAndMnr.get2(),
() -> {
try {
- doThrow(new
NodeStoppingException()).when(replicaMgr).stopReplica(any());
+
when(replicaMgr.stopReplica(any())).thenThrow(NodeStoppingException.class);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -459,6 +459,7 @@ public class TableManagerTest extends IgniteAbstractTest {
private void endTableManagerStopTest(TableViewInternal table, TableManager
tableManager, Runnable mockDoThrow) throws Exception {
mockDoThrow.run();
+ tableManager.beforeNodeStop();
tableManager.stop();
verify(rm, times(PARTITIONS)).stopRaftNodes(any());