Repository: spark
Updated Branches:
  refs/heads/master bf25f9bdf -> e33053ee0


[SPARK-11583] [CORE] MapStatus Using RoaringBitmap More Properly

This PR upgrades RoaringBitmap to 0.5.11 and calls trim() and runOptimize() on
the empty-block bitmap in HighlyCompressedMapStatus. This optimizes the bitmap's
memory layout, which becomes much smaller when most blocks are empty.

This PR is based on #9661 (with its merge conflicts resolved); see the full
review discussion at https://github.com/apache/spark/pull/9661

Author: Kent Yao <[email protected]>
Author: Davies Liu <[email protected]>
Author: Charles Allen <[email protected]>

Closes #9746 from davies/roaring_mapstatus.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e33053ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e33053ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e33053ee

Branch: refs/heads/master
Commit: e33053ee0015025bbcfddb20cc9216c225bbe624
Parents: bf25f9b
Author: Kent Yao <[email protected]>
Authored: Tue Nov 17 19:44:29 2015 -0800
Committer: Davies Liu <[email protected]>
Committed: Tue Nov 17 19:44:29 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/MapStatus.scala  |  5 ++--
 .../spark/serializer/KryoSerializer.scala       |  6 ++--
 .../apache/spark/scheduler/MapStatusSuite.scala | 31 ++++++++++++++++++++
 pom.xml                                         |  2 +-
 4 files changed, 38 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e33053ee/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 1efce12..b2e9a97 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -122,8 +122,7 @@ private[spark] class CompressedMapStatus(
 
 /**
  * A [[MapStatus]] implementation that only stores the average size of 
non-empty blocks,
- * plus a bitmap for tracking which blocks are empty.  During serialization, 
this bitmap
- * is compressed.
+ * plus a bitmap for tracking which blocks are empty.
  *
  * @param loc location where the task is being executed
  * @param numNonEmptyBlocks the number of non-empty blocks
@@ -194,6 +193,8 @@ private[spark] object HighlyCompressedMapStatus {
     } else {
       0
     }
+    emptyBlocks.trim()
+    emptyBlocks.runOptimize()
     new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e33053ee/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 1bcb317..d5ba690 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.serializer
 
-import java.io.{EOFException, IOException, InputStream, OutputStream, 
DataInput, DataOutput}
+import java.io.{DataInput, DataOutput, EOFException, IOException, InputStream, 
OutputStream}
 import java.nio.ByteBuffer
 import javax.annotation.Nullable
 
@@ -25,9 +25,9 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
-import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => 
KryoClassSerializer}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
 import com.esotericsoftware.kryo.serializers.{JavaSerializer => 
KryoJavaSerializer}
+import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => 
KryoClassSerializer}
 import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
 import org.apache.avro.generic.{GenericData, GenericRecord}
 import org.roaringbitmap.RoaringBitmap
@@ -38,8 +38,8 @@ import org.apache.spark.broadcast.HttpBroadcast
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.scheduler.{CompressedMapStatus, 
HighlyCompressedMapStatus}
 import org.apache.spark.storage._
-import org.apache.spark.util.{Utils, BoundedPriorityQueue, 
SerializableConfiguration, SerializableJobConf}
 import org.apache.spark.util.collection.CompactBuffer
+import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, 
SerializableJobConf, Utils}
 
 /**
  * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo 
serialization library]].

http://git-wip-us.apache.org/repos/asf/spark/blob/e33053ee/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index b8e466f..15c8de6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.storage.BlockManagerId
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.serializer.JavaSerializer
+import org.roaringbitmap.RoaringBitmap
 
 import scala.util.Random
 
@@ -97,4 +98,34 @@ class MapStatusSuite extends SparkFunSuite {
     val buf = ser.newInstance().serialize(status)
     ser.newInstance().deserialize[MapStatus](buf)
   }
+
+  test("RoaringBitmap: runOptimize succeeded") {
+    val r = new RoaringBitmap
+    (1 to 200000).foreach(i =>
+      if (i % 200 != 0) {
+        r.add(i)
+      }
+    )
+    val size1 = r.getSizeInBytes
+    val success = r.runOptimize()
+    r.trim()
+    val size2 = r.getSizeInBytes
+    assert(size1 > size2)
+    assert(success)
+  }
+
+  test("RoaringBitmap: runOptimize failed") {
+    val r = new RoaringBitmap
+    (1 to 200000).foreach(i =>
+      if (i % 200 == 0) {
+        r.add(i)
+      }
+    )
+    val size1 = r.getSizeInBytes
+    val success = r.runOptimize()
+    r.trim()
+    val size2 = r.getSizeInBytes
+    assert(size1 === size2)
+    assert(!success)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e33053ee/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2a8a445..940e2d8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -637,7 +637,7 @@
       <dependency>
         <groupId>org.roaringbitmap</groupId>
         <artifactId>RoaringBitmap</artifactId>
-        <version>0.4.5</version>
+        <version>0.5.11</version>
       </dependency>
       <dependency>
         <groupId>commons-net</groupId>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to