This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8ac06474f8c [SPARK-38830][CORE] Warn on corrupted block messages
8ac06474f8c is described below
commit 8ac06474f8cfa8e5619f817aaeea29a77ec8a2a4
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sun Apr 10 17:44:39 2022 -0700
[SPARK-38830][CORE] Warn on corrupted block messages
### What changes were proposed in this pull request?
This PR aims to warn when `NettyBlockRpcServer` received a corrupted block
RPC message or under attack.
- `IllegalArgumentException`: When the type is unknown/invalid when
decoding. This fails at Spark layer.
- `NegativeArraySizeException`: When the size read is negative. This fails
at Spark layer during buffer creation.
- `IndexOutOfBoundsException`: When the data field isn't matched with the
size. This fails at Netty layer.
### Why are the changes needed?
When the RPC messages are corrupted or the servers are under attack, Spark
shows `IndexOutOfBoundsException` due to the failure from `Decoder`. Instead of
`Exception`, we had better ignore the message with a directional warning
message.
```
java.lang.IndexOutOfBoundsException: readerIndex(5) + length(602416) exceeds writerIndex(172): UnpooledUnsafeDirectByteBuf(ridx: 5, widx: 172, cap: 172/172)
	at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1477)
	at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1463)
	at io.netty.buffer.UnpooledDirectByteBuf.readBytes(UnpooledDirectByteBuf.java:316)
	at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:904)
	at org.apache.spark.network.protocol.Encoders$Strings.decode(Encoders.java:45)
	at org.apache.spark.network.shuffle.protocol.UploadBlock.decode(UploadBlock.java:112)
	at org.apache.spark.network.shuffle.protocol.BlockTransferMessage$Decoder.fromByteBuffer(BlockTransferMessage.java:71)
	at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:53)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:161)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
```
### Does this PR introduce _any_ user-facing change?
Yes, but this clarifies the log messages from exceptions such as
`IndexOutOfBoundsException`.
### How was this patch tested?
Pass the CIs with newly added test suite.
Closes #36116 from dongjoon-hyun/SPARK-38830.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/network/netty/NettyBlockRpcServer.scala | 18 ++++++-
.../network/netty/NettyBlockRpcServerSuite.scala | 59 ++++++++++++++++++++++
2 files changed, 76 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index 81c878d17c6..f2a1fe49fcf 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -50,7 +50,23 @@ class NettyBlockRpcServer(
client: TransportClient,
rpcMessage: ByteBuffer,
responseContext: RpcResponseCallback): Unit = {
- val message = BlockTransferMessage.Decoder.fromByteBuffer(rpcMessage)
+ val message = try {
+ BlockTransferMessage.Decoder.fromByteBuffer(rpcMessage)
+ } catch {
+      case e: IllegalArgumentException if e.getMessage.startsWith("Unknown message type") =>
+        logWarning(s"This could be a corrupted RPC message (capacity: ${rpcMessage.capacity()}) " +
+          s"from ${client.getSocketAddress}. Please use `spark.authenticate.*` configurations " +
+          "in case of security incidents.")
+        throw e
+
+      case _: IndexOutOfBoundsException | _: NegativeArraySizeException =>
+        // Netty may throw non-'IOException's for corrupted buffers. In this case,
+        // we ignore the entire message with warnings because we cannot trust any contents.
+        logWarning(s"Ignored a corrupted RPC message (capacity: ${rpcMessage.capacity()}) " +
+          s"from ${client.getSocketAddress}. Please use `spark.authenticate.*` configurations " +
+          "in case of security incidents.")
+ return
+ }
logTrace(s"Received request: $message")
message match {
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockRpcServerSuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockRpcServerSuite.scala
new file mode 100644
index 00000000000..54e83e7bda5
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockRpcServerSuite.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import java.nio.ByteBuffer
+
+import org.mockito.Mockito.mock
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.network.client.TransportClient
+import org.apache.spark.serializer.JavaSerializer
+
+class NettyBlockRpcServerSuite extends SparkFunSuite {
+
+  test("SPARK-38830: Rethrow IllegalArgumentException due to `Unknown message type`") {
+    val serializer = new JavaSerializer(new SparkConf)
+    val server = new NettyBlockRpcServer("enhanced-rpc-server", serializer, null)
+    val bytes = Array[Byte](100.toByte)
+    val message = ByteBuffer.wrap(bytes)
+    val client = mock(classOf[TransportClient])
+    val m = intercept[IllegalArgumentException] {
+      server.receive(client, message)
+    }.getMessage
+    assert(m.startsWith("Unknown message type: 100"))
+  }
+
+  test("SPARK-38830: Warn and ignore NegativeArraySizeException due to the corruption") {
+    val serializer = new JavaSerializer(new SparkConf)
+    val server = new NettyBlockRpcServer("enhanced-rpc-server", serializer, null)
+    val bytes = Array[Byte](0.toByte, 0xFF.toByte, 0xFF.toByte, 0xFF.toByte, 0xFF.toByte)
+    val message = ByteBuffer.wrap(bytes)
+    val client = mock(classOf[TransportClient])
+    server.receive(client, message)
+  }
+
+  test("SPARK-38830: Warn and ignore IndexOutOfBoundsException due to the corruption") {
+    val serializer = new JavaSerializer(new SparkConf)
+    val server = new NettyBlockRpcServer("enhanced-rpc-server", serializer, null)
+ val bytes = Array[Byte](1.toByte)
+ val message = ByteBuffer.wrap(bytes)
+ val client = mock(classOf[TransportClient])
+ server.receive(client, message)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]