Hi Ryan,

Thanks for the quick reply - I like the Iceberg approach, will keep an eye on 
it.

So creating custom batch/non-streaming data source is not difficult.

The issue I have is when a streaming data source.

Similar to batch source, you need to implement a simple trait - 
org.apache.spark.sql.execution.streaming.Source (example below).
The "getBatch" expects a dataframe and that dataframe needs to have one of its 
attributes "isStreaming" to be set to true.
However that is not exposed during dataframe creation and the only way to do it 
is to make your package/class a child of org.apache.spark.sql

As I write this, I think having my code on github will make it easy to 
illustrate.

See a Spark Jira comment that illustrates the same problem for Spark packaged 
streaming source

https://issues.apache.org/jira/browse/SPARK-21765?focusedCommentId=16142919&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16142919


class MyDataStreamSource(sqlContext: SQLContext,
                         override val schema: StructType,
                         numPartitions: Int,
                         numRowsPerPartition: Int)
  extends Source {

  override def getOffset: Option[Offset] = Some(new MyDataStreamOffset(offset = 
System.currentTimeMillis()))

  override def commit(end: Offset): Unit = {}

  override def stop: Unit = {}

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    val batchStreamTime = System.currentTimeMillis() // 
end.asInstanceOf[MyDataStreamOffset].value
    val rdd: RDD[Row] = new MyDataStreamRDD(sqlContext.sparkContext, 
batchStreamTime, numPartitions, numRowsPerPartition)
    val internalRow = rdd.map(row => 
InternalRow(UTF8String.fromString(row.get(0).asInstanceOf[String])))
    sqlContext.internalCreateDataFrame(internalRow, schema, isStreaming = true)
  }

}



From: Ryan Blue <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Thursday, March 22, 2018 at 1:45 PM
To: "Thakrar, Jayesh" <[email protected]>
Cc: "[email protected]" <[email protected]>
Subject: Re: Any reason for not exposing internalCreateDataFrame or isStreaming 
beyond sql package?

Jayesh,

We're working on a new API for building sources, DataSourceV2. That API allows 
you to produce UnsafeRow and we are very likely going to change that to 
InternalRow (SPARK-23325). There's an experimental version in the latest 2.3.0 
release if you'd like to try it out.

Here's an example implementation from the Iceberg table format: 
https://github.com/Netflix/iceberg/blob/master/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java

rb

On Thu, Mar 22, 2018 at 7:24 AM, Thakrar, Jayesh 
<[email protected]<mailto:[email protected]>> wrote:
Because these are not exposed in the usual API, its not possible (or difficult) 
to create custom structured streaming sources.

Consequently, one has to create streaming sources in packages under 
org.apache.spark.sql.

Any pointers or info is greatly appreciated.



--
Ryan Blue
Software Engineer
Netflix

Reply via email to