Thanks Wenchen - yes, I did refer to the Spark built-in sources as mentioned 
earlier and have been using the Kafka streaming source as a reference example.
The built-in ones work and use internalCreateDataFrame - that's where I 
got the idea of using that method to set "isStreaming" to true.

But I should confess that I don't know the source code very well, so I would 
appreciate any other pointers/examples you can share.

From: Wenchen Fan <[email protected]>
Date: Thursday, March 22, 2018 at 2:52 PM
To: "Thakrar, Jayesh" <[email protected]>
Cc: "[email protected]" <[email protected]>, "[email protected]" 
<[email protected]>
Subject: Re: Any reason for not exposing internalCreateDataFrame or isStreaming 
beyond sql package?

org.apache.spark.sql.execution.streaming.Source is for internal use only. The 
official stream data source API is the data source v2 API. You can take a look 
at the Spark built-in streaming data sources as examples. Note: data source v2 
is still experimental, so you may need to update your code in a new Spark 
release :)
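
For instance, the built-in rate source exercises the whole streaming path using 
only public APIs (a minimal sketch, run e.g. in spark-shell):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("rate-source-demo")
  .getOrCreate()

// "rate" is a built-in streaming source that generates (timestamp, value) rows.
val rateDf = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()

// DataFrames produced by a streaming source report isStreaming = true.
assert(rateDf.isStreaming)

val query = rateDf.writeStream.format("console").start()
query.awaitTermination(10000) // run for ~10 seconds
query.stop()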

On Thu, Mar 22, 2018 at 12:43 PM, Thakrar, Jayesh 
<[email protected]> wrote:
Hi Ryan,

Thanks for the quick reply - I like the Iceberg approach, will keep an eye on 
it.

So creating a custom batch/non-streaming data source is not difficult.

The issue I have is when creating a streaming data source.

Similar to a batch source, you need to implement a simple trait - 
org.apache.spark.sql.execution.streaming.Source (example below).
The "getBatch" method is expected to return a DataFrame, and that DataFrame 
must have its "isStreaming" attribute set to true.
However, that attribute is not exposed during DataFrame creation, and the only 
way to set it is to make your package/class a child of org.apache.spark.sql.
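
Concretely, this is the kind of shim it forces you to write (a minimal sketch; 
StreamingDataFrameShim is a name I made up, but internalCreateDataFrame and its 
private[sql] visibility are as in Spark):

// Must live in a package under org.apache.spark.sql, since
// internalCreateDataFrame is private[sql].
package org.apache.spark.sql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

object StreamingDataFrameShim {
  // Wraps the private[sql] method and forces isStreaming = true.
  def createStreamingDataFrame(
      sqlContext: SQLContext,
      rows: RDD[InternalRow],
      schema: StructType): DataFrame =
    sqlContext.internalCreateDataFrame(rows, schema, isStreaming = true)
}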

As I write this, I think having my code on GitHub will make it easier to 
illustrate.

See this Spark JIRA comment, which illustrates the same problem for a 
Spark-packaged streaming source:

https://issues.apache.org/jira/browse/SPARK-21765?focusedCommentId=16142919&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16142919


import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.streaming.{Offset, Source}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

class MyDataStreamSource(sqlContext: SQLContext,
                         override val schema: StructType,
                         numPartitions: Int,
                         numRowsPerPartition: Int)
  extends Source {

  // Each micro-batch is identified by the wall-clock time at which it is planned.
  override def getOffset: Option[Offset] =
    Some(new MyDataStreamOffset(offset = System.currentTimeMillis()))

  override def commit(end: Offset): Unit = {}

  override def stop(): Unit = {}

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    val batchStreamTime = System.currentTimeMillis() // end.asInstanceOf[MyDataStreamOffset].value
    val rdd: RDD[Row] = new MyDataStreamRDD(sqlContext.sparkContext,
      batchStreamTime, numPartitions, numRowsPerPartition)
    // Convert each Row into an InternalRow holding a single UTF8String column.
    val internalRow = rdd.map(row =>
      InternalRow(UTF8String.fromString(row.get(0).asInstanceOf[String])))
    // internalCreateDataFrame is private[sql]; this call only compiles because
    // this class sits in a package under org.apache.spark.sql.
    sqlContext.internalCreateDataFrame(internalRow, schema, isStreaming = true)
  }

}
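
For completeness, wiring this up so spark.readStream.format(...) can find it 
requires a provider; a minimal sketch (MyDataStreamProvider and the schema are 
mine; StreamSourceProvider and DataSourceRegister are Spark's existing v1 traits):

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class MyDataStreamProvider extends StreamSourceProvider with DataSourceRegister {

  private val defaultSchema = StructType(Seq(StructField("value", StringType)))

  // Allows spark.readStream.format("mydatastream") instead of the class name.
  override def shortName(): String = "mydatastream"

  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) =
    (shortName(), schema.getOrElse(defaultSchema))

  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source =
    new MyDataStreamSource(sqlContext, schema.getOrElse(defaultSchema),
      numPartitions = 2, numRowsPerPartition = 10)
}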



From: Ryan Blue <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Thursday, March 22, 2018 at 1:45 PM
To: "Thakrar, Jayesh" 
<[email protected]<mailto:[email protected]>>
Cc: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Subject: Re: Any reason for not exposing internalCreateDataFrame or isStreaming 
beyond sql package?

Jayesh,

We're working on a new API for building sources, DataSourceV2. That API allows 
you to produce UnsafeRow and we are very likely going to change that to 
InternalRow (SPARK-23325). There's an experimental version in the latest 2.3.0 
release if you'd like to try it out.

Here's an example implementation from the Iceberg table format: 
https://github.com/Netflix/iceberg/blob/master/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java
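
For anyone following along, a bare-bones reader against the 2.3.0 flavor of 
that API looks roughly like this (a sketch only - DemoSource/DemoReader are 
made-up names, and these experimental interfaces were renamed in later releases):

import java.util.{ArrayList => JArrayList, List => JList}

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

class DemoSource extends DataSourceV2 with ReadSupport {
  override def createReader(options: DataSourceOptions): DataSourceReader = new DemoReader
}

class DemoReader extends DataSourceReader {
  override def readSchema(): StructType = StructType(Seq(StructField("i", IntegerType)))

  // One factory per partition; each one produces rows 0..4 here.
  override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
    val factories = new JArrayList[DataReaderFactory[Row]]()
    factories.add(new DemoReaderFactory(0, 5))
    factories
  }
}

class DemoReaderFactory(start: Int, end: Int) extends DataReaderFactory[Row] {
  override def createDataReader(): DataReader[Row] = new DataReader[Row] {
    private var current = start - 1
    override def next(): Boolean = { current += 1; current < end }
    override def get(): Row = Row(current)
    override def close(): Unit = {}
  }
}

// Usage: spark.read.format(classOf[DemoSource].getName).load()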

rb

On Thu, Mar 22, 2018 at 7:24 AM, Thakrar, Jayesh 
<[email protected]> wrote:
Because these are not exposed in the public API, it is difficult (if not 
impossible) to create custom structured streaming sources.

Consequently, one has to create streaming sources in packages under 
org.apache.spark.sql.

Any pointers or info is greatly appreciated.



--
Ryan Blue
Software Engineer
Netflix
