Hello,

I'm trying to extend Spark so that it can use our own binary format as a read-only source for pipeline-based computations. I already have a Java class that gives me enough information to build a complete StructType with the relevant metadata (NominalAttribute, for instance). It also gives me the row count of the file and methods to read any given cell, since the file is basically a giant array of values stored on disk. To plug this properly into the Spark framework, I looked at the CSV data source code and created a DefaultSource class in my package, like this:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}

class DefaultSource
  extends RelationProvider
  with DataSourceRegister {

  override def shortName(): String = "binfile"

  private def checkPath(parameters: Map[String, String]): String = {
    parameters.getOrElse("path", sys.error("'path' must be specified for BinFile data."))
  }

  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    val path = checkPath(parameters)
    BinFileRelation(Some(path))(sqlContext)
  }
}

I also created the BinFileRelation like this:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType

case class BinFileRelation /*protected[spark]*/ (
    location: Option[String])(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  private val reader = new BinFileReader(location.getOrElse(""))

  override val schema: StructType = {
    // retrieve column info from the reader and turn it into a valid StructType with two columns,
    // the first being the label, the second being the vector of features
  }

  override def buildScan(): RDD[Row] = {
    // I have no idea what to return here, so null for now.
    null
  }
}
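
For reference, the kind of two-column schema I build looks roughly like this (a simplified sketch; in reality the number of label values comes from the reader's metadata):

import org.apache.spark.ml.attribute.NominalAttribute
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

def binFileSchema(numLabels: Int): StructType = {
  // Attach NominalAttribute metadata to the label column so ML pipelines
  // can treat it as a categorical column.
  val labelMetadata = NominalAttribute.defaultAttr
    .withName("label")
    .withNumValues(numLabels)
    .toMetadata()

  StructType(Seq(
    StructField("label", DoubleType, nullable = false, metadata = labelMetadata),
    StructField("features", VectorType, nullable = false)
  ))
}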

So, as you can see, I managed to write the code that returns a valid schema and was able to write unit tests for it. I copied "protected[spark]" from the CSV implementation, but I commented it out because it breaks compilation and does not seem to be required.

Most importantly, I have no idea how to implement buildScan so that it returns a valid RDD[Row] without loading all of the data stored on disk into memory at once (the file may be very large, with hundreds of millions of rows). I read the documentation here: https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/sources/BaseRelation.html It says "Concrete implementation should inherit from one of the descendant Scan classes", but I could not find any of those descendant classes in the documentation or in the source code.
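
To make the question concrete, the kind of lazy scan I am after would look something like this inside BinFileRelation (only a sketch: rowCount, readLabel and readFeatures are names I am inventing for the reader's API, readFeatures is assumed to return an Array[Double], and the number of slices is arbitrary):

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

override def buildScan(): RDD[Row] = {
  val path = location.getOrElse("")
  val totalRows = reader.rowCount // the row count is known up front from the file
  sqlContext.sparkContext
    .range(0L, totalRows, step = 1L, numSlices = 100)
    .mapPartitions { indices =>
      // One reader per partition, opened on the executor; only `path` is captured
      // by the closure, so nothing heavy is serialized from the driver.
      val partitionReader = new BinFileReader(path)
      indices.map { i =>
        Row(partitionReader.readLabel(i), Vectors.dense(partitionReader.readFeatures(i)))
      }
    }
}

Since the rows come out of the partition iterator one by one, the whole file would never have to be in memory at once, but I don't know whether this is the intended way to do it.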

Looking further into the code around BaseRelation, I found the JDBCRelation class, which implements buildScan by calling JDBCRDD.scanTable, so I went looking at that method, which basically creates an instance of the private JDBCRDD class. That class extends RDD[InternalRow], so it looks like I should do the same for my own use case. However, I'm not sure how to implement the compute method for a simple read as described above.
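
If the custom RDD route is the right one, I picture something like this (again only a sketch; the fixed-size row ranges and the reader methods are assumptions on my side):

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// One partition covers a contiguous range of row indices in the file.
case class BinFilePartition(index: Int, start: Long, end: Long) extends Partition

class BinFileRDD(
    sc: SparkContext,
    path: String,
    totalRows: Long,
    numPartitions: Int)
  extends RDD[Row](sc, Nil) {

  override protected def getPartitions: Array[Partition] = {
    val rowsPerPartition = (totalRows + numPartitions - 1) / numPartitions
    Array.tabulate[Partition](numPartitions) { i =>
      val start = i * rowsPerPartition
      BinFilePartition(i, start, math.min(start + rowsPerPartition, totalRows))
    }
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
    val p = split.asInstanceOf[BinFilePartition]
    // Open the reader on the executor and read only this partition's row range.
    val reader = new BinFileReader(path)
    (p.start until p.end).iterator.map { i =>
      Row(reader.readLabel(i), Vectors.dense(reader.readFeatures(i))) // invented reader methods
    }
  }
}

buildScan would then just return an instance of this RDD built from the reader's row count, but I'd like to know whether a custom RDD is really necessary here or whether there is a simpler recommended way.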

Any help would be greatly appreciated.
