Hello,
I'm trying to extend Spark so that it can use our own binary format as a
read-only source for pipeline-based computations.
I already have a Java class that gives me everything needed to build a
complete StructType, including metadata (a NominalAttribute, for instance).
It also gives me the row count of the file and methods to read any given
cell, since the file is basically a giant array of values stored on disk.
To plug this properly into the Spark framework, I looked at the CSV source
code and created a DefaultSource class in my package, like this:
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}

class DefaultSource extends RelationProvider with DataSourceRegister {

  override def shortName(): String = "binfile"

  private def checkPath(parameters: Map[String, String]): String = {
    parameters.getOrElse("path",
      sys.error("'path' must be specified for BinFile data."))
  }

  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    val path = checkPath(parameters)
    BinFileRelation(Some(path))(sqlContext)
  }
}
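For context, the goal is to be able to load such files through the short
name above, roughly like this (assuming DefaultSource is registered in
META-INF/services/org.apache.spark.sql.sources.DataSourceRegister; the path
is made up):

val df = sqlContext.read
  .format("binfile")
  .load("/data/some-file.bin")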
I also created the BinFileRelation like this:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType

case class BinFileRelation /*protected[spark]*/ (
    location: Option[String])(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  private val reader = new BinFileReader(location.getOrElse(""))

  override val schema: StructType = {
    // retrieve the column info from the reader and turn it into a valid
    // StructType with two columns: the first is the label, the second is
    // the vector of features
  }

  override def buildScan(): RDD[Row] = {
    // I have no idea what to return here, so null for now.
    null
  }
}
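To make the schema part concrete, what I build is roughly of this shape
(the field names and label values are only illustrative;
SQLDataTypes.VectorType is the public vector type from
org.apache.spark.ml.linalg):

import org.apache.spark.ml.attribute.NominalAttribute
import org.apache.spark.ml.linalg.SQLDataTypes
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// Attach nominal metadata to the label column (the values are placeholders).
val labelMetadata = NominalAttribute.defaultAttr
  .withName("label")
  .withValues("negative", "positive")
  .toMetadata()

StructType(Seq(
  StructField("label", DoubleType, nullable = false, labelMetadata),
  StructField("features", SQLDataTypes.VectorType, nullable = false)))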
So, as you can see, I managed to write the code needed to return a valid
schema, and I was also able to write unit tests for it.
I copied "protected[spark]" from the CSV implementation, but commented it
out because it breaks compilation (my class does not live in an
org.apache.spark package) and it does not seem to be required.
And most importantly, I have no idea how to build a valid RDD[Row] to
return from buildScan so that the data stored on disk is not loaded into
memory all at once (it may be huge, hundreds of millions of rows).
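To make the question more concrete, here is the direction I was considering
for buildScan: slice the row range on the driver and only open a reader and
read rows inside each partition on the executors. This is just a sketch;
rowCount, readLabel and readFeatures are placeholder names for what my
BinFileReader exposes, and the partition count is arbitrary. Is something
along these lines the right approach?

override def buildScan(): RDD[Row] = {
  val path = location.getOrElse("")
  val rowCount: Long = reader.rowCount      // total rows, known up front on the driver
  val numPartitions = 16                    // arbitrary for the sketch
  val step = math.max(1L, (rowCount + numPartitions - 1) / numPartitions)
  val slices = (0L until rowCount by step)
    .map(start => (start, math.min(start + step, rowCount)))

  sqlContext.sparkContext
    .parallelize(slices, slices.length)
    .mapPartitions { iter =>
      // One reader per partition, opened lazily on the executor, so the
      // whole file is never held in memory at once.
      val localReader = new BinFileReader(path)
      iter.flatMap { case (start, end) =>
        (start until end).iterator.map { i =>
          // The second value must match the vector type declared in the schema.
          Row(localReader.readLabel(i), localReader.readFeatures(i))
        }
      }
    }
}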
I read the documentation here:
https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/sources/BaseRelation.html
It says "Concrete implementation should inherit from one of the descendant
Scan classes", but I could not find any of those descendant classes in the
documentation or in the source code.
Looking further into the code around "BaseRelation", I found the
JDBCRelation class, which implements buildScan by calling JDBCRDD.scanTable,
so I looked at that method, which basically creates an instance of the
private JDBCRDD class.
That class extends RDD[InternalRow], so it looks as if I should do the same
for my own use case.
However, I'm not sure how to implement the compute method for a simple read
as described above.
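In other words, is the expected pattern to write a small custom RDD of my
own, roughly like the sketch below, or is a mapPartitions-based scan like
the one above good enough? I'm sticking to plain Row since that is what
TableScan declares; BinFilePartition, readLabel and readFeatures are names
I made up, and buildScan would then simply return
new BinFileRDD(sqlContext.sparkContext, path, reader.rowCount, somePartitionCount).

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Each partition knows which slice of rows it is responsible for.
case class BinFilePartition(index: Int, start: Long, end: Long) extends Partition

// Rough sketch of a custom RDD reading straight from the binary file,
// analogous to what JDBCRDD does for a database table.
class BinFileRDD(
    sc: SparkContext,
    path: String,
    rowCount: Long,
    numPartitions: Int)
  extends RDD[Row](sc, Nil) {

  override def getPartitions: Array[Partition] = {
    val step = math.max(1L, (rowCount + numPartitions - 1) / numPartitions)
    Array.tabulate[Partition](numPartitions) { i =>
      val start = i * step
      BinFilePartition(i, start, math.min(start + step, rowCount))
    }
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
    val slice = split.asInstanceOf[BinFilePartition]
    val reader = new BinFileReader(path)  // opened on the executor, per partition
    (slice.start until slice.end).iterator.map { i =>
      Row(reader.readLabel(i), reader.readFeatures(i))  // placeholder accessors
    }
  }
}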
Any help would be greatly appreciated.