Repository: spark
Updated Branches:
  refs/heads/branch-1.6 eddc7a58d -> 9a1ae6a1b


[SPARK-11583] [CORE] MapStatus Using RoaringBitmap More Properly

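This PR upgrades RoaringBitmap to version 0.5.11 to optimize its memory
layout, so the bitmap is much smaller when most blocks are empty.
HighlyCompressedMapStatus now calls trim() and runOptimize() on the
empty-block bitmap before storing it.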

This PR is based on #9661, with merge conflicts fixed. For the full review
discussion, see https://github.com/apache/spark/pull/9661

Author: Kent Yao <[email protected]>
Author: Davies Liu <[email protected]>
Author: Charles Allen <[email protected]>

Closes #9746 from davies/roaring_mapstatus.

(cherry picked from commit e33053ee0015025bbcfddb20cc9216c225bbe624)
Signed-off-by: Davies Liu <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9a1ae6a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9a1ae6a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9a1ae6a1

Branch: refs/heads/branch-1.6
Commit: 9a1ae6a1b98912f1afdcff60d45aec1a4e69587b
Parents: eddc7a5
Author: Kent Yao <[email protected]>
Authored: Tue Nov 17 19:44:29 2015 -0800
Committer: Davies Liu <[email protected]>
Committed: Tue Nov 17 19:45:35 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/MapStatus.scala  |  5 ++--
 .../spark/serializer/KryoSerializer.scala       |  6 ++--
 .../apache/spark/scheduler/MapStatusSuite.scala | 31 ++++++++++++++++++++
 pom.xml                                         |  2 +-
 4 files changed, 38 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9a1ae6a1/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 1efce12..b2e9a97 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -122,8 +122,7 @@ private[spark] class CompressedMapStatus(
 
 /**
  * A [[MapStatus]] implementation that only stores the average size of 
non-empty blocks,
- * plus a bitmap for tracking which blocks are empty.  During serialization, 
this bitmap
- * is compressed.
+ * plus a bitmap for tracking which blocks are empty.
  *
  * @param loc location where the task is being executed
  * @param numNonEmptyBlocks the number of non-empty blocks
@@ -194,6 +193,8 @@ private[spark] object HighlyCompressedMapStatus {
     } else {
       0
     }
+    emptyBlocks.trim()
+    emptyBlocks.runOptimize()
     new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9a1ae6a1/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 1bcb317..d5ba690 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.serializer
 
-import java.io.{EOFException, IOException, InputStream, OutputStream, 
DataInput, DataOutput}
+import java.io.{DataInput, DataOutput, EOFException, IOException, InputStream, 
OutputStream}
 import java.nio.ByteBuffer
 import javax.annotation.Nullable
 
@@ -25,9 +25,9 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
-import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => 
KryoClassSerializer}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
 import com.esotericsoftware.kryo.serializers.{JavaSerializer => 
KryoJavaSerializer}
+import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => 
KryoClassSerializer}
 import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
 import org.apache.avro.generic.{GenericData, GenericRecord}
 import org.roaringbitmap.RoaringBitmap
@@ -38,8 +38,8 @@ import org.apache.spark.broadcast.HttpBroadcast
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.scheduler.{CompressedMapStatus, 
HighlyCompressedMapStatus}
 import org.apache.spark.storage._
-import org.apache.spark.util.{Utils, BoundedPriorityQueue, 
SerializableConfiguration, SerializableJobConf}
 import org.apache.spark.util.collection.CompactBuffer
+import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, 
SerializableJobConf, Utils}
 
 /**
  * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo 
serialization library]].

http://git-wip-us.apache.org/repos/asf/spark/blob/9a1ae6a1/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index b8e466f..15c8de6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.storage.BlockManagerId
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.serializer.JavaSerializer
+import org.roaringbitmap.RoaringBitmap
 
 import scala.util.Random
 
@@ -97,4 +98,34 @@ class MapStatusSuite extends SparkFunSuite {
     val buf = ser.newInstance().serialize(status)
     ser.newInstance().deserialize[MapStatus](buf)
   }
+
+  test("RoaringBitmap: runOptimize succeeded") {
+    val r = new RoaringBitmap
+    (1 to 200000).foreach(i =>
+      if (i % 200 != 0) {
+        r.add(i)
+      }
+    )
+    val size1 = r.getSizeInBytes
+    val success = r.runOptimize()
+    r.trim()
+    val size2 = r.getSizeInBytes
+    assert(size1 > size2)
+    assert(success)
+  }
+
+  test("RoaringBitmap: runOptimize failed") {
+    val r = new RoaringBitmap
+    (1 to 200000).foreach(i =>
+      if (i % 200 == 0) {
+        r.add(i)
+      }
+    )
+    val size1 = r.getSizeInBytes
+    val success = r.runOptimize()
+    r.trim()
+    val size2 = r.getSizeInBytes
+    assert(size1 === size2)
+    assert(!success)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9a1ae6a1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2a8a445..940e2d8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -637,7 +637,7 @@
       <dependency>
         <groupId>org.roaringbitmap</groupId>
         <artifactId>RoaringBitmap</artifactId>
-        <version>0.4.5</version>
+        <version>0.5.11</version>
       </dependency>
       <dependency>
         <groupId>commons-net</groupId>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to