This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 46ee939 Internode messaging catches OOMs and does not rethrow
46ee939 is described below
commit 46ee939b957528185dc6bbd3028c1d6e695163e7
Author: Yifan Cai <[email protected]>
AuthorDate: Thu Nov 19 09:13:30 2020 -0800
Internode messaging catches OOMs and does not rethrow
patch by Yifan Cai; reviewed by David Capwell, Jordan West for
CASSANDRA-15214
---
CHANGES.txt | 1 +
.../cassandra/net/InboundMessageHandler.java | 6 +--
.../apache/cassandra/net/OutboundConnection.java | 6 +--
.../cassandra/net/OutboundConnectionInitiator.java | 2 +-
.../org/apache/cassandra/transport/Message.java | 2 +
.../cassandra/utils/JVMStabilityInspector.java | 51 +++++++++++++++++-----
.../cassandra/utils/JVMStabilityInspectorTest.java | 17 ++++++++
7 files changed, 66 insertions(+), 19 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 0d93b85..beb878a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@
* Invalid serialized size for responses caused by increasing message time by
1ms which caused extra bytes in size calculation (CASSANDRA-16103)
* Throw BufferOverflowException from DataOutputBuffer for better visibility
(CASSANDRA-16214)
* TLS connections to the storage port on a node without server encryption
configured causes java.io.IOException accessing missing keystore
(CASSANDRA-16144)
+ * Internode messaging catches OOMs and does not rethrow (CASSANDRA-15214)
Merged from 3.11:
* SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to
default of 1GB (CASSANDRA-16071)
Merged from 3.0:
diff --git a/src/java/org/apache/cassandra/net/InboundMessageHandler.java b/src/java/org/apache/cassandra/net/InboundMessageHandler.java
index 534128e..2cac3eb 100644
--- a/src/java/org/apache/cassandra/net/InboundMessageHandler.java
+++ b/src/java/org/apache/cassandra/net/InboundMessageHandler.java
@@ -329,7 +329,7 @@ public class InboundMessageHandler extends ChannelInboundHandlerAdapter implemen
}
catch (Throwable t)
{
- JVMStabilityInspector.inspectThrowable(t, false);
+ JVMStabilityInspector.inspectThrowable(t);
callbacks.onFailedDeserialize(size, header, t);
logger.error("{} unexpected exception caught while deserializing a
message", id(), t);
}
@@ -648,7 +648,7 @@ public class InboundMessageHandler extends ChannelInboundHandlerAdapter implemen
{
decoder.discard();
- JVMStabilityInspector.inspectThrowable(cause, false);
+ JVMStabilityInspector.inspectThrowable(cause);
if (cause instanceof Message.InvalidLegacyProtocolMagic)
logger.error("{} invalid, unrecoverable CRC mismatch detected
while reading messages - closing the connection", id());
@@ -832,7 +832,7 @@ public class InboundMessageHandler extends ChannelInboundHandlerAdapter implemen
}
catch (Throwable t)
{
- JVMStabilityInspector.inspectThrowable(t, false);
+ JVMStabilityInspector.inspectThrowable(t);
callbacks.onFailedDeserialize(size, header, t);
logger.error("{} unexpected exception caught while
deserializing a message", id(), t);
}
diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java
index 79c0459..98c034c 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnection.java
@@ -485,7 +485,7 @@ public class OutboundConnection
*/
private void onFailedSerialize(Message<?> message, int messagingVersion,
int bytesWrittenToNetwork, Throwable t)
{
- JVMStabilityInspector.inspectThrowable(t, false);
+ JVMStabilityInspector.inspectThrowable(t);
releaseCapacity(1, canonicalSize(message));
errorCount += 1;
errorBytes += message.serializedSize(messagingVersion);
@@ -1047,7 +1047,7 @@ public class OutboundConnection
private void invalidateChannel(Established established, Throwable cause)
{
- JVMStabilityInspector.inspectThrowable(cause, false);
+ JVMStabilityInspector.inspectThrowable(cause);
if (state != established)
return; // do nothing; channel already invalidated
@@ -1093,7 +1093,7 @@ public class OutboundConnection
else
noSpamLogger.error("{} failed to connect", id(), cause);
- JVMStabilityInspector.inspectThrowable(cause, false);
+ JVMStabilityInspector.inspectThrowable(cause);
if (hasPending())
{
diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
index 2c26005..f1fa6b7 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
@@ -382,7 +382,7 @@ public class OutboundConnectionInitiator<SuccessType extends OutboundConnectionI
try
{
- JVMStabilityInspector.inspectThrowable(cause, false);
+ JVMStabilityInspector.inspectThrowable(cause);
resultPromise.tryFailure(cause);
if (isCausedByConnectionReset(cause))
logger.info("Failed to connect to peer {}",
settings.connectToId(), cause);
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 09ea2e5..59195c4 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -805,6 +805,8 @@ public abstract class Message
});
}
}
+
+ JVMStabilityInspector.inspectThrowable(cause);
}
}
diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
index ae28410..6f4e4c6 100644
--- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
+++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
@@ -19,6 +19,9 @@ package org.apache.cassandra.utils;
import java.io.FileNotFoundException;
import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@@ -28,6 +31,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import net.nicoulaj.compilecommand.annotations.Exclude;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -60,17 +64,12 @@ public final class JVMStabilityInspector
*/
public static void inspectThrowable(Throwable t) throws OutOfMemoryError
{
- inspectThrowable(t, true);
- }
-
- public static void inspectThrowable(Throwable t, boolean
propagateOutOfMemory) throws OutOfMemoryError
- {
- inspectThrowable(t, propagateOutOfMemory,
JVMStabilityInspector::inspectDiskError);
+ inspectThrowable(t, JVMStabilityInspector::inspectDiskError);
}
public static void inspectCommitLogThrowable(Throwable t)
{
- inspectThrowable(t, true,
JVMStabilityInspector::inspectCommitLogError);
+ inspectThrowable(t, JVMStabilityInspector::inspectCommitLogError);
}
private static void inspectDiskError(Throwable t)
@@ -81,7 +80,7 @@ public final class JVMStabilityInspector
FileUtils.handleFSError((FSError) t);
}
- public static void inspectThrowable(Throwable t, boolean
propagateOutOfMemory, Consumer<Throwable> fn) throws OutOfMemoryError
+ public static void inspectThrowable(Throwable t, Consumer<Throwable> fn)
throws OutOfMemoryError
{
boolean isUnstable = false;
if (t instanceof OutOfMemoryError)
@@ -102,11 +101,11 @@ public final class JVMStabilityInspector
logger.error("OutOfMemory error letting the JVM handle the
error:", t);
StorageService.instance.removeShutdownHook();
+
+ forceHeapSpaceOomMaybe((OutOfMemoryError) t);
+
// We let the JVM handle the error. The startup checks should have
warned the user if it did not configure
// the JVM behavior in case of OOM (CASSANDRA-13006).
- if (!propagateOutOfMemory)
- return;
-
throw (OutOfMemoryError) t;
}
@@ -125,7 +124,35 @@ public final class JVMStabilityInspector
killer.killCurrentJVM(t);
if (t.getCause() != null)
- inspectThrowable(t.getCause(), propagateOutOfMemory, fn);
+ inspectThrowable(t.getCause(), fn);
+ }
+
+ /**
+ * Intentionally produce a heap space OOM upon seeing a Direct buffer
memory OOM.
+ * Direct buffer OOM cannot trigger JVM OOM error related options,
+ * e.g. OnOutOfMemoryError, HeapDumpOnOutOfMemoryError, etc.
+ * See CASSANDRA-15214 for more details
+ */
+ @Exclude // Exclude from just in time compilation.
+ private static void forceHeapSpaceOomMaybe(OutOfMemoryError oom)
+ {
+ // See the oom thrown from java.nio.Bits.reserveMemory.
+ // In jdk 13 and up, the message is "Cannot reserve XX bytes of direct
buffer memory (...)"
+ // In jdk 11 and below, the message is "Direct buffer memory"
+ if ((oom.getMessage() != null &&
oom.getMessage().toLowerCase().contains("direct buffer memory")) ||
+ Arrays.stream(oom.getStackTrace()).anyMatch(x ->
x.getClassName().equals("java.nio.Bits")
+ &&
x.getMethodName().equals("reserveMemory")))
+ {
+ logger.error("Force heap space OutOfMemoryError in the presence
of", oom);
+ // Start to produce heap space OOM forcibly.
+ List<long[]> ignored = new ArrayList<>();
+ while (true)
+ {
+ // java.util.AbstractCollection.MAX_ARRAY_SIZE is defined as
Integer.MAX_VALUE - 8
+ // so Integer.MAX_VALUE / 2 should be a large enough and safe
size to request.
+ ignored.add(new long[Integer.MAX_VALUE / 2]);
+ }
+ }
}
private static void inspectCommitLogError(Throwable t)
diff --git a/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java b/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java
index 3b1056d..109fdb1 100644
--- a/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java
+++ b/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java
@@ -31,7 +31,9 @@ import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -109,6 +111,21 @@ public class JVMStabilityInspectorTest
}
@Test
+ public void testForceHeapSpaceOom()
+ {
+ try
+ {
+ JVMStabilityInspector.inspectThrowable(new
OutOfMemoryError("Direct buffer memory"));
+ fail("The JVMStabilityInspector should force trigger a heap space
OutOfMemoryError and delegate the handling to the JVM");
+ }
+ catch (Throwable e)
+ {
+ assertSame(e.getClass(), OutOfMemoryError.class);
+ assertEquals("Java heap space", e.getMessage());
+ }
+ }
+
+ @Test
public void fileHandleTest()
{
KillerForTests killerForTests = new KillerForTests();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]