Hi Ryan,

Thanks for the quick reply - I like the Iceberg approach and will keep an eye on it.
So creating a custom batch/non-streaming data source is not difficult. The issue I have is with creating a streaming data source. Similar to a batch source, you need to implement a simple trait - org.apache.spark.sql.execution.streaming.Source (example below). The "getBatch" method is expected to return a DataFrame, and that DataFrame needs to have its "isStreaming" attribute set to true. However, that is not exposed during DataFrame creation - the only way to set it is through sqlContext.internalCreateDataFrame, which is private to the sql package - so you have to make your package/class a child of org.apache.spark.sql. As I write this, I think having my code on GitHub will make it easier to illustrate. See this Spark JIRA comment, which illustrates the same problem for a Spark-packaged streaming source:

https://issues.apache.org/jira/browse/SPARK-21765?focusedCommentId=16142919&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16142919

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.streaming.{Offset, Source}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

// MyDataStreamOffset and MyDataStreamRDD are supporting classes in my project.
class MyDataStreamSource(
    sqlContext: SQLContext,
    override val schema: StructType,
    numPartitions: Int,
    numRowsPerPartition: Int) extends Source {

  // Report the latest available offset; here, simply the current wall-clock time.
  override def getOffset: Option[Offset] =
    Some(new MyDataStreamOffset(offset = System.currentTimeMillis()))

  override def commit(end: Offset): Unit = {}

  override def stop(): Unit = {}

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    val batchStreamTime = System.currentTimeMillis() // end.asInstanceOf[MyDataStreamOffset].value
    val rdd: RDD[Row] =
      new MyDataStreamRDD(sqlContext.sparkContext, batchStreamTime, numPartitions, numRowsPerPartition)
    val internalRows = rdd.map(row => InternalRow(UTF8String.fromString(row.get(0).asInstanceOf[String])))
    // internalCreateDataFrame is private[sql], which forces this class under org.apache.spark.sql.
    sqlContext.internalCreateDataFrame(internalRows, schema, isStreaming = true)
  }
}

From: Ryan Blue <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Thursday, March 22, 2018 at 1:45 PM
To: "Thakrar, Jayesh" <[email protected]>
Cc: "[email protected]" <[email protected]>
Subject: Re: Any reason for not exposing internalCreateDataFrame or isStreaming beyond sql package?

Jayesh,

We're working on a new API for building sources, DataSourceV2. That API allows you to produce UnsafeRow, and we are very likely going to change that to InternalRow (SPARK-23325). There's an experimental version in the latest 2.3.0 release if you'd like to try it out. Here's an example implementation from the Iceberg table format:

https://github.com/Netflix/iceberg/blob/master/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java

rb

On Thu, Mar 22, 2018 at 7:24 AM, Thakrar, Jayesh <[email protected]> wrote:

Because these are not exposed in the usual API, it's not possible (or difficult) to create custom structured streaming sources. Consequently, one has to create streaming sources in packages under org.apache.spark.sql.

Any pointers or info is greatly appreciated.

--
Ryan Blue
Software Engineer
Netflix
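P.S. For completeness, since MyDataStreamOffset isn't shown above - it's just a thin wrapper around an epoch-millis timestamp. A minimal sketch of what it could look like (the exact json encoding below is an assumption for illustration, not my real code; it mirrors Spark's own LongOffset pattern):

import org.apache.spark.sql.execution.streaming.Offset

// Sketch only: an Offset wrapping a wall-clock timestamp in milliseconds.
// The Source API just needs a JSON-serializable representation.
case class MyDataStreamOffset(offset: Long) extends Offset {
  override val json: String = offset.toString
}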

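P.P.S. For context on how the Source gets wired in: Spark looks up a custom source through org.apache.spark.sql.sources.StreamSourceProvider. A rough sketch of a provider for the source above - the class name, short name, default schema, and hard-coded partition/row counts are all made up for illustration:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Sketch only: exposes MyDataStreamSource to spark.readStream.format(...).
class MyDataStreamProvider extends StreamSourceProvider with DataSourceRegister {

  private val defaultSchema = StructType(StructField("value", StringType) :: Nil)

  override def shortName(): String = "mydatastream"

  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) =
    (shortName(), schema.getOrElse(defaultSchema))

  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source =
    new MyDataStreamSource(sqlContext, schema.getOrElse(defaultSchema),
      numPartitions = 2, numRowsPerPartition = 10)
}

With that on the classpath (registered via META-INF/services for DataSourceRegister, or referenced by fully-qualified class name), the source can be used as spark.readStream.format("mydatastream").load(). Note the provider itself can live in any package; it's only the internalCreateDataFrame call inside the Source that forces the org.apache.spark.sql placement.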