org.apache.spark.sql.execution.streaming.Source is for internal use only. The official streaming data source API is the data source v2 API. You can take a look at the Spark built-in streaming data sources as examples. Note: data source v2 is still experimental, so you may need to update your code in a new Spark release :)
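For reference, the consumption side looks the same whichever API the source implements. A minimal usage sketch; the format name "com.example.MyStreamingSource" and the options are placeholders for whatever the custom source actually registers:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("custom-source-demo").getOrCreate()

// Load the custom source by its fully-qualified provider class name (or its shortName()).
// "com.example.MyStreamingSource" and "numPartitions" are hypothetical.
val df = spark.readStream
  .format("com.example.MyStreamingSource")
  .option("numPartitions", "2")
  .load()

// Write each micro-batch to the console so the source can be exercised end to end.
val query = df.writeStream
  .format("console")
  .outputMode("append")
  .start()

query.awaitTermination()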
On Thu, Mar 22, 2018 at 12:43 PM, Thakrar, Jayesh <[email protected]> wrote:

> Hi Ryan,
>
> Thanks for the quick reply - I like the Iceberg approach, will keep an eye on it.
>
> So creating a custom batch/non-streaming data source is not difficult.
>
> The issue I have is with a streaming data source.
>
> Similar to a batch source, you need to implement a simple trait -
> org.apache.spark.sql.execution.streaming.Source (example below).
> The "getBatch" method returns a dataframe, and that dataframe needs to have its
> "isStreaming" attribute set to true.
> However, that is not exposed during dataframe creation, and the only way to set it
> is to make your package/class a child of org.apache.spark.sql.
>
> As I write this, I think having my code on GitHub will make it easy to illustrate.
>
> See this Spark JIRA comment, which illustrates the same problem for a Spark-packaged
> streaming source:
>
> https://issues.apache.org/jira/browse/SPARK-21765?focusedCommentId=16142919&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16142919
>
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.{DataFrame, Row, SQLContext}
> import org.apache.spark.sql.catalyst.InternalRow
> import org.apache.spark.sql.execution.streaming.{Offset, Source}
> import org.apache.spark.sql.types.StructType
> import org.apache.spark.unsafe.types.UTF8String
>
> class MyDataStreamSource(sqlContext: SQLContext,
>                          override val schema: StructType,
>                          numPartitions: Int,
>                          numRowsPerPartition: Int)
>   extends Source {
>
>   override def getOffset: Option[Offset] =
>     Some(new MyDataStreamOffset(offset = System.currentTimeMillis()))
>
>   override def commit(end: Offset): Unit = {}
>
>   override def stop(): Unit = {}
>
>   override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
>     val batchStreamTime = System.currentTimeMillis()
>     // end.asInstanceOf[MyDataStreamOffset].value
>     val rdd: RDD[Row] = new MyDataStreamRDD(sqlContext.sparkContext, batchStreamTime,
>       numPartitions, numRowsPerPartition)
>     val internalRow = rdd.map(row => InternalRow(UTF8String.fromString(row.get(0).asInstanceOf[String])))
>     sqlContext.internalCreateDataFrame(internalRow, schema, isStreaming = true)
>   }
>
> }
>
> From: Ryan Blue <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Thursday, March 22, 2018 at 1:45 PM
> To: "Thakrar, Jayesh" <[email protected]>
> Cc: "[email protected]" <[email protected]>
> Subject: Re: Any reason for not exposing internalCreateDataFrame or isStreaming beyond sql package?
>
> Jayesh,
>
> We're working on a new API for building sources, DataSourceV2. That API allows you to
> produce UnsafeRow, and we are very likely going to change that to InternalRow
> (SPARK-23325). There's an experimental version in the latest 2.3.0 release if you'd
> like to try it out.
>
> Here's an example implementation from the Iceberg table format:
> https://github.com/Netflix/iceberg/blob/master/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java
>
> rb
>
> On Thu, Mar 22, 2018 at 7:24 AM, Thakrar, Jayesh <[email protected]> wrote:
>
> Because these are not exposed in the usual API, it's not possible (or at least
> difficult) to create custom structured streaming sources.
>
> Consequently, one has to create streaming sources in packages under org.apache.spark.sql.
>
> Any pointers or info is greatly appreciated.
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
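For completeness, here is a hedged sketch of how the MyDataStreamSource in the quoted message would typically be wired up through the (equally internal) V1 provider interfaces. MyDataStreamProvider, the "my-data-stream" short name, the option names, and the package are hypothetical; the class has to live in a package under org.apache.spark.sql precisely because Source and SQLContext.internalCreateDataFrame are not public, which is the problem this thread is about:

// Hypothetical wiring for the V1 Source shown above; relies on internal APIs, so subject to change.
package org.apache.spark.sql.mydatastream

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class MyDataStreamProvider extends StreamSourceProvider with DataSourceRegister {

  private val defaultSchema = StructType(Seq(StructField("value", StringType)))

  // Lets callers use spark.readStream.format("my-data-stream") instead of the class name;
  // requires a META-INF/services/org.apache.spark.sql.sources.DataSourceRegister entry.
  override def shortName(): String = "my-data-stream"

  // Declares the schema before the source is created.
  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) =
    (shortName(), schema.getOrElse(defaultSchema))

  // Instantiates the Source for a streaming query, passing reader options through.
  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source =
    new MyDataStreamSource(
      sqlContext,
      schema.getOrElse(defaultSchema),
      numPartitions = parameters.getOrElse("numPartitions", "2").toInt,
      numRowsPerPartition = parameters.getOrElse("numRowsPerPartition", "10").toInt)
}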
