Repository: spark
Updated Branches:
  refs/heads/master aa8bb117a -> 165e06a74


SPARK-1057 (alternative) Remove fastutil

(This is for discussion at this point -- I'm not suggesting this should be 
committed.)

This is what removing fastutil looks like. Much of it is straightforward, like 
using `java.io` buffered stream classes, and Guava for murmurhash3.

Uses of the `FastByteArrayOutputStream` were a little trickier. In only one 
case though do I think the change to use `java.io` actually entails an extra 
array copy.

The rest is using `OpenHashMap` and `OpenHashSet`.  These are now written in 
terms of more scala-like operations.

`OpenHashMap` is where I made three non-trivial changes to make it work, and 
they need review:

- It is no longer private
- The key must be a `ClassTag`
- Unless a lot of other code changes, the key type can't enforce being a 
supertype of `Null`

It all works and tests pass, and I think there is reason to believe it's OK 
from a speed perspective.

But what about those last changes?

Author: Sean Owen <so...@cloudera.com>

Closes #266 from srowen/SPARK-1057-alternate and squashes the following commits:

2601129 [Sean Owen] Fix Map return type error not previously caught
ec65502 [Sean Owen] Updates from matei's review
00bc81e [Sean Owen] Remove use of fastutil and replace with use of java.io, 
spark.util and Guava classes


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/165e06a7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/165e06a7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/165e06a7

Branch: refs/heads/master
Commit: 165e06a74c3d75e6b7341c120943add8b035b96a
Parents: aa8bb11
Author: Sean Owen <so...@cloudera.com>
Authored: Fri Apr 11 22:46:47 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Fri Apr 11 22:46:47 2014 -0700

----------------------------------------------------------------------
 core/pom.xml                                    |  4 ---
 .../apache/spark/broadcast/HttpBroadcast.scala  |  9 +++---
 .../spark/partial/GroupedCountEvaluator.scala   | 32 ++++++++----------
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 34 +++++++++-----------
 .../spark/scheduler/ReplayListenerBus.scala     |  5 ++-
 .../scala/org/apache/spark/scheduler/Task.scala |  9 ++----
 .../apache/spark/serializer/Serializer.scala    |  9 ++----
 .../org/apache/spark/storage/BlockManager.scala | 10 +++---
 .../spark/storage/BlockObjectWriter.scala       |  6 ++--
 .../org/apache/spark/util/FileLogger.scala      |  5 ++-
 .../org/apache/spark/util/SizeEstimator.scala   |  5 ++-
 .../spark/util/collection/AppendOnlyMap.scala   |  7 ++--
 .../util/collection/ExternalAppendOnlyMap.scala |  5 ++-
 .../spark/util/collection/OpenHashMap.scala     |  3 +-
 .../spark/util/collection/OpenHashSet.scala     |  4 +--
 pom.xml                                         |  5 ---
 project/SparkBuild.scala                        |  1 -
 .../spark/streaming/util/RawTextHelper.scala    | 15 +++++----
 .../spark/streaming/util/RawTextSender.scala    | 11 +++----
 19 files changed, 72 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/165e06a7/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 1f80838..a1bdd8e 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -158,10 +158,6 @@
       </exclusions>
     </dependency>
     <dependency>
-      <groupId>it.unimi.dsi</groupId>
-      <artifactId>fastutil</artifactId>
-    </dependency>
-    <dependency>
       <groupId>colt</groupId>
       <artifactId>colt</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/165e06a7/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala 
b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index f6a8a8a..29372f1 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -18,11 +18,10 @@
 package org.apache.spark.broadcast
 
 import java.io.{File, FileOutputStream, ObjectInputStream, ObjectOutputStream, 
OutputStream}
-import java.net.{URI, URL, URLConnection}
+import java.io.{BufferedInputStream, BufferedOutputStream}
+import java.net.{URL, URLConnection, URI}
 import java.util.concurrent.TimeUnit
 
-import it.unimi.dsi.fastutil.io.{FastBufferedInputStream, 
FastBufferedOutputStream}
-
 import org.apache.spark.{HttpServer, Logging, SecurityManager, SparkConf, 
SparkEnv}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
@@ -164,7 +163,7 @@ private[spark] object HttpBroadcast extends Logging {
       if (compress) {
         compressionCodec.compressedOutputStream(new FileOutputStream(file))
       } else {
-        new FastBufferedOutputStream(new FileOutputStream(file), bufferSize)
+        new BufferedOutputStream(new FileOutputStream(file), bufferSize)
       }
     }
     val ser = SparkEnv.get.serializer.newInstance()
@@ -195,7 +194,7 @@ private[spark] object HttpBroadcast extends Logging {
       if (compress) {
         compressionCodec.compressedInputStream(inputStream)
       } else {
-        new FastBufferedInputStream(inputStream, bufferSize)
+        new BufferedInputStream(inputStream, bufferSize)
       }
     }
     val ser = SparkEnv.get.serializer.newInstance()

http://git-wip-us.apache.org/repos/asf/spark/blob/165e06a7/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala 
b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
index 40b70ba..8bb7812 100644
--- a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
+++ b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
@@ -22,36 +22,33 @@ import java.util.{HashMap => JHashMap}
 import scala.collection.JavaConversions.mapAsScalaMap
 import scala.collection.Map
 import scala.collection.mutable.HashMap
+import scala.reflect.ClassTag
 
 import cern.jet.stat.Probability
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+
+import org.apache.spark.util.collection.OpenHashMap
 
 /**
  * An ApproximateEvaluator for counts by key. Returns a map of key to 
confidence interval.
  */
-private[spark] class GroupedCountEvaluator[T](totalOutputs: Int, confidence: 
Double)
-  extends ApproximateEvaluator[OLMap[T], Map[T, BoundedDouble]] {
+private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, 
confidence: Double)
+  extends ApproximateEvaluator[OpenHashMap[T,Long], Map[T, BoundedDouble]] {
 
   var outputsMerged = 0
-  var sums = new OLMap[T]   // Sum of counts for each key
+  var sums = new OpenHashMap[T,Long]()   // Sum of counts for each key
 
-  override def merge(outputId: Int, taskResult: OLMap[T]) {
+  override def merge(outputId: Int, taskResult: OpenHashMap[T,Long]) {
     outputsMerged += 1
-    val iter = taskResult.object2LongEntrySet.fastIterator()
-    while (iter.hasNext) {
-      val entry = iter.next()
-      sums.put(entry.getKey, sums.getLong(entry.getKey) + entry.getLongValue)
+    taskResult.foreach { case (key, value) =>
+      sums.changeValue(key, value, _ + value)
     }
   }
 
   override def currentResult(): Map[T, BoundedDouble] = {
     if (outputsMerged == totalOutputs) {
       val result = new JHashMap[T, BoundedDouble](sums.size)
-      val iter = sums.object2LongEntrySet.fastIterator()
-      while (iter.hasNext) {
-        val entry = iter.next()
-        val sum = entry.getLongValue()
-        result(entry.getKey) = new BoundedDouble(sum, 1.0, sum, sum)
+      sums.foreach { case (key, sum) =>
+        result(key) = new BoundedDouble(sum, 1.0, sum, sum)
       }
       result
     } else if (outputsMerged == 0) {
@@ -60,16 +57,13 @@ private[spark] class GroupedCountEvaluator[T](totalOutputs: 
Int, confidence: Dou
       val p = outputsMerged.toDouble / totalOutputs
       val confFactor = Probability.normalInverse(1 - (1 - confidence) / 2)
       val result = new JHashMap[T, BoundedDouble](sums.size)
-      val iter = sums.object2LongEntrySet.fastIterator()
-      while (iter.hasNext) {
-        val entry = iter.next()
-        val sum = entry.getLongValue
+      sums.foreach { case (key, sum) =>
         val mean = (sum + 1 - p) / p
         val variance = (sum + 1) * (1 - p) / (p * p)
         val stdev = math.sqrt(variance)
         val low = mean - confFactor * stdev
         val high = mean + confFactor * stdev
-        result(entry.getKey) = new BoundedDouble(mean, confidence, low, high)
+        result(key) = new BoundedDouble(mean, confidence, low, high)
       }
       result
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/165e06a7/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3437b2c..891efcc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -20,12 +20,10 @@ package org.apache.spark.rdd
 import java.util.Random
 
 import scala.collection.Map
-import scala.collection.JavaConversions.mapAsScalaMap
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.{classTag, ClassTag}
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
 import org.apache.hadoop.io.BytesWritable
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.io.NullWritable
@@ -43,6 +41,7 @@ import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.{BoundedPriorityQueue, SerializableHyperLogLog, 
Utils}
+import org.apache.spark.util.collection.OpenHashMap
 import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler}
 
 /**
@@ -834,24 +833,24 @@ abstract class RDD[T: ClassTag](
       throw new SparkException("countByValue() does not support arrays")
     }
     // TODO: This should perhaps be distributed by default.
-    def countPartition(iter: Iterator[T]): Iterator[OLMap[T]] = {
-      val map = new OLMap[T]
-      while (iter.hasNext) {
-        val v = iter.next()
-        map.put(v, map.getLong(v) + 1L)
+    def countPartition(iter: Iterator[T]): Iterator[OpenHashMap[T,Long]] = {
+      val map = new OpenHashMap[T,Long]
+      iter.foreach {
+        t => map.changeValue(t, 1L, _ + 1L)
       }
       Iterator(map)
     }
-    def mergeMaps(m1: OLMap[T], m2: OLMap[T]): OLMap[T] = {
-      val iter = m2.object2LongEntrySet.fastIterator()
-      while (iter.hasNext) {
-        val entry = iter.next()
-        m1.put(entry.getKey, m1.getLong(entry.getKey) + entry.getLongValue)
+    def mergeMaps(m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]): 
OpenHashMap[T,Long] = {
+      m2.foreach { case (key, value) =>
+        m1.changeValue(key, value, _ + value)
       }
       m1
     }
     val myResult = mapPartitions(countPartition).reduce(mergeMaps)
-    myResult.asInstanceOf[java.util.Map[T, Long]]   // Will be wrapped as a 
Scala mutable Map
+    // Convert to a Scala mutable map
+    val mutableResult = scala.collection.mutable.Map[T,Long]()
+    myResult.foreach { case (k, v) => mutableResult.put(k, v) }
+    mutableResult
   }
 
   /**
@@ -866,11 +865,10 @@ abstract class RDD[T: ClassTag](
     if (elementClassTag.runtimeClass.isArray) {
       throw new SparkException("countByValueApprox() does not support arrays")
     }
-    val countPartition: (TaskContext, Iterator[T]) => OLMap[T] = { (ctx, iter) 
=>
-      val map = new OLMap[T]
-      while (iter.hasNext) {
-        val v = iter.next()
-        map.put(v, map.getLong(v) + 1L)
+    val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T,Long] = { 
(ctx, iter) =>
+      val map = new OpenHashMap[T,Long]
+      iter.foreach {
+        t => map.changeValue(t, 1L, _ + 1L)
       }
       map
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/165e06a7/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index b03665f..f868e77 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -17,11 +17,10 @@
 
 package org.apache.spark.scheduler
 
-import java.io.InputStream
+import java.io.{BufferedInputStream, InputStream}
 
 import scala.io.Source
 
-import it.unimi.dsi.fastutil.io.FastBufferedInputStream
 import org.apache.hadoop.fs.{Path, FileSystem}
 import org.json4s.jackson.JsonMethods._
 
@@ -62,7 +61,7 @@ private[spark] class ReplayListenerBus(
       var currentLine = "<not started>"
       try {
         fileStream = Some(fileSystem.open(path))
-        bufferedStream = Some(new FastBufferedInputStream(fileStream.get))
+        bufferedStream = Some(new BufferedInputStream(fileStream.get))
         compressStream = Some(wrapForCompression(bufferedStream.get))
 
         // Parse each line as an event and post the event to all attached 
listeners

http://git-wip-us.apache.org/repos/asf/spark/blob/165e06a7/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala 
b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index b85b4a5..a8bcb7d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -17,13 +17,11 @@
 
 package org.apache.spark.scheduler
 
-import java.io.{DataInputStream, DataOutputStream}
+import java.io.{ByteArrayOutputStream, DataInputStream, DataOutputStream}
 import java.nio.ByteBuffer
 
 import scala.collection.mutable.HashMap
 
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
 import org.apache.spark.TaskContext
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.serializer.SerializerInstance
@@ -104,7 +102,7 @@ private[spark] object Task {
       serializer: SerializerInstance)
     : ByteBuffer = {
 
-    val out = new FastByteArrayOutputStream(4096)
+    val out = new ByteArrayOutputStream(4096)
     val dataOut = new DataOutputStream(out)
 
     // Write currentFiles
@@ -125,8 +123,7 @@ private[spark] object Task {
     dataOut.flush()
     val taskBytes = serializer.serialize(task).array()
     out.write(taskBytes)
-    out.trim()
-    ByteBuffer.wrap(out.array)
+    ByteBuffer.wrap(out.toByteArray)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/165e06a7/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
index 9f04dc6..f2c8f9b 100644
--- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -17,11 +17,9 @@
 
 package org.apache.spark.serializer
 
-import java.io.{EOFException, InputStream, OutputStream}
+import java.io.{ByteArrayOutputStream, EOFException, InputStream, OutputStream}
 import java.nio.ByteBuffer
 
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
 import org.apache.spark.SparkEnv
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.{ByteBufferInputStream, NextIterator}
@@ -73,10 +71,9 @@ trait SerializerInstance {
 
   def serializeMany[T](iterator: Iterator[T]): ByteBuffer = {
     // Default implementation uses serializeStream
-    val stream = new FastByteArrayOutputStream()
+    val stream = new ByteArrayOutputStream()
     serializeStream(stream).writeAll(iterator)
-    val buffer = ByteBuffer.allocate(stream.position.toInt)
-    buffer.put(stream.array, 0, stream.position.toInt)
+    val buffer = ByteBuffer.wrap(stream.toByteArray)
     buffer.flip()
     buffer
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/165e06a7/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index df9bb40..f140170 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.storage
 
-import java.io.{File, InputStream, OutputStream}
+import java.io.{File, InputStream, OutputStream, BufferedOutputStream, 
ByteArrayOutputStream}
 import java.nio.{ByteBuffer, MappedByteBuffer}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -26,7 +26,6 @@ import scala.concurrent.duration._
 import scala.util.Random
 
 import akka.actor.{ActorSystem, Cancellable, Props}
-import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, 
FastByteArrayOutputStream}
 import sun.nio.ch.DirectBuffer
 
 import org.apache.spark.{Logging, MapOutputTracker, SecurityManager, 
SparkConf, SparkEnv, SparkException}
@@ -992,7 +991,7 @@ private[spark] class BlockManager(
       outputStream: OutputStream,
       values: Iterator[Any],
       serializer: Serializer = defaultSerializer) {
-    val byteStream = new FastBufferedOutputStream(outputStream)
+    val byteStream = new BufferedOutputStream(outputStream)
     val ser = serializer.newInstance()
     ser.serializeStream(wrapForCompression(blockId, 
byteStream)).writeAll(values).close()
   }
@@ -1002,10 +1001,9 @@ private[spark] class BlockManager(
       blockId: BlockId,
       values: Iterator[Any],
       serializer: Serializer = defaultSerializer): ByteBuffer = {
-    val byteStream = new FastByteArrayOutputStream(4096)
+    val byteStream = new ByteArrayOutputStream(4096)
     dataSerializeStream(blockId, byteStream, values, serializer)
-    byteStream.trim()
-    ByteBuffer.wrap(byteStream.array)
+    ByteBuffer.wrap(byteStream.toByteArray)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/165e06a7/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 696b930..a2687e6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -17,11 +17,9 @@
 
 package org.apache.spark.storage
 
-import java.io.{FileOutputStream, File, OutputStream}
+import java.io.{BufferedOutputStream, FileOutputStream, File, OutputStream}
 import java.nio.channels.FileChannel
 
-import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
-
 import org.apache.spark.Logging
 import org.apache.spark.serializer.{SerializationStream, Serializer}
 
@@ -119,7 +117,7 @@ private[spark] class DiskBlockObjectWriter(
     ts = new TimeTrackingOutputStream(fos)
     channel = fos.getChannel()
     lastValidPosition = initialPosition
-    bs = compressStream(new FastBufferedOutputStream(ts, bufferSize))
+    bs = compressStream(new BufferedOutputStream(ts, bufferSize))
     objOut = serializer.newInstance().serializeStream(bs)
     initialized = true
     this

http://git-wip-us.apache.org/repos/asf/spark/blob/165e06a7/core/src/main/scala/org/apache/spark/util/FileLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala 
b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index 0080a8b..68a12e8 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark.util
 
-import java.io._
+import java.io.{FileOutputStream, BufferedOutputStream, PrintWriter, 
IOException}
 import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.Date
 
-import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
 import org.apache.hadoop.fs.{FSDataOutputStream, Path}
 
 import org.apache.spark.{Logging, SparkConf}
@@ -100,7 +99,7 @@ private[spark] class FileLogger(
         hadoopDataStream.get
     }
 
-    val bstream = new FastBufferedOutputStream(dstream, outputBufferSize)
+    val bstream = new BufferedOutputStream(dstream, outputBufferSize)
     val cstream = if (compress) 
compressionCodec.compressedOutputStream(bstream) else bstream
     new PrintWriter(cstream)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/165e06a7/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala 
b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index b955612..0846557 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -27,9 +27,8 @@ import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.mutable.ArrayBuffer
 
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet
-
 import org.apache.spark.Logging
+import org.apache.spark.util.collection.OpenHashSet
 
 /**
  * Estimates the sizes of Java objects (number of bytes of memory they 
occupy), for use in
@@ -207,7 +206,7 @@ private[spark] object SizeEstimator extends Logging {
         // Estimate the size of a large array by sampling elements without 
replacement.
         var size = 0.0
         val rand = new Random(42)
-        val drawn = new IntOpenHashSet(ARRAY_SAMPLE_SIZE)
+        val drawn = new OpenHashSet[Int](ARRAY_SAMPLE_SIZE)
         for (i <- 0 until ARRAY_SAMPLE_SIZE) {
           var index = 0
           do {

http://git-wip-us.apache.org/repos/asf/spark/blob/165e06a7/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala 
b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index 025492b..ad38250 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -19,6 +19,8 @@ package org.apache.spark.util.collection
 
 import java.util.{Arrays, Comparator}
 
+import com.google.common.hash.Hashing
+
 import org.apache.spark.annotation.DeveloperApi
 
 /**
@@ -199,11 +201,8 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)
 
   /**
    * Re-hash a value to deal better with hash functions that don't differ in 
the lower bits.
-   * We use the Murmur Hash 3 finalization step that's also used in fastutil.
    */
-  private def rehash(h: Int): Int = {
-    it.unimi.dsi.fastutil.HashCommon.murmurHash3(h)
-  }
+  private def rehash(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()
 
   /** Double the table's size and re-hash everything */
   protected def growTable() {

http://git-wip-us.apache.org/repos/asf/spark/blob/165e06a7/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index dd01ae8..d615767 100644
--- 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -17,14 +17,13 @@
 
 package org.apache.spark.util.collection
 
-import java.io._
+import java.io.{InputStream, BufferedInputStream, FileInputStream, File, 
Serializable, EOFException}
 import java.util.Comparator
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import com.google.common.io.ByteStreams
-import it.unimi.dsi.fastutil.io.FastBufferedInputStream
 
 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.annotation.DeveloperApi
@@ -350,7 +349,7 @@ class ExternalAppendOnlyMap[K, V, C](
   private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: 
ArrayBuffer[Long])
     extends Iterator[(K, C)] {
     private val fileStream = new FileInputStream(file)
-    private val bufferedStream = new FastBufferedInputStream(fileStream, 
fileBufferSize)
+    private val bufferedStream = new BufferedInputStream(fileStream, 
fileBufferSize)
 
     // An intermediate stream that reads from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher 
level streams

http://git-wip-us.apache.org/repos/asf/spark/blob/165e06a7/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala 
b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
index 62f99f3..b8de4ff 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
@@ -30,7 +30,8 @@ import org.apache.spark.annotation.DeveloperApi
  * Under the hood, it uses our OpenHashSet implementation.
  */
 @DeveloperApi
-class OpenHashMap[K >: Null : ClassTag, @specialized(Long, Int, Double) V: 
ClassTag](
+private[spark]
+class OpenHashMap[K : ClassTag, @specialized(Long, Int, Double) V: ClassTag](
     initialCapacity: Int)
   extends Iterable[(K, V)]
   with Serializable {

http://git-wip-us.apache.org/repos/asf/spark/blob/165e06a7/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala 
b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
index 148c12e..19af4f8 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.util.collection
 
 import scala.reflect._
+import com.google.common.hash.Hashing
 
 /**
  * A simple, fast hash set optimized for non-null insertion-only use case, 
where keys are never
@@ -256,9 +257,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
 
   /**
    * Re-hash a value to deal better with hash functions that don't differ in 
the lower bits.
-   * We use the Murmur Hash 3 finalization step that's also used in fastutil.
    */
-  private def hashcode(h: Int): Int = 
it.unimi.dsi.fastutil.HashCommon.murmurHash3(h)
+  private def hashcode(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()
 
   private def nextPowerOf2(n: Int): Int = {
     val highBit = Integer.highestOneBit(n)

http://git-wip-us.apache.org/repos/asf/spark/blob/165e06a7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c03bb35..5f66cbe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -349,11 +349,6 @@
         </exclusions>
       </dependency>
       <dependency>
-        <groupId>it.unimi.dsi</groupId>
-        <artifactId>fastutil</artifactId>
-        <version>6.4.4</version>
-      </dependency>
-      <dependency>
         <groupId>colt</groupId>
         <artifactId>colt</artifactId>
         <version>1.2.0</version>

http://git-wip-us.apache.org/repos/asf/spark/blob/165e06a7/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 2116376..a6058bb 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -331,7 +331,6 @@ object SparkBuild extends Build {
         "org.spark-project.akka"    %% "akka-slf4j"       % akkaVersion 
excludeAll(excludeNetty),
         "org.spark-project.akka"    %% "akka-testkit"     % akkaVersion % 
"test",
         "org.json4s"                %% "json4s-jackson"   % "3.2.6" 
excludeAll(excludeScalap),
-        "it.unimi.dsi"               % "fastutil"         % "6.4.4",
         "colt"                       % "colt"             % "1.2.0",
         "org.apache.mesos"           % "mesos"            % "0.13.0",
         "commons-net"                % "commons-net"      % "2.2",

http://git-wip-us.apache.org/repos/asf/spark/blob/165e06a7/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
index bd1df55..bbf57ef 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
@@ -19,18 +19,17 @@ package org.apache.spark.streaming.util
 
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+import org.apache.spark.util.collection.OpenHashMap
 import scala.collection.JavaConversions.mapAsScalaMap
 
 private[streaming]
 object RawTextHelper {
 
-  /**
-   * Splits lines and counts the words in them using specialized 
object-to-long hashmap
-   * (to avoid boxing-unboxing overhead of Long in java/scala HashMap)
+  /** 
+   * Splits lines and counts the words.
    */
   def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, 
Long)] = {
-    val map = new OLMap[String]
+    val map = new OpenHashMap[String,Long]
     var i = 0
     var j = 0
     while (iter.hasNext) {
@@ -43,14 +42,16 @@ object RawTextHelper {
         }
         if (j > i) {
           val w = s.substring(i, j)
-          val c = map.getLong(w)
-          map.put(w, c + 1)
+          map.changeValue(w, 1L, _ + 1L)
         }
         i = j
         while (i < s.length && s.charAt(i) == ' ') {
           i += 1
         }
       }
+      map.toIterator.map {
+        case (k, v) => (k, v)
+      }
     }
     map.toIterator.map{case (k, v) => (k, v)}
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/165e06a7/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
index 684b38e..a785081 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
@@ -17,14 +17,12 @@
 
 package org.apache.spark.streaming.util
 
-import java.io.IOException
+import java.io.{ByteArrayOutputStream, IOException}
 import java.net.ServerSocket
 import java.nio.ByteBuffer
 
 import scala.io.Source
 
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
 import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.util.IntParam
@@ -45,16 +43,15 @@ object RawTextSender extends Logging {
 
     // Repeat the input data multiple times to fill in a buffer
     val lines = Source.fromFile(file).getLines().toArray
-    val bufferStream = new FastByteArrayOutputStream(blockSize + 1000)
+    val bufferStream = new ByteArrayOutputStream(blockSize + 1000)
     val ser = new KryoSerializer(new SparkConf()).newInstance()
     val serStream = ser.serializeStream(bufferStream)
     var i = 0
-    while (bufferStream.position < blockSize) {
+    while (bufferStream.size < blockSize) {
       serStream.writeObject(lines(i))
       i = (i + 1) % lines.length
     }
-    bufferStream.trim()
-    val array = bufferStream.array
+    val array = bufferStream.toByteArray
 
     val countBuf = ByteBuffer.wrap(new Array[Byte](4))
     countBuf.putInt(array.length)

Reply via email to