This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ac06474f8c [SPARK-38830][CORE] Warn on corrupted block messages
8ac06474f8c is described below

commit 8ac06474f8cfa8e5619f817aaeea29a77ec8a2a4
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sun Apr 10 17:44:39 2022 -0700

    [SPARK-38830][CORE] Warn on corrupted block messages
    
    ### What changes were proposed in this pull request?
    
    This PR aims to warn when `NettyBlockRpcServer` received a corrupted block 
RPC message or under attack.
    
    - `IllegalArgumentException`: When the type is unknown/invalid when 
decoding. This fails at Spark layer.
    - `NegativeArraySizeException`: When the size read is negative. This fails 
at Spark layer during buffer creation.
    - `IndexOutOfBoundsException`: When the data field isn't matched with the 
size. This fails at Netty later.
    
    ### Why are the changes needed?
    
    When the RPC messages are corrupted or the servers are under attack, Spark 
shows `IndexOutOfBoundsException` due to the failure from `Decoder`. Instead of 
`Exception`, we had better ignore the message with a directional warning 
message.
    ```
    java.lang.IndexOutOfBoundsException:
        readerIndex(5) + length(602416) exceeds writerIndex(172):
    UnpooledUnsafeDirectByteBuf(ridx: 5, widx: 172, cap: 172/172)
        at 
io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1477)
        at 
io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1463)
        at 
io.netty.buffer.UnpooledDirectByteBuf.readBytes(UnpooledDirectByteBuf.java:316)
        at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:904)
        at 
org.apache.spark.network.protocol.Encoders$Strings.decode(Encoders.java:45)
        at 
org.apache.spark.network.shuffle.protocol.UploadBlock.decode(UploadBlock.java:112)
        at 
org.apache.spark.network.shuffle.protocol.BlockTransferMessage$Decoder.fromByteBuffer(BlockTransferMessage.java:71)
        at 
org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:53)
        at 
org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:161)
        at 
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
    
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, but this clarify the log messages from exceptions, 
`IndexOutOfBoundsException`.
    
    ### How was this patch tested?
    
    Pass the CIs with newly added test suite.
    
    Closes #36116 from dongjoon-hyun/SPARK-38830.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/network/netty/NettyBlockRpcServer.scala  | 18 ++++++-
 .../network/netty/NettyBlockRpcServerSuite.scala   | 59 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 1 deletion(-)

diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala 
b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index 81c878d17c6..f2a1fe49fcf 100644
--- 
a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ 
b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -50,7 +50,23 @@ class NettyBlockRpcServer(
       client: TransportClient,
       rpcMessage: ByteBuffer,
       responseContext: RpcResponseCallback): Unit = {
-    val message = BlockTransferMessage.Decoder.fromByteBuffer(rpcMessage)
+    val message = try {
+      BlockTransferMessage.Decoder.fromByteBuffer(rpcMessage)
+    } catch {
+      case e: IllegalArgumentException if e.getMessage.startsWith("Unknown 
message type") =>
+        logWarning(s"This could be a corrupted RPC message (capacity: 
${rpcMessage.capacity()}) " +
+          s"from ${client.getSocketAddress}. Please use `spark.authenticate.*` 
configurations " +
+          "in case of security incidents.")
+        throw e
+
+      case _: IndexOutOfBoundsException | _: NegativeArraySizeException =>
+        // Netty may throw non-'IOException's for corrupted buffers. In this 
case,
+        // we ignore the entire message with warnings because we cannot trust 
any contents.
+        logWarning(s"Ignored a corrupted RPC message (capacity: 
${rpcMessage.capacity()}) " +
+          s"from ${client.getSocketAddress}. Please use `spark.authenticate.*` 
configurations " +
+          "in case of security incidents.")
+        return
+    }
     logTrace(s"Received request: $message")
 
     message match {
diff --git 
a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockRpcServerSuite.scala
 
b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockRpcServerSuite.scala
new file mode 100644
index 00000000000..54e83e7bda5
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockRpcServerSuite.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import java.nio.ByteBuffer
+
+import org.mockito.Mockito.mock
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.network.client.TransportClient
+import org.apache.spark.serializer.JavaSerializer
+
+class NettyBlockRpcServerSuite extends SparkFunSuite {
+
+  test("SPARK-38830: Rethrow IllegalArgumentException due to `Unknown message 
type`") {
+    val serializer = new JavaSerializer(new SparkConf)
+    val server = new NettyBlockRpcServer("enhanced-rpc-server", serializer, 
null)
+    val bytes = Array[Byte](100.toByte)
+    val message = ByteBuffer.wrap(bytes)
+    val client = mock(classOf[TransportClient])
+    val m = intercept[IllegalArgumentException] {
+      server.receive(client, message)
+    }.getMessage
+    assert(m.startsWith("Unknown message type: 100"))
+  }
+
+  test("SPARK-38830: Warn and ignore NegativeArraySizeException due to the 
corruption") {
+    val serializer = new JavaSerializer(new SparkConf)
+    val server = new NettyBlockRpcServer("enhanced-rpc-server", serializer, 
null)
+    val bytes = Array[Byte](0.toByte, 0xFF.toByte, 0xFF.toByte, 0xFF.toByte, 
0xFF.toByte)
+    val message = ByteBuffer.wrap(bytes)
+    val client = mock(classOf[TransportClient])
+    server.receive(client, message)
+  }
+
+  test("SPARK-38830: Warn and ignore IndexOutOfBoundsException due to the 
corruption") {
+    val serializer = new JavaSerializer(new SparkConf)
+    val server = new NettyBlockRpcServer("enhanced-rpc-server", serializer, 
null)
+    val bytes = Array[Byte](1.toByte)
+    val message = ByteBuffer.wrap(bytes)
+    val client = mock(classOf[TransportClient])
+    server.receive(client, message)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to