org.apache.spark.sql.execution.streaming.Source is for internal use only.
The official stream data source API is the data source v2 API. You can take
a look at the Spark built-in streaming data sources as examples. Note: data
source v2 is still experimental, you may need to update your code in a new
Spark release :)

On Thu, Mar 22, 2018 at 12:43 PM, Thakrar, Jayesh <
[email protected]> wrote:

> Hi Ryan,
>
>
>
> Thanks for the quick reply - I like the Iceberg approach, will keep an eye
> on it.
>
>
>
> So creating custom batch/non-streaming data source is not difficult.
>
>
>
> The issue I have is when a streaming data source.
>
>
>
> Similar to batch source, you need to implement a simple trait -
> org.apache.spark.sql.execution.streaming.Source (example below).
>
> The "getBatch" expects a dataframe and that dataframe needs to have one of
> its attributes "isStreaming" to be set to true.
>
> However that is not exposed during dataframe creation and the only way to
> do it is to make your package/class a child of org.apache.spark.sql
>
>
>
> As I write this, I think having my code on github will make it easy to
> illustrate.
>
>
>
> See a Spark Jira comment that illustrates the same problem for Spark
> packaged streaming source
>
>
>
> https://issues.apache.org/jira/browse/SPARK-21765?
> focusedCommentId=16142919&page=com.atlassian.jira.
> plugin.system.issuetabpanels:comment-tabpanel#comment-16142919
>
>
>
>
>
> *class *MyDataStreamSource(sqlContext: SQLContext,
>                          *override val *schema: StructType,
>                          numPartitions: Int,
>                          numRowsPerPartition: Int)
>   *extends *Source {
>
>   *override def *getOffset: Option[Offset] = *Some*(*new 
> *MyDataStreamOffset(offset
> = System.*currentTimeMillis*()))
>
>   *override def *commit(end: Offset): Unit = {}
>
>   *override def *stop: Unit = {}
>
>   *override def *getBatch(start: Option[Offset], end: Offset): DataFrame
> = {
>     *val *batchStreamTime = System.*currentTimeMillis*()
> *// end.asInstanceOf[MyDataStreamOffset].value     **val *rdd: RDD[Row] = *new
> *MyDataStreamRDD(sqlContext.sparkContext, batchStreamTime, numPartitions,
> numRowsPerPartition)
>     *val *internalRow = rdd.map(row => *InternalRow*(UTF8String.
> *fromString*(row.get(0).asInstanceOf[String])))
>     sqlContext.internalCreateDataFrame(internalRow, schema, isStreaming =
> *true*)
>   }
>
> }
>
>
>
>
>
>
>
> *From: *Ryan Blue <[email protected]>
> *Reply-To: *"[email protected]" <[email protected]>
> *Date: *Thursday, March 22, 2018 at 1:45 PM
> *To: *"Thakrar, Jayesh" <[email protected]>
> *Cc: *"[email protected]" <[email protected]>
> *Subject: *Re: Any reason for not exposing internalCreateDataFrame or
> isStreaming beyond sql package?
>
>
>
> Jayesh,
>
>
>
> We're working on a new API for building sources, DataSourceV2. That API
> allows you to produce UnsafeRow and we are very likely going to change that
> to InternalRow (SPARK-23325). There's an experimental version in the latest
> 2.3.0 release if you'd like to try it out.
>
>
>
> Here's an example implementation from the Iceberg table format:
> https://github.com/Netflix/iceberg/blob/master/spark/src/main/java/com/
> netflix/iceberg/spark/source/Reader.java
>
>
>
> rb
>
>
>
> On Thu, Mar 22, 2018 at 7:24 AM, Thakrar, Jayesh <
> [email protected]> wrote:
>
> Because these are not exposed in the usual API, its not possible (or
> difficult) to create custom structured streaming sources.
>
>
>
> Consequently, one has to create streaming sources in packages under
> org.apache.spark.sql.
>
>
>
> Any pointers or info is greatly appreciated.
>
>
>
>
>
> --
>
> Ryan Blue
>
> Software Engineer
>
> Netflix
>

Reply via email to