Repository: spark
Updated Branches:
  refs/heads/master 10f45b3c8 -> a11db942a


[SPARK-21923][CORE] Avoid calling reserveUnrollMemoryForThisTask for every record

## What changes were proposed in this pull request?
When Spark persists data to Unsafe (off-heap) memory, it calls `MemoryStore.putIteratorAsBytes`, which synchronizes on the `memoryManager` for every record written. This per-record synchronization is unnecessary: we can request more memory at a time and reduce the number of synchronized calls.
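
Conceptually, the patch batches the reservations: it only checks memory usage every `memoryCheckPeriod` records and, when it does reserve, over-requests by `memoryGrowthFactor`. A rough, self-contained sketch of that pattern follows (the record sizes and the in-place reservation counter are hypothetical stand-ins; the real code calls `reserveUnrollMemoryForThisTask` under the `memoryManager` lock):

```scala
object UnrollSketch {
  def main(args: Array[String]): Unit = {
    val memoryCheckPeriod = 16L   // default of spark.storage.unrollMemoryCheckPeriod
    val memoryGrowthFactor = 1.5  // default of spark.storage.unrollMemoryGrowthFactor
    val records = Iterator.fill(1000)(64L)  // pretend each record serializes to 64 bytes
    var bytesWritten = 0L
    var reserved = 0L      // unroll memory reserved so far for this block
    var reservations = 0   // number of (synchronized) requests to the memory manager
    var elementsUnrolled = 0L

    for (recordSize <- records) {
      bytesWritten += recordSize
      elementsUnrolled += 1
      // Only every memoryCheckPeriod-th record may trigger a reservation, and
      // each reservation over-requests by memoryGrowthFactor so that the next
      // several records fit without touching the memory manager again.
      if (elementsUnrolled % memoryCheckPeriod == 0 && bytesWritten > reserved) {
        val amountToRequest = (bytesWritten * memoryGrowthFactor - reserved).toLong
        reserved += amountToRequest  // stands in for reserveUnrollMemoryForThisTask
        reservations += 1
      }
    }
    println(s"records=$elementsUnrolled, reservations=$reservations, reserved=$reserved bytes")
  }
}
```

With the default settings, the sketch performs only a handful of reservations for 1000 records, instead of up to one per record.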

## How was this patch tested?

Test case (with 1 executor, 20 cores):
```scala
val start = System.currentTimeMillis()
sc.parallelize(0 until Integer.MAX_VALUE, 100)
  .persist(StorageLevel.OFF_HEAP)
  .count()

println(System.currentTimeMillis() - start)
```

Test result (wall-clock time in ms, five runs each):

|        | Run 1 | Run 2 | Run 3 | Run 4 | Run 5 |
|--------|-------|-------|-------|-------|-------|
| before | 27647 | 29108 | 28591 | 28264 | 27232 |
| after  | 26868 | 26358 | 27767 | 26653 | 26693 |
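
Averaged over the five runs, the job time drops from roughly 28168 ms to 26868 ms, an improvement of about 4.6%.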

Author: Xianyang Liu <xianyang....@intel.com>

Closes #19135 from ConeyLiu/memorystore.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a11db942
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a11db942
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a11db942

Branch: refs/heads/master
Commit: a11db942aaf4c470a85f8a1b180f034f7a584254
Parents: 10f45b3
Author: Xianyang Liu <xianyang....@intel.com>
Authored: Tue Sep 19 14:51:27 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Tue Sep 19 14:51:27 2017 +0800

----------------------------------------------------------------------
 .../apache/spark/internal/config/package.scala    | 15 +++++++++++++++
 .../apache/spark/storage/memory/MemoryStore.scala | 18 ++++++++++++++----
 2 files changed, 29 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a11db942/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 0d3769a..e0f6960 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -385,4 +385,19 @@ package object config {
       .checkValue(v => v > 0 && v <= Int.MaxValue,
         s"The buffer size must be greater than 0 and less than 
${Int.MaxValue}.")
       .createWithDefault(1024 * 1024)
+
+  private[spark] val UNROLL_MEMORY_CHECK_PERIOD =
+    ConfigBuilder("spark.storage.unrollMemoryCheckPeriod")
+      .internal()
+      .doc("The memory check period is used to determine how often we should 
check whether "
+        + "there is a need to request more memory when we try to unroll the 
given block in memory.")
+      .longConf
+      .createWithDefault(16)
+
+  private[spark] val UNROLL_MEMORY_GROWTH_FACTOR =
+    ConfigBuilder("spark.storage.unrollMemoryGrowthFactor")
+      .internal()
+      .doc("Memory to request as a multiple of the size that used to unroll 
the block.")
+      .doubleConf
+      .createWithDefault(1.5)
 }
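
Both settings are internal, but for reference a hypothetical override through `SparkConf` would look like this (the values here are arbitrary, chosen only to illustrate the new knobs; the defaults are 16 and 1.5):

```scala
import org.apache.spark.SparkConf

// Hypothetical tuning example for the two new internal properties.
val conf = new SparkConf()
  .set("spark.storage.unrollMemoryCheckPeriod", "32")
  .set("spark.storage.unrollMemoryGrowthFactor", "2.0")
```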

http://git-wip-us.apache.org/repos/asf/spark/blob/a11db942/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 90e3af2..eb2201d 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -29,6 +29,7 @@ import com.google.common.io.ByteStreams
 
 import org.apache.spark.{SparkConf, TaskContext}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{UNROLL_MEMORY_CHECK_PERIOD, UNROLL_MEMORY_GROWTH_FACTOR}
 import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.serializer.{SerializationStream, SerializerManager}
 import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel, StreamBlockId}
@@ -190,11 +191,11 @@ private[spark] class MemoryStore(
     // Initial per-task memory to request for unrolling blocks (bytes).
     val initialMemoryThreshold = unrollMemoryThreshold
     // How often to check whether we need to request more memory
-    val memoryCheckPeriod = 16
+    val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
     // Memory currently reserved by this task for this particular unrolling operation
     var memoryThreshold = initialMemoryThreshold
     // Memory to request as a multiple of current vector size
-    val memoryGrowthFactor = 1.5
+    val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
     // Keep track of unroll memory used by this particular block / putIterator() operation
     var unrollMemoryUsedByThisBlock = 0L
     // Underlying vector for unrolling the block
@@ -325,6 +326,12 @@ private[spark] class MemoryStore(
 
     // Whether there is still enough memory for us to continue unrolling this block
     var keepUnrolling = true
+    // Number of elements unrolled so far
+    var elementsUnrolled = 0L
+    // How often to check whether we need to request more memory
+    val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
+    // Memory to request as a multiple of current bbos size
+    val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
     // Initial per-task memory to request for unrolling blocks (bytes).
     val initialMemoryThreshold = unrollMemoryThreshold
     // Keep track of unroll memory used by this particular block / putIterator() operation
@@ -359,7 +366,7 @@ private[spark] class MemoryStore(
 
     def reserveAdditionalMemoryIfNecessary(): Unit = {
       if (bbos.size > unrollMemoryUsedByThisBlock) {
-        val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
+        val amountToRequest = (bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong
         keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
         if (keepUnrolling) {
           unrollMemoryUsedByThisBlock += amountToRequest
@@ -370,7 +377,10 @@ private[spark] class MemoryStore(
     // Unroll this block safely, checking whether we have exceeded our threshold
     while (values.hasNext && keepUnrolling) {
       serializationStream.writeObject(values.next())(classTag)
-      reserveAdditionalMemoryIfNecessary()
+      elementsUnrolled += 1
+      if (elementsUnrolled % memoryCheckPeriod == 0) {
+        reserveAdditionalMemoryIfNecessary()
+      }
     }
 
     // Make sure that we have enough memory to store the block. By this point, it is possible that
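
As a worked example of the new request size: if `bbos.size` has grown to 4096 bytes while only 3072 bytes are reserved for this block, the task now requests `(4096 * 1.5 - 3072).toLong = 3072` bytes in a single synchronized call, instead of topping up by the exact shortfall after every record.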

