Repository: spark
Updated Branches:
  refs/heads/master ee29ef380 -> 7136719b7


[SPARK-2759][CORE] Generic Binary File Support in Spark

The additions add the abstract BinaryFileInputFormat and BinaryRecordReader 
classes for reading in data as a byte stream and converting it to another 
format using the ```def parseByteArray(inArray: Array[Byte]): T``` function.
As a trivial example ```ByteInputFormat``` and ```ByteRecordReader``` are 
included which just return the Array[Byte] from a given file.
Finally a RDD for ```BinaryFileInputFormat``` (to allow for easier partitioning 
changes as was done for WholeFileInput) was added and the appropriate byteFiles 
to the ```SparkContext``` so the functions can be easily used by others.
A common use case might be to read in a folder
```
sc.byteFiles("s3://mydrive/tif/*.tif").map(rawData => 
ReadTiffFromByteArray(rawData))
```

Author: Kevin Mader <kevinma...@gmail.com>
Author: Kevin Mader <kma...@users.noreply.github.com>

Closes #1658 from kmader/master and squashes the following commits:

3c49a30 [Kevin Mader] fixing wholetextfileinput to it has the same 
setMinPartitions function as in BinaryData files
359a096 [Kevin Mader] making the final corrections suggested by @mateiz and 
renaming a few functions to make their usage clearer
6379be4 [Kevin Mader] reorganizing code
7b9d181 [Kevin Mader] removing developer API, cleaning up imports
8ac288b [Kevin Mader] fixed a single slightly over 100 character line
92bda0d [Kevin Mader] added new tests, renamed files, fixed several of the 
javaapi functions, formatted code more nicely
a32fef7 [Kevin Mader] removed unneeded classes added DeveloperApi note to 
portabledatastreams since the implementation might change
49174d9 [Kevin Mader] removed unneeded classes added DeveloperApi note to 
portabledatastreams since the implementation might change
c27a8f1 [Kevin Mader] jenkins crashed before running anything last time, so 
making minor change
b348ce1 [Kevin Mader] fixed order in check (prefix only appears on jenkins not 
when I run unit tests locally)
0588737 [Kevin Mader] filename check in "binary file input as byte array" test 
now ignores prefixes and suffixes which might get added by Hadoop
4163e38 [Kevin Mader] fixing line length and output from FSDataInputStream to 
DataInputStream to minimize sensitivity to Hadoop API changes
19812a8 [Kevin Mader] Fixed the serialization issue with PortableDataStream 
since neither CombineFileSplit nor TaskAttemptContext implement the 
Serializable interface, by using ByteArrays for storing both and then 
recreating the objects from these bytearrays as needed.
238c83c [Kevin Mader] fixed several scala-style issues, changed structure of 
binaryFiles, removed excessive classes added new tests. The caching tests still 
have a serialization issue, but that should be easily fixed as well.
932a206 [Kevin Mader] Update RawFileInput.scala
a01c9cf [Kevin Mader] Update RawFileInput.scala
441f79a [Kevin Mader] fixed a few small comments and dependency
12e7be1 [Kevin Mader] removing imglib from maven (definitely not ready yet)
5deb79e [Kevin Mader] added new portabledatastream to code so that it can be 
serialized correctly
f032bc0 [Kevin Mader] fixed bug in path name, renamed tests
bc5c0b9 [Kevin Mader] made minor stylistic adjustments from mateiz
df8e528 [Kevin Mader] fixed line lengths and changed java test
9a313d5 [Kevin Mader] making classes that needn't be public private, adding 
automatic file closure, adding new tests
edf5829 [Kevin Mader] fixing line lengths, adding new lines
f4841dc [Kevin Mader] un-optimizing imports, silly intellij
eacfaa6 [Kevin Mader] Added FixedLengthBinaryInputFormat and RecordReader from 
freeman-lab and added them to both the JavaSparkContext and the SparkContext as 
fixedLengthBinaryFile
1622935 [Kevin Mader] changing the line lengths to make jenkins happy
1cfa38a [Kevin Mader] added apache headers, added datainputstream directly as 
an output option for more complicated readers (HDF5 perhaps), and renamed 
several of the functions and files to be more consistent. Also added parallel 
functions to the java api
84035f1 [Kevin Mader] adding binary and byte file support spark
81c5f12 [Kevin Mader] Merge pull request #1 from apache/master


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7136719b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7136719b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7136719b

Branch: refs/heads/master
Commit: 7136719b7d53ee1360abaa5e178ba9f8b00f3da8
Parents: ee29ef3
Author: Kevin Mader <kevinma...@gmail.com>
Authored: Sat Nov 1 11:22:46 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Sat Nov 1 11:59:39 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  65 +++++-
 .../spark/api/java/JavaSparkContext.scala       |  82 ++++++-
 .../input/FixedLengthBinaryInputFormat.scala    |  85 ++++++++
 .../input/FixedLengthBinaryRecordReader.scala   | 126 +++++++++++
 .../apache/spark/input/PortableDataStream.scala | 218 +++++++++++++++++++
 .../spark/input/WholeTextFileInputFormat.scala  |   5 +-
 .../org/apache/spark/rdd/BinaryFileRDD.scala    |  51 +++++
 .../org/apache/spark/rdd/NewHadoopRDD.scala     |   2 +-
 .../java/org/apache/spark/JavaAPISuite.java     |  79 +++++++
 .../test/scala/org/apache/spark/FileSuite.scala | 184 ++++++++++++++++
 10 files changed, 892 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7136719b/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 6bfcd8c..8b4db78 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -41,7 +41,7 @@ import akka.actor.Props
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.input.WholeTextFileInputFormat
+import org.apache.spark.input.{StreamInputFormat, PortableDataStream, 
WholeTextFileInputFormat, FixedLengthBinaryInputFormat}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
@@ -533,6 +533,69 @@ class SparkContext(config: SparkConf) extends 
SparkStatusAPI with Logging {
       minPartitions).setName(path)
   }
 
+
+  /**
+   * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each 
file
+   * (useful for binary data)
+   *
+   * For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-00000
+   *   hdfs://a-hdfs-path/part-00001
+   *   ...
+   *   hdfs://a-hdfs-path/part-nnnnn
+   * }}}
+   *
+   * Do
+   * `val rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
+   *
+   * then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-00000, its content)
+   *   (a-hdfs-path/part-00001, its content)
+   *   ...
+   *   (a-hdfs-path/part-nnnnn, its content)
+   * }}}
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number 
for input data.
+   *
+   * @note Small files are preferred; very large files may cause bad 
performance.
+   */
+  @Experimental
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+      RDD[(String, PortableDataStream)] = {
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new BinaryFileRDD(
+      this,
+      classOf[StreamInputFormat],
+      classOf[String],
+      classOf[PortableDataStream],
+      updateConf,
+      minPartitions).setName(path)
+  }
+
+  /**
+   * Load data from a flat binary file, assuming the length of each record is 
constant.
+   *
+   * @param path Directory to the input data files
+   * @param recordLength The length at which to split the records
+   * @return An RDD of data with values, represented as byte arrays
+   */
+  @Experimental
+  def binaryRecords(path: String, recordLength: Int, conf: Configuration = 
hadoopConfiguration)
+      : RDD[Array[Byte]] = {
+    conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, 
recordLength)
+    val br = newAPIHadoopFile[LongWritable, BytesWritable, 
FixedLengthBinaryInputFormat](path,
+      classOf[FixedLengthBinaryInputFormat],
+      classOf[LongWritable],
+      classOf[BytesWritable],
+      conf=conf)
+    val data = br.map{ case (k, v) => v.getBytes}
+    data
+  }
+
   /**
    * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its 
InputFormat and other
    * necessary info (e.g. file name for a filesystem-based dataset, table name 
for HyperTable),

http://git-wip-us.apache.org/repos/asf/spark/blob/7136719b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 0565adf..e3aeba7 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -21,6 +21,11 @@ import java.io.Closeable
 import java.util
 import java.util.{Map => JMap}
 
+import java.io.DataInputStream
+
+import org.apache.hadoop.io.{BytesWritable, LongWritable}
+import org.apache.spark.input.{PortableDataStream, 
FixedLengthBinaryInputFormat}
+
 import scala.collection.JavaConversions
 import scala.collection.JavaConversions._
 import scala.language.implicitConversions
@@ -32,7 +37,8 @@ import org.apache.hadoop.mapred.{InputFormat, JobConf}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
 import org.apache.spark._
-import org.apache.spark.SparkContext.{DoubleAccumulatorParam, 
IntAccumulatorParam}
+import org.apache.spark.SparkContext._
+import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}
@@ -202,6 +208,8 @@ class JavaSparkContext(val sc: SparkContext)
   def textFile(path: String, minPartitions: Int): JavaRDD[String] =
     sc.textFile(path, minPartitions)
 
+
+
   /**
    * Read a directory of text files from HDFS, a local file system (available 
on all nodes), or any
    * Hadoop-supported file system URI. Each file is read as a single record 
and returned in a
@@ -245,6 +253,78 @@ class JavaSparkContext(val sc: SparkContext)
   def wholeTextFiles(path: String): JavaPairRDD[String, String] =
     new JavaPairRDD(sc.wholeTextFiles(path))
 
+  /**
+   * Read a directory of binary files from HDFS, a local file system 
(available on all nodes),
+   * or any Hadoop-supported file system URI as a byte array. Each file is 
read as a single
+   * record and returned in a key-value pair, where the key is the path of 
each file,
+   * the value is the content of each file.
+   *
+   * For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-00000
+   *   hdfs://a-hdfs-path/part-00001
+   *   ...
+   *   hdfs://a-hdfs-path/part-nnnnn
+   * }}}
+   *
+   * Do
+   * `JavaPairRDD<String, byte[]> rdd = 
sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
+   *
+   * then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-00000, its content)
+   *   (a-hdfs-path/part-00001, its content)
+   *   ...
+   *   (a-hdfs-path/part-nnnnn, its content)
+   * }}}
+   *
+   * @note Small files are preferred; very large files but may cause bad 
performance.
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number 
for input data.
+   */
+  def binaryFiles(path: String, minPartitions: Int): JavaPairRDD[String, 
PortableDataStream] =
+    new JavaPairRDD(sc.binaryFiles(path, minPartitions))
+
+  /**
+   * Read a directory of binary files from HDFS, a local file system 
(available on all nodes),
+   * or any Hadoop-supported file system URI as a byte array. Each file is 
read as a single
+   * record and returned in a key-value pair, where the key is the path of 
each file,
+   * the value is the content of each file.
+   *
+   * For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-00000
+   *   hdfs://a-hdfs-path/part-00001
+   *   ...
+   *   hdfs://a-hdfs-path/part-nnnnn
+   * }}}
+   *
+   * Do
+   * `JavaPairRDD<String, byte[]> rdd = 
sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
+   *
+   * then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-00000, its content)
+   *   (a-hdfs-path/part-00001, its content)
+   *   ...
+   *   (a-hdfs-path/part-nnnnn, its content)
+   * }}}
+   *
+   * @note Small files are preferred; very large files but may cause bad 
performance.
+   */
+  def binaryFiles(path: String): JavaPairRDD[String, PortableDataStream] =
+    new JavaPairRDD(sc.binaryFiles(path, defaultMinPartitions))
+
+  /**
+   * Load data from a flat binary file, assuming the length of each record is 
constant.
+   *
+   * @param path Directory to the input data files
+   * @return An RDD of data with values, represented as byte arrays
+   */
+  def binaryRecords(path: String, recordLength: Int): JavaRDD[Array[Byte]] = {
+    new JavaRDD(sc.binaryRecords(path, recordLength))
+  }
+
   /** Get an RDD for a Hadoop SequenceFile with given key and value types.
     *
     * '''Note:''' Because Hadoop's RecordReader class re-uses the same 
Writable object for each

http://git-wip-us.apache.org/repos/asf/spark/blob/7136719b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala 
b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
new file mode 100644
index 0000000..89b29af
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{BytesWritable, LongWritable}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, 
TaskAttemptContext}
+
+/**
+ * Custom Input Format for reading and splitting flat binary files that 
contain records,
+ * each of which are a fixed size in bytes. The fixed record size is specified 
through
+ * a parameter recordLength in the Hadoop configuration.
+ */
+private[spark] object FixedLengthBinaryInputFormat {
+  /** Property name to set in Hadoop JobConfs for record length */
+  val RECORD_LENGTH_PROPERTY = 
"org.apache.spark.input.FixedLengthBinaryInputFormat.recordLength"
+
+  /** Retrieves the record length property from a Hadoop configuration */
+  def getRecordLength(context: JobContext): Int = {
+    context.getConfiguration.get(RECORD_LENGTH_PROPERTY).toInt
+  }
+}
+
+private[spark] class FixedLengthBinaryInputFormat
+  extends FileInputFormat[LongWritable, BytesWritable] {
+
+  private var recordLength = -1
+
+  /**
+   * Override of isSplitable to ensure initial computation of the record length
+   */
+  override def isSplitable(context: JobContext, filename: Path): Boolean = {
+    if (recordLength == -1) {
+      recordLength = FixedLengthBinaryInputFormat.getRecordLength(context)
+    }
+    if (recordLength <= 0) {
+      println("record length is less than 0, file cannot be split")
+      false
+    } else {
+      true
+    }
+  }
+
+  /**
+   * This input format overrides computeSplitSize() to make sure that each 
split
+   * only contains full records. Each InputSplit passed to 
FixedLengthBinaryRecordReader
+   * will start at the first byte of a record, and the last byte will the last 
byte of a record.
+   */
+  override def computeSplitSize(blockSize: Long, minSize: Long, maxSize: 
Long): Long = {
+    val defaultSize = super.computeSplitSize(blockSize, minSize, maxSize)
+    // If the default size is less than the length of a record, make it equal 
to it
+    // Otherwise, make sure the split size is as close to possible as the 
default size,
+    // but still contains a complete set of records, with the first record
+    // starting at the first byte in the split and the last record ending with 
the last byte
+    if (defaultSize < recordLength) {
+      recordLength.toLong
+    } else {
+      (Math.floor(defaultSize / recordLength) * recordLength).toLong
+    }
+  }
+
+  /**
+   * Create a FixedLengthBinaryRecordReader
+   */
+  override def createRecordReader(split: InputSplit, context: 
TaskAttemptContext)
+      : RecordReader[LongWritable, BytesWritable] = {
+    new FixedLengthBinaryRecordReader
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7136719b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
 
b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
new file mode 100644
index 0000000..5164a74
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.FSDataInputStream
+import org.apache.hadoop.io.compress.CompressionCodecFactory
+import org.apache.hadoop.io.{BytesWritable, LongWritable}
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+
+/**
+ * FixedLengthBinaryRecordReader is returned by FixedLengthBinaryInputFormat.
+ * It uses the record length set in FixedLengthBinaryInputFormat to
+ * read one record at a time from the given InputSplit.
+ *
+ * Each call to nextKeyValue() updates the LongWritable key and BytesWritable 
value.
+ *
+ * key = record index (Long)
+ * value = the record itself (BytesWritable)
+ */
+private[spark] class FixedLengthBinaryRecordReader
+  extends RecordReader[LongWritable, BytesWritable] {
+
+  private var splitStart: Long = 0L
+  private var splitEnd: Long = 0L
+  private var currentPosition: Long = 0L
+  private var recordLength: Int = 0
+  private var fileInputStream: FSDataInputStream = null
+  private var recordKey: LongWritable = null
+  private var recordValue: BytesWritable = null
+
+  override def close() {
+    if (fileInputStream != null) {
+      fileInputStream.close()
+    }
+  }
+
+  override def getCurrentKey: LongWritable = {
+    recordKey
+  }
+
+  override def getCurrentValue: BytesWritable = {
+    recordValue
+  }
+
+  override def getProgress: Float = {
+    splitStart match {
+      case x if x == splitEnd => 0.0.toFloat
+      case _ => Math.min(
+        ((currentPosition - splitStart) / (splitEnd - splitStart)).toFloat, 1.0
+      ).toFloat
+    }
+  }
+
+  override def initialize(inputSplit: InputSplit, context: TaskAttemptContext) 
{
+    // the file input
+    val fileSplit = inputSplit.asInstanceOf[FileSplit]
+
+    // the byte position this fileSplit starts at
+    splitStart = fileSplit.getStart
+
+    // splitEnd byte marker that the fileSplit ends at
+    splitEnd = splitStart + fileSplit.getLength
+
+    // the actual file we will be reading from
+    val file = fileSplit.getPath
+    // job configuration
+    val job = context.getConfiguration
+    // check compression
+    val codec = new CompressionCodecFactory(job).getCodec(file)
+    if (codec != null) {
+      throw new IOException("FixedLengthRecordReader does not support reading 
compressed files")
+    }
+    // get the record length
+    recordLength = FixedLengthBinaryInputFormat.getRecordLength(context)
+    // get the filesystem
+    val fs = file.getFileSystem(job)
+    // open the File
+    fileInputStream = fs.open(file)
+    // seek to the splitStart position
+    fileInputStream.seek(splitStart)
+    // set our current position
+    currentPosition = splitStart
+  }
+
+  override def nextKeyValue(): Boolean = {
+    if (recordKey == null) {
+      recordKey = new LongWritable()
+    }
+    // the key is a linear index of the record, given by the
+    // position the record starts divided by the record length
+    recordKey.set(currentPosition / recordLength)
+    // the recordValue to place the bytes into
+    if (recordValue == null) {
+      recordValue = new BytesWritable(new Array[Byte](recordLength))
+    }
+    // read a record if the currentPosition is less than the split end
+    if (currentPosition < splitEnd) {
+      // setup a buffer to store the record
+      val buffer = recordValue.getBytes
+      fileInputStream.read(buffer, 0, recordLength)
+      // update our current position
+      currentPosition = currentPosition + recordLength
+      // return true
+      return true
+    }
+    false
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7136719b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala 
b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
new file mode 100644
index 0000000..4574725
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.JavaConversions._
+
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, 
CombineFileRecordReader, CombineFileSplit}
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * A general format for reading whole files in as streams, byte arrays,
+ * or other functions to be added
+ */
+private[spark] abstract class StreamFileInputFormat[T]
+  extends CombineFileInputFormat[String, T]
+{
+  override protected def isSplitable(context: JobContext, file: Path): Boolean 
= false
+
+  /**
+   * Allow minPartitions set by end-user in order to keep compatibility with 
old Hadoop API
+   * which is set through setMaxSplitSize
+   */
+  def setMinPartitions(context: JobContext, minPartitions: Int) {
+    val files = listStatus(context)
+    val totalLen = files.map { file =>
+      if (file.isDir) 0L else file.getLen
+    }.sum
+
+    val maxSplitSize = Math.ceil(totalLen * 1.0 / files.length).toLong
+    super.setMaxSplitSize(maxSplitSize)
+  }
+
+  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext): 
RecordReader[String, T]
+
+}
+
+/**
+ * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader 
RecordReader]]
+ * to reading files out as streams
+ */
+private[spark] abstract class StreamBasedRecordReader[T](
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
+  extends RecordReader[String, T] {
+
+  // True means the current file has been processed, then skip it.
+  private var processed = false
+
+  private var key = ""
+  private var value: T = null.asInstanceOf[T]
+
+  override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
+  override def close() = {}
+
+  override def getProgress = if (processed) 1.0f else 0.0f
+
+  override def getCurrentKey = key
+
+  override def getCurrentValue = value
+
+  override def nextKeyValue = {
+    if (!processed) {
+      val fileIn = new PortableDataStream(split, context, index)
+      value = parseStream(fileIn)
+      fileIn.close() // if it has not been open yet, close does nothing
+      key = fileIn.getPath
+      processed = true
+      true
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Parse the stream (and close it afterwards) and return the value as in 
type T
+   * @param inStream the stream to be read in
+   * @return the data formatted as
+   */
+  def parseStream(inStream: PortableDataStream): T
+}
+
+/**
+ * Reads the record in directly as a stream for other objects to manipulate 
and handle
+ */
+private[spark] class StreamRecordReader(
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
+  extends StreamBasedRecordReader[PortableDataStream](split, context, index) {
+
+  def parseStream(inStream: PortableDataStream): PortableDataStream = inStream
+}
+
+/**
+ * The format for the PortableDataStream files
+ */
+private[spark] class StreamInputFormat extends 
StreamFileInputFormat[PortableDataStream] {
+  override def createRecordReader(split: InputSplit, taContext: 
TaskAttemptContext) = {
+    new CombineFileRecordReader[String, PortableDataStream](
+      split.asInstanceOf[CombineFileSplit], taContext, 
classOf[StreamRecordReader])
+  }
+}
+
+/**
+ * A class that allows DataStreams to be serialized and moved around by not 
creating them
+ * until they need to be read
+ * @note TaskAttemptContext is not serializable resulting in the confBytes 
construct
+ * @note CombineFileSplit is not serializable resulting in the splitBytes 
construct
+ */
+@Experimental
+class PortableDataStream(
+    @transient isplit: CombineFileSplit,
+    @transient context: TaskAttemptContext,
+    index: Integer)
+  extends Serializable {
+
+  // transient forces file to be reopened after being serialization
+  // it is also used for non-serializable classes
+
+  @transient private var fileIn: DataInputStream = null
+  @transient private var isOpen = false
+
+  private val confBytes = {
+    val baos = new ByteArrayOutputStream()
+    context.getConfiguration.write(new DataOutputStream(baos))
+    baos.toByteArray
+  }
+
+  private val splitBytes = {
+    val baos = new ByteArrayOutputStream()
+    isplit.write(new DataOutputStream(baos))
+    baos.toByteArray
+  }
+
+  @transient private lazy val split = {
+    val bais = new ByteArrayInputStream(splitBytes)
+    val nsplit = new CombineFileSplit()
+    nsplit.readFields(new DataInputStream(bais))
+    nsplit
+  }
+
+  @transient private lazy val conf = {
+    val bais = new ByteArrayInputStream(confBytes)
+    val nconf = new Configuration()
+    nconf.readFields(new DataInputStream(bais))
+    nconf
+  }
+  /**
+   * Calculate the path name independently of opening the file
+   */
+  @transient private lazy val path = {
+    val pathp = split.getPath(index)
+    pathp.toString
+  }
+
+  /**
+   * Create a new DataInputStream from the split and context
+   */
+  def open(): DataInputStream = {
+    if (!isOpen) {
+      val pathp = split.getPath(index)
+      val fs = pathp.getFileSystem(conf)
+      fileIn = fs.open(pathp)
+      isOpen = true
+    }
+    fileIn
+  }
+
+  /**
+   * Read the file as a byte array
+   */
+  def toArray(): Array[Byte] = {
+    open()
+    val innerBuffer = ByteStreams.toByteArray(fileIn)
+    close()
+    innerBuffer
+  }
+
+  /**
+   * Close the file (if it is currently open)
+   */
+  def close() = {
+    if (isOpen) {
+      try {
+        fileIn.close()
+        isOpen = false
+      } catch {
+        case ioe: java.io.IOException => // do nothing
+      }
+    }
+  }
+
+  def getPath(): String = path
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/7136719b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala 
b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
index 4cb4505..183bce3 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -48,9 +48,10 @@ private[spark] class WholeTextFileInputFormat extends 
CombineFileInputFormat[Str
   }
 
   /**
-   * Allow minPartitions set by end-user in order to keep compatibility with 
old Hadoop API.
+   * Allow minPartitions set by end-user in order to keep compatibility with 
old Hadoop API,
+   * which is set through setMaxSplitSize
    */
-  def setMaxSplitSize(context: JobContext, minPartitions: Int) {
+  def setMinPartitions(context: JobContext, minPartitions: Int) {
     val files = listStatus(context)
     val totalLen = files.map { file =>
       if (file.isDir) 0L else file.getLen

http://git-wip-us.apache.org/repos/asf/spark/blob/7136719b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
new file mode 100644
index 0000000..6e66ddb
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.hadoop.conf.{ Configurable, Configuration }
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce._
+import org.apache.spark.input.StreamFileInputFormat
+import org.apache.spark.{ Partition, SparkContext }
+
+private[spark] class BinaryFileRDD[T](
+  sc: SparkContext,
+  inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
+  keyClass: Class[String],
+  valueClass: Class[T],
+  @transient conf: Configuration,
+  minPartitions: Int)
+  extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, 
conf) {
+
+  override def getPartitions: Array[Partition] = {
+    val inputFormat = inputFormatClass.newInstance
+    inputFormat match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
+    }
+    val jobContext = newJobContext(conf, jobId)
+    inputFormat.setMinPartitions(jobContext, minPartitions)
+    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    val result = new Array[Partition](rawSplits.size)
+    for (i <- 0 until rawSplits.size) {
+      result(i) = new NewHadoopPartition(id, i, 
rawSplits(i).asInstanceOf[InputSplit with Writable])
+    }
+    result
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7136719b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 3245632..6d6b867 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -263,7 +263,7 @@ private[spark] class WholeTextFileRDD(
       case _ =>
     }
     val jobContext = newJobContext(conf, jobId)
-    inputFormat.setMaxSplitSize(jobContext, minPartitions)
+    inputFormat.setMinPartitions(jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)
     for (i <- 0 until rawSplits.size) {

http://git-wip-us.apache.org/repos/asf/spark/blob/7136719b/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java 
b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index c21a4b3..59c86ee 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -18,10 +18,13 @@
 package org.apache.spark;
 
 import java.io.*;
+import java.nio.channels.FileChannel;
+import java.nio.ByteBuffer;
 import java.net.URI;
 import java.util.*;
 import java.util.concurrent.*;
 
+import org.apache.spark.input.PortableDataStream;
 import scala.Tuple2;
 import scala.Tuple3;
 import scala.Tuple4;
@@ -863,6 +866,82 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(pairs, readRDD.collect());
   }
 
+  @Test
+  public void binaryFiles() throws Exception {
+    // Reusing the wholeText files example
+    byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
+
+    String tempDirName = tempDir.getAbsolutePath();
+    File file1 = new File(tempDirName + "/part-00000");
+
+    FileOutputStream fos1 = new FileOutputStream(file1);
+
+    FileChannel channel1 = fos1.getChannel();
+    ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1);
+    channel1.write(bbuf);
+    channel1.close();
+    JavaPairRDD<String, PortableDataStream> readRDD = 
sc.binaryFiles(tempDirName, 3);
+    List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
+    for (Tuple2<String, PortableDataStream> res : result) {
+      Assert.assertArrayEquals(content1, res._2().toArray());
+    }
+  }
+
+  @Test
+  public void binaryFilesCaching() throws Exception {
+    // Reusing the wholeText files example
+    byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
+
+    String tempDirName = tempDir.getAbsolutePath();
+    File file1 = new File(tempDirName + "/part-00000");
+
+    FileOutputStream fos1 = new FileOutputStream(file1);
+
+    FileChannel channel1 = fos1.getChannel();
+    ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1);
+    channel1.write(bbuf);
+    channel1.close();
+
+    JavaPairRDD<String, PortableDataStream> readRDD = 
sc.binaryFiles(tempDirName).cache();
+    readRDD.foreach(new VoidFunction<Tuple2<String,PortableDataStream>>() {
+      @Override
+      public void call(Tuple2<String, PortableDataStream> pair) throws 
Exception {
+        pair._2().toArray(); // force the file to read
+      }
+    });
+
+    List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
+    for (Tuple2<String, PortableDataStream> res : result) {
+      Assert.assertArrayEquals(content1, res._2().toArray());
+    }
+  }
+
+  @Test
+  public void binaryRecords() throws Exception {
+    // Reusing the wholeText files example
+    byte[] content1 = "spark isn't always easy to use.\n".getBytes("utf-8");
+    int numOfCopies = 10;
+    String tempDirName = tempDir.getAbsolutePath();
+    File file1 = new File(tempDirName + "/part-00000");
+
+    FileOutputStream fos1 = new FileOutputStream(file1);
+
+    FileChannel channel1 = fos1.getChannel();
+
+    for (int i = 0; i < numOfCopies; i++) {
+      ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1);
+      channel1.write(bbuf);
+    }
+    channel1.close();
+
+    JavaRDD<byte[]> readRDD = sc.binaryRecords(tempDirName, content1.length);
+    Assert.assertEquals(numOfCopies,readRDD.count());
+    List<byte[]> result = readRDD.collect();
+    for (byte[] res : result) {
+      Assert.assertArrayEquals(content1, res);
+    }
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void writeWithNewAPIHadoopFile() {

http://git-wip-us.apache.org/repos/asf/spark/blob/7136719b/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala 
b/core/src/test/scala/org/apache/spark/FileSuite.scala
index a2b74c4..5e24196 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -19,6 +19,9 @@ package org.apache.spark
 
 import java.io.{File, FileWriter}
 
+import org.apache.spark.input.PortableDataStream
+import org.apache.spark.storage.StorageLevel
+
 import scala.io.Source
 
 import org.apache.hadoop.io._
@@ -224,6 +227,187 @@ class FileSuite extends FunSuite with LocalSparkContext {
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", 
"(3,aaa)"))
   }
 
+  test("binary file input as byte array") {
+    sc = new SparkContext("local", "test")
+    val outFile = new File(tempDir, "record-bytestream-00000.bin")
+    val outFileName = outFile.getAbsolutePath()
+
+    // create file
+    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
+    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+    // write data to file
+    val file = new java.io.FileOutputStream(outFile)
+    val channel = file.getChannel
+    channel.write(bbuf)
+    channel.close()
+    file.close()
+
+    val inRdd = sc.binaryFiles(outFileName)
+    val (infile: String, indata: PortableDataStream) = inRdd.collect.head
+
+    // Make sure the name and array match
+    assert(infile.contains(outFileName)) // a prefix may get added
+    assert(indata.toArray === testOutput)
+  }
+
+  test("portabledatastream caching tests") {
+    sc = new SparkContext("local", "test")
+    val outFile = new File(tempDir, "record-bytestream-00000.bin")
+    val outFileName = outFile.getAbsolutePath()
+
+    // create file
+    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
+    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+    // write data to file
+    val file = new java.io.FileOutputStream(outFile)
+    val channel = file.getChannel
+    channel.write(bbuf)
+    channel.close()
+    file.close()
+
+    val inRdd = sc.binaryFiles(outFileName).cache()
+    inRdd.foreach{
+      curData: (String, PortableDataStream) =>
+       curData._2.toArray() // force the file to read
+    }
+    val mappedRdd = inRdd.map {
+      curData: (String, PortableDataStream) =>
+        (curData._2.getPath(), curData._2)
+    }
+    val (infile: String, indata: PortableDataStream) = mappedRdd.collect.head
+
+    // Try reading the output back as an object file
+
+    assert(indata.toArray === testOutput)
+  }
+
+  test("portabledatastream persist disk storage") {
+    sc = new SparkContext("local", "test")
+    val outFile = new File(tempDir, "record-bytestream-00000.bin")
+    val outFileName = outFile.getAbsolutePath()
+
+    // create file
+    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
+    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+    // write data to file
+    val file = new java.io.FileOutputStream(outFile)
+    val channel = file.getChannel
+    channel.write(bbuf)
+    channel.close()
+    file.close()
+
+    val inRdd = sc.binaryFiles(outFileName).persist(StorageLevel.DISK_ONLY)
+    inRdd.foreach{
+      curData: (String, PortableDataStream) =>
+        curData._2.toArray() // force the file to read
+    }
+    val mappedRdd = inRdd.map {
+      curData: (String, PortableDataStream) =>
+        (curData._2.getPath(), curData._2)
+    }
+    val (infile: String, indata: PortableDataStream) = mappedRdd.collect.head
+
+    // Try reading the output back as an object file
+
+    assert(indata.toArray === testOutput)
+  }
+
+  test("portabledatastream flatmap tests") {
+    sc = new SparkContext("local", "test")
+    val outFile = new File(tempDir, "record-bytestream-00000.bin")
+    val outFileName = outFile.getAbsolutePath()
+
+    // create file
+    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
+    val numOfCopies = 3
+    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+    // write data to file
+    val file = new java.io.FileOutputStream(outFile)
+    val channel = file.getChannel
+    channel.write(bbuf)
+    channel.close()
+    file.close()
+
+    val inRdd = sc.binaryFiles(outFileName)
+    val mappedRdd = inRdd.map {
+      curData: (String, PortableDataStream) =>
+        (curData._2.getPath(), curData._2)
+    }
+    val copyRdd = mappedRdd.flatMap {
+      curData: (String, PortableDataStream) =>
+        for(i <- 1 to numOfCopies) yield (i, curData._2)
+    }
+
+    val copyArr: Array[(Int, PortableDataStream)] = copyRdd.collect()
+
+    // Try reading the output back as an object file
+    assert(copyArr.length == numOfCopies)
+    copyArr.foreach{
+      cEntry: (Int, PortableDataStream) =>
+        assert(cEntry._2.toArray === testOutput)
+    }
+
+  }
+
+  test("fixed record length binary file as byte array") {
+    // a fixed length of 6 bytes
+
+    sc = new SparkContext("local", "test")
+
+    val outFile = new File(tempDir, "record-bytestream-00000.bin")
+    val outFileName = outFile.getAbsolutePath()
+
+    // create file
+    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
+    val testOutputCopies = 10
+
+    // write data to file
+    val file = new java.io.FileOutputStream(outFile)
+    val channel = file.getChannel
+    for(i <- 1 to testOutputCopies) {
+      val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+      channel.write(bbuf)
+    }
+    channel.close()
+    file.close()
+
+    val inRdd = sc.binaryRecords(outFileName, testOutput.length)
+    // make sure there are enough elements
+    assert(inRdd.count == testOutputCopies)
+
+    // now just compare the first one
+    val indata: Array[Byte] = inRdd.collect.head
+    assert(indata === testOutput)
+  }
+
+  test ("negative binary record length should raise an exception") {
+    // a fixed length of 6 bytes
+    sc = new SparkContext("local", "test")
+
+    val outFile = new File(tempDir, "record-bytestream-00000.bin")
+    val outFileName = outFile.getAbsolutePath()
+
+    // create file
+    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
+    val testOutputCopies = 10
+
+    // write data to file
+    val file = new java.io.FileOutputStream(outFile)
+    val channel = file.getChannel
+    for(i <- 1 to testOutputCopies) {
+      val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+      channel.write(bbuf)
+    }
+    channel.close()
+    file.close()
+
+    val inRdd = sc.binaryRecords(outFileName, -1)
+
+    intercept[SparkException] {
+      inRdd.count
+    }
+  }
+
   test("file caching") {
     sc = new SparkContext("local", "test")
     val out = new FileWriter(tempDir + "/input")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to