This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 461d508  [SPARK-34790][CORE] Disable fetching shuffle blocks in batch 
when io encryption is enabled
461d508 is described below

commit 461d508e23c79c1b6d71eba5cf51b8e96093cf73
Author: hezuojiao <[email protected]>
AuthorDate: Mon Mar 22 13:06:12 2021 -0700

    [SPARK-34790][CORE] Disable fetching shuffle blocks in batch when io 
encryption is enabled
    
    ### What changes were proposed in this pull request?
    
    This patch proposes to disable fetching shuffle blocks in batch when io 
encryption is enabled. Adaptive Query Execution fetches contiguous shuffle blocks 
for the same map task in batch to reduce IO and improve performance. However, 
we found that batch fetching is incompatible with io encryption.
    
    ### Why are the changes needed?
    Before this patch, when `spark.io.encryption.enabled` was set to true, running 
queries whose partitions were coalesced by AQE could fail with the following error message:
    ```
    14:05:52.638 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 
1.0 in stage 2.0 (TID 3) (11.240.37.88 executor driver): 
FetchFailed(BlockManagerId(driver, 11.240.37.88, 63574, None), shuffleId=0, 
mapIndex=0, mapId=0, reduceId=2, message=
    org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:772)
        at 
org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:845)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
        at java.io.DataInputStream.readInt(DataInputStream.java:387)
        at 
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113)
        at 
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:129)
        at 
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:494)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at 
org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
        at 
org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1437)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.io.IOException: Stream is corrupted
        at 
net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:200)
        at 
net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:226)
        at 
net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157)
        at 
org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:841)
        ... 25 more
    
    )
    ```
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New tests.
    
    Closes #31898 from hezuojiao/fetch_shuffle_in_batch.
    
    Authored-by: hezuojiao <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 39542bb81f8570219770bb6533c077f44f6cbd2a)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/shuffle/BlockStoreShuffleReader.scala    |  6 ++++--
 .../org/apache/spark/sql/internal/SQLConf.scala    |  4 ++--
 .../execution/CoalesceShufflePartitionsSuite.scala | 25 +++++++++++++++++++++-
 3 files changed, 30 insertions(+), 5 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala 
b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index bc2a0fb..30c7529 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -51,15 +51,17 @@ private[spark] class BlockStoreShuffleReader[K, C](
       true
     }
     val useOldFetchProtocol = conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)
+    // SPARK-34790: Fetching continuous blocks in batch is incompatible with 
io encryption.
+    val ioEncryption = conf.get(config.IO_ENCRYPTION_ENABLED)
 
     val doBatchFetch = shouldBatchFetch && serializerRelocatable &&
-      (!compressed || codecConcatenation) && !useOldFetchProtocol
+      (!compressed || codecConcatenation) && !useOldFetchProtocol && 
!ioEncryption
     if (shouldBatchFetch && !doBatchFetch) {
       logDebug("The feature tag of continuous shuffle block fetching is set to 
true, but " +
         "we can not enable the feature because other conditions are not 
satisfied. " +
         s"Shuffle compress: $compressed, serializer relocatable: 
$serializerRelocatable, " +
         s"codec concatenation: $codecConcatenation, use old shuffle fetch 
protocol: " +
-        s"$useOldFetchProtocol.")
+        s"$useOldFetchProtocol, io encryption: $ioEncryption.")
     }
     doBatchFetch
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 75bfbdb..800d237 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -491,8 +491,8 @@ object SQLConf {
         "reduce IO and improve performance. Note, multiple contiguous blocks 
exist in single " +
         s"fetch request only happen when '${ADAPTIVE_EXECUTION_ENABLED.key}' 
and " +
         s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true. This feature 
also depends " +
-        "on a relocatable serializer, the concatenation support codec in use 
and the new version " +
-        "shuffle fetch protocol.")
+        "on a relocatable serializer, the concatenation support codec in use, 
the new version " +
+        "shuffle fetch protocol and io encryption is disabled.")
       .version("3.0.0")
       .booleanConf
       .createWithDefault(true)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
index 22c5b65..4cb8cf1 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.IO_ENCRYPTION_ENABLED
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.adaptive._
@@ -57,15 +58,18 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite 
with BeforeAndAfterAl
   def withSparkSession(
       f: SparkSession => Unit,
       targetPostShuffleInputSize: Int,
-      minNumPostShufflePartitions: Option[Int]): Unit = {
+      minNumPostShufflePartitions: Option[Int],
+      enableIOEncryption: Boolean = false): Unit = {
     val sparkConf =
       new SparkConf(false)
         .setMaster("local[*]")
         .setAppName("test")
         .set(UI_ENABLED, false)
+        .set(IO_ENCRYPTION_ENABLED, enableIOEncryption)
         .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
         .set(SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key, "5")
         .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
+        .set(SQLConf.FETCH_SHUFFLE_BLOCKS_IN_BATCH.key, "true")
         .set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
         .set(
           SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key,
@@ -408,6 +412,25 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite 
with BeforeAndAfterAl
     }
     withSparkSession(test, 100, None)
   }
+
+  test("SPARK-34790: enable IO encryption in AQE partition coalescing") {
+    val test: SparkSession => Unit = { spark: SparkSession =>
+      val ds = spark.range(0, 100, 1, numInputPartitions)
+      val resultDf = ds.repartition(ds.col("id"))
+      resultDf.collect()
+
+      val finalPlan = resultDf.queryExecution.executedPlan
+        .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+      assert(
+        finalPlan.collect {
+          case r @ CoalescedShuffleReader() => r
+        }.isDefinedAt(0))
+    }
+    Seq(true, false).foreach { enableIOEncryption =>
+      // Before SPARK-34790, it will throw an exception when io encryption 
enabled.
+      withSparkSession(test, Int.MaxValue, None, enableIOEncryption)
+    }
+  }
 }
 
 object CoalescedShuffleReader {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to