This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 143f7e3 IGNITE-12844: Too many caches can block discovery. This
closes #7582
143f7e3 is described below
commit 143f7e341f87fa3a5744ce9c62cb60ebbd13f9e6
Author: Philipp Masharov <[email protected]>
AuthorDate: Thu Apr 16 17:50:01 2020 +0300
IGNITE-12844: Too many caches can block discovery. This closes #7582
---
.../org/apache/ignite/IgniteSystemProperties.java | 6 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 80 +++++--------
.../ignite/spi/discovery/tcp/ServerImpl.java | 76 +++++-------
.../ignite/spi/discovery/tcp/TcpDiscoveryImpl.java | 54 +++++++++
.../tcp/TcpDiscoveryMetricsWarnLogTest.java | 133 +++++++++++++++++++++
.../IgniteSpiDiscoverySelfTestSuite.java | 2 +
6 files changed, 252 insertions(+), 99 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 210a048..bcec9ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -17,13 +17,13 @@
package org.apache.ignite;
-import javax.net.ssl.HostnameVerifier;
import java.io.Serializable;
import java.lang.management.RuntimeMXBean;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
+import javax.net.ssl.HostnameVerifier;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -535,6 +535,10 @@ public final class IgniteSystemProperties {
public static final String IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE =
"IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE";
+ /** Number of cache metrics for a node in a discovery message at or above
which a warning is logged. */
+ public static final String IGNITE_DISCOVERY_METRICS_QNT_WARN =
+ "IGNITE_DISCOVERY_METRICS_QNT_WARN";
+
/** Time interval that indicates that client reconnect throttle must be
reset to zero. 2 minutes by default. */
public static final String
CLIENT_THROTTLE_RECONNECT_RESET_TIMEOUT_INTERVAL =
"CLIENT_THROTTLE_RECONNECT_RESET_TIMEOUT_INTERVAL";
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index d45dcf7..a352ff2 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -973,6 +973,34 @@ class ClientImpl extends TcpDiscoveryImpl {
return res;
}
+ /** {@inheritDoc} */
+ @Override public void updateMetrics(UUID nodeId,
+ ClusterMetrics metrics,
+ Map<Integer, CacheMetrics> cacheMetrics,
+ long tsNanos)
+ {
+ boolean isLocDaemon = spi.locNode.isDaemon();
+
+ assert nodeId != null;
+ assert metrics != null;
+ assert isLocDaemon || cacheMetrics != null;
+
+ TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode :
rmtNodes.get(nodeId);
+
+ if (node != null && node.visible()) {
+ node.setMetrics(metrics);
+
+ if (!isLocDaemon)
+ node.setCacheMetrics(cacheMetrics);
+
+ node.lastUpdateTimeNanos(tsNanos);
+
+ msgWorker.notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node,
allVisibleNodes(), null);
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Received metrics from unknown node: " + nodeId);
+ }
+
/**
* FOR TEST PURPOSE ONLY!
*/
@@ -2447,23 +2475,8 @@ class ClientImpl extends TcpDiscoveryImpl {
log.debug("Received metrics response: " + msg);
}
else {
- long tsNanos = System.nanoTime();
-
- if (msg.hasMetrics()) {
- for (Map.Entry<UUID,
TcpDiscoveryMetricsUpdateMessage.MetricsSet> e : msg.metrics().entrySet()) {
- UUID nodeId = e.getKey();
-
- TcpDiscoveryMetricsUpdateMessage.MetricsSet metricsSet
= e.getValue();
-
- Map<Integer, CacheMetrics> cacheMetrics =
msg.hasCacheMetrics(nodeId) ?
- msg.cacheMetrics().get(nodeId) :
Collections.<Integer, CacheMetrics>emptyMap();
-
- updateMetrics(nodeId, metricsSet.metrics(),
cacheMetrics, tsNanos);
-
- for (T2<UUID, ClusterMetrics> t :
metricsSet.clientMetrics())
- updateMetrics(t.get1(), t.get2(), cacheMetrics,
tsNanos);
- }
- }
+ if (msg.hasMetrics())
+ processMsgCacheMetrics(msg, System.nanoTime());
}
}
@@ -2564,39 +2577,6 @@ class ClientImpl extends TcpDiscoveryImpl {
}
/**
- * @param nodeId Node ID.
- * @param metrics Metrics.
- * @param cacheMetrics Cache metrics.
- * @param tsNanos Timestamp as returned by {@link System#nanoTime()}.
- */
- private void updateMetrics(UUID nodeId,
- ClusterMetrics metrics,
- Map<Integer, CacheMetrics> cacheMetrics,
- long tsNanos)
- {
- boolean isLocDaemon = spi.locNode.isDaemon();
-
- assert nodeId != null;
- assert metrics != null;
- assert isLocDaemon || cacheMetrics != null;
-
- TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode
: rmtNodes.get(nodeId);
-
- if (node != null && node.visible()) {
- node.setMetrics(metrics);
-
- if (!isLocDaemon)
- node.setCacheMetrics(cacheMetrics);
-
- node.lastUpdateTimeNanos(tsNanos);
-
- notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node,
allVisibleNodes());
- }
- else if (log.isDebugEnabled())
- log.debug("Received metrics from unknown node: " + nodeId);
- }
-
- /**
* @param type Event type.
* @param topVer Topology version.
* @param node Node.
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 2886478..a2fe506 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -17,9 +17,6 @@
package org.apache.ignite.spi.discovery.tcp;
-import javax.net.ssl.SSLException;
-import javax.net.ssl.SSLServerSocket;
-import javax.net.ssl.SSLSocket;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -61,6 +58,9 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLSocket;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -1916,6 +1916,29 @@ class ServerImpl extends TcpDiscoveryImpl {
return threads;
}
+ /** {@inheritDoc} */
+ @Override public void updateMetrics(UUID nodeId,
+ ClusterMetrics metrics,
+ Map<Integer, CacheMetrics> cacheMetrics,
+ long tsNanos)
+ {
+ assert nodeId != null;
+ assert metrics != null;
+
+ TcpDiscoveryNode node = ring.node(nodeId);
+
+ if (node != null) {
+ node.setMetrics(metrics);
+ node.setCacheMetrics(cacheMetrics);
+
+ node.lastUpdateTimeNanos(tsNanos);
+
+ notifyDiscovery(EVT_NODE_METRICS_UPDATED, ring.topologyVersion(),
node);
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Received metrics from unknown node: " + nodeId);
+ }
+
/**
* <strong>FOR TEST ONLY!!!</strong>
* <p>
@@ -5739,23 +5762,8 @@ class ServerImpl extends TcpDiscoveryImpl {
long tsNanos = System.nanoTime();
- if (spiStateCopy() == CONNECTED) {
- if (msg.hasMetrics()) {
- for (Map.Entry<UUID,
TcpDiscoveryMetricsUpdateMessage.MetricsSet> e : msg.metrics().entrySet()) {
- UUID nodeId = e.getKey();
-
- TcpDiscoveryMetricsUpdateMessage.MetricsSet metricsSet
= e.getValue();
-
- Map<Integer, CacheMetrics> cacheMetrics =
msg.hasCacheMetrics(nodeId) ?
- msg.cacheMetrics().get(nodeId) :
Collections.emptyMap();
-
- updateMetrics(nodeId, metricsSet.metrics(),
cacheMetrics, tsNanos);
-
- for (T2<UUID, ClusterMetrics> t :
metricsSet.clientMetrics())
- updateMetrics(t.get1(), t.get2(), cacheMetrics,
tsNanos);
- }
- }
- }
+ if (spiStateCopy() == CONNECTED && msg.hasMetrics())
+ processMsgCacheMetrics(msg, tsNanos);
if (sendMessageToRemotes(msg)) {
if (laps == 0 && spiStateCopy() == CONNECTED) {
@@ -5825,34 +5833,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- * @param nodeId Node ID.
- * @param metrics Metrics.
- * @param cacheMetrics Cache metrics.
- * @param tsNanos Timestamp as returned by {@link System#nanoTime()}.
- */
- private void updateMetrics(UUID nodeId,
- ClusterMetrics metrics,
- Map<Integer, CacheMetrics> cacheMetrics,
- long tsNanos)
- {
- assert nodeId != null;
- assert metrics != null;
-
- TcpDiscoveryNode node = ring.node(nodeId);
-
- if (node != null) {
- node.setMetrics(metrics);
- node.setCacheMetrics(cacheMetrics);
-
- node.lastUpdateTimeNanos(tsNanos);
-
- notifyDiscovery(EVT_NODE_METRICS_UPDATED,
ring.topologyVersion(), node);
- }
- else if (log.isDebugEnabled())
- log.debug("Received metrics from unknown node: " + nodeId);
- }
-
- /**
* Processes discard message and discards previously registered
pending messages.
*
* @param msg Discard message.
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index bbd78fe..7980efc 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -24,13 +24,17 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheMetrics;
+import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -40,9 +44,13 @@ import org.apache.ignite.spi.IgniteSpiThread;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
import org.jetbrains.annotations.Nullable;
+import static
org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_METRICS_QNT_WARN;
+import static org.apache.ignite.IgniteSystemProperties.getInteger;
+
/**
*
*/
@@ -59,6 +67,9 @@ abstract class TcpDiscoveryImpl {
/** Response join impossible. */
protected static final int RES_JOIN_IMPOSSIBLE = 255;
+ /** Minimum interval between repeated warning messages, to prevent log
spam. */
+ public static final long LOG_WARN_MSG_TIMEOUT = 60 * 60 * 1000L;
+
/** */
protected final TcpDiscoverySpi spi;
@@ -77,6 +88,12 @@ abstract class TcpDiscoveryImpl {
/** Received messages. */
protected ConcurrentLinkedDeque<String> debugLogQ;
+ /** Number of cache metrics for a node at or above which a warning is
logged (500 by default). */
+ protected int METRICS_QNT_WARN =
getInteger(IGNITE_DISCOVERY_METRICS_QNT_WARN, 500);
+
+ /** Time before which the cache metrics warning is not logged again. */
+ protected long endTimeMetricsSizeProcessWait = System.currentTimeMillis();
+
/** */
protected final ServerImpl.DebugLogger debugLog = new DebugLogger() {
/** {@inheritDoc} */
@@ -347,6 +364,17 @@ abstract class TcpDiscoveryImpl {
protected abstract Collection<IgniteSpiThread> threads();
/**
+ * @param nodeId Node ID.
+ * @param metrics Metrics.
+ * @param cacheMetrics Cache metrics.
+ * @param tsNanos Timestamp as returned by {@link System#nanoTime()}.
+ */
+ public abstract void updateMetrics(UUID nodeId,
+ ClusterMetrics metrics,
+ Map<Integer, CacheMetrics> cacheMetrics,
+ long tsNanos);
+
+ /**
* @throws IgniteSpiException If failed.
*/
protected final void registerLocalNodeAddress() throws IgniteSpiException {
@@ -407,6 +435,32 @@ abstract class TcpDiscoveryImpl {
return true;
}
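+ /** Applies node and client metrics from a metrics update message, warning about too many cache metrics. */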
+ public void processMsgCacheMetrics(TcpDiscoveryMetricsUpdateMessage msg,
long tsNanos) {
+ for (Map.Entry<UUID, TcpDiscoveryMetricsUpdateMessage.MetricsSet> e :
msg.metrics().entrySet()) {
+ UUID nodeId = e.getKey();
+
+ TcpDiscoveryMetricsUpdateMessage.MetricsSet metricsSet =
e.getValue();
+
+ Map<Integer, CacheMetrics> cacheMetrics =
msg.hasCacheMetrics(nodeId) ?
+ msg.cacheMetrics().get(nodeId) : Collections.emptyMap();
+
+ if (endTimeMetricsSizeProcessWait <= U.currentTimeMillis()
+ && cacheMetrics.size() >= METRICS_QNT_WARN)
+ {
+ log.warning("The Discovery message has metrics for " +
cacheMetrics.size() + " caches.\n" +
+ "To prevent Discovery blocking use
-DIGNITE_DISCOVERY_DISABLE_CACHE_METRICS_UPDATE=true option.");
+
+ endTimeMetricsSizeProcessWait = U.currentTimeMillis() +
LOG_WARN_MSG_TIMEOUT;
+ }
+
+ updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tsNanos);
+
+ for (T2<UUID, ClusterMetrics> t : metricsSet.clientMetrics())
+ updateMetrics(t.get1(), t.get2(), cacheMetrics, tsNanos);
+ }
+ }
+
/**
* @param addrs Addresses.
*/
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMetricsWarnLogTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMetricsWarnLogTest.java
new file mode 100644
index 0000000..e6b6beb
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMetricsWarnLogTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static
org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_METRICS_QNT_WARN;
+
+/**
+ * Class for testing warning log message about too many cache metrics.
+ */
+public class TcpDiscoveryMetricsWarnLogTest extends GridCommonAbstractTest {
+ /** Listener log messages. */
+ private static ListeningTestLogger testLog;
+
+ /** Desired message. */
+ public static final String LOG_MSG = "To prevent Discovery blocking use";
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ testLog = new ListeningTestLogger(false, log);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ testLog.clearListeners();
+
+ super.afterTest();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setMetricsUpdateFrequency(500L)
+ .setGridLogger(testLog);
+ }
+
+ /**
+ * Test checks that the desired message occurs in logs.
+ *
+ * @throws Exception If any error occurs.
+ */
+ @Test
+ @WithSystemProperty(key = IGNITE_DISCOVERY_METRICS_QNT_WARN, value = "20")
+ public void testMetricsWarningLog() throws Exception {
+ testLog.warning("IGNITE_DISCOVERY_METRICS_QNT_WARN = "
+ + System.getProperty(IGNITE_DISCOVERY_METRICS_QNT_WARN));
+
+ LogListener logLsnr0 = LogListener.matches(LOG_MSG)
+ .andMatches("TcpDiscoveryMetricsWarnLogTest0")
+ .atLeast(1)
+ .build();
+
+ LogListener logLsnr1 = LogListener.matches(LOG_MSG)
+ .andMatches("TcpDiscoveryMetricsWarnLogTest1")
+ .atLeast(1)
+ .build();
+
+ LogListener logLsnr2 = LogListener.matches(LOG_MSG)
+ .andMatches("TcpDiscoveryMetricsWarnLogTest2")
+ .atLeast(1)
+ .build();
+
+ testLog.registerListener(logLsnr0);
+ testLog.registerListener(logLsnr1);
+ testLog.registerListener(logLsnr2);
+
+ Ignite ignite0 = startGrid(0);
+
+ startGrid(1);
+
+ startClientGrid(2);
+
+ for (int i = 1; i <= 30; i++)
+ createAndFillCache(i, ignite0);
+
+ awaitMetricsUpdate(3);
+
+ assertTrue(logLsnr0.check());
+ assertTrue(logLsnr1.check());
+ assertTrue(logLsnr2.check());
+ }
+
+ @Test
+ @WithSystemProperty(key = IGNITE_DISCOVERY_METRICS_QNT_WARN, value = "0")
+ public void testMetricsWarningLog0() throws Exception {
+ testMetricsWarningLog();
+ }
+
+ /**
+ * Creates a cache and fills it with test data.
+ *
+ * @param cacheNum Cache number to generate a cache name.
+ * @param ignite Ignite instance to create a cache in.
+ */
+ private void createAndFillCache(int cacheNum, Ignite ignite) {
+ IgniteCache<Object, Object> cache = ignite.getOrCreateCache(
+ new CacheConfiguration<>(DEFAULT_CACHE_NAME +
cacheNum).setStatisticsEnabled(true)
+ );
+
+ for (int i = 1; i < 100; i++)
+ cache.put(i, i);
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 22931a8..6771904 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -43,6 +43,7 @@ import
org.apache.ignite.spi.discovery.tcp.TcpDiscoveryCoordinatorFailureTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryFailedJoinTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIpFinderCleanerTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryMarshallerCheckSelfTest;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryMetricsWarnLogTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryMultiThreadedTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNetworkIssuesTest;
import
org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeAttributesUpdateOnReconnectTest;
@@ -119,6 +120,7 @@ import static
org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP
TcpDiscoveryRestartTest.class,
TcpDiscoveryMultiThreadedTest.class,
+ TcpDiscoveryMetricsWarnLogTest.class,
//TcpDiscoveryConcurrentStartTest.class,
TcpDiscoverySegmentationPolicyTest.class,