Thank you Wenchen - that was very helpful! So it appears the Kafka datasource is still using the older (non-V2) API, hence the use of internalCreateDataFrame and isStreaming.
The ones that you pointed me to have the migration from the old approach to the V2 approach, as you mentioned, and that requires the data sources to generate RDD[Row], which is what I was looking for. And yes, I understand that the API is still in flux and subject to change :) Thanks again to both you and Ryan!

Jayesh

From: Wenchen Fan <[email protected]>
Date: Thursday, March 22, 2018 at 6:59 PM
To: "Thakrar, Jayesh" <[email protected]>
Cc: "[email protected]" <[email protected]>, "[email protected]" <[email protected]>
Subject: Re: Any reason for not exposing internalCreateDataFrame or isStreaming beyond sql package?

You can look at the PRs that migrate built-in streaming data sources to the V2 API:
https://github.com/apache/spark/pulls?utf8=%E2%9C%93&q=is%3Apr+migrate+in%3Atitle+is%3Aclosed+

On Thu, Mar 22, 2018 at 12:58 PM, Thakrar, Jayesh <[email protected]> wrote:

Thanks Wenchen - yes, I did refer to the Spark built-in sources as mentioned earlier and have been using the Kafka streaming source as a reference example. The built-in ones work and use internalCreateDataFrame, and that's where I got the idea about using that method to set "isStreaming" to true. But I should confess that I don't know the source code very well, so I would appreciate it if you can point me to any other pointers/examples.

From: Wenchen Fan <[email protected]>
Date: Thursday, March 22, 2018 at 2:52 PM
To: "Thakrar, Jayesh" <[email protected]>
Cc: "[email protected]" <[email protected]>, "[email protected]" <[email protected]>
Subject: Re: Any reason for not exposing internalCreateDataFrame or isStreaming beyond sql package?

org.apache.spark.sql.execution.streaming.Source is for internal use only. The official streaming data source API is the data source V2 API. You can take a look at the Spark built-in streaming data sources as examples. Note: data source V2 is still experimental; you may need to update your code in a new Spark release :)

On Thu, Mar 22, 2018 at 12:43 PM, Thakrar, Jayesh <[email protected]> wrote:

Hi Ryan,

Thanks for the quick reply - I like the Iceberg approach and will keep an eye on it.

So creating a custom batch/non-streaming data source is not difficult. The issue I have is with a streaming data source. Similar to a batch source, you need to implement a simple trait - org.apache.spark.sql.execution.streaming.Source (example below). The "getBatch" method is expected to return a DataFrame, and that DataFrame needs to have its "isStreaming" attribute set to true. However, that is not exposed during DataFrame creation, and the only way to set it is to make your package/class a child of org.apache.spark.sql.

As I write this, I think having my code on GitHub will make it easy to illustrate.
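For reference, a Source like the one below gets wired into spark.readStream through a StreamSourceProvider. Here is a minimal sketch of that wiring - the provider class name and the option names ("numPartitions", "rowsPerPartition") are made up for illustration; only the MyDataStreamSource constructor it calls matches the code that follows:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Illustrative provider; names are hypothetical, not taken from any built-in source.
class MyDataStreamSourceProvider extends StreamSourceProvider {

  private val fixedSchema = StructType(Seq(StructField("value", StringType)))

  // Spark calls this first to learn the source's name and schema.
  override def sourceSchema(sqlContext: SQLContext,
                            schema: Option[StructType],
                            providerName: String,
                            parameters: Map[String, String]): (String, StructType) =
    (providerName, schema.getOrElse(fixedSchema))

  // Then it asks for the actual Source used to plan each micro-batch.
  override def createSource(sqlContext: SQLContext,
                            metadataPath: String,
                            schema: Option[StructType],
                            providerName: String,
                            parameters: Map[String, String]): Source =
    new MyDataStreamSource(sqlContext,
      schema.getOrElse(fixedSchema),
      parameters.getOrElse("numPartitions", "2").toInt,
      parameters.getOrElse("rowsPerPartition", "10").toInt)
}

With the provider on the classpath it can then be loaded by its fully qualified class name, e.g. spark.readStream.format("mypackage.MyDataStreamSourceProvider").load() - although, as discussed, the Source itself still has to sit under org.apache.spark.sql to reach internalCreateDataFrame.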
See a Spark JIRA comment that illustrates the same problem for a Spark-packaged streaming source:
https://issues.apache.org/jira/browse/SPARK-21765?focusedCommentId=16142919&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16142919

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.streaming.{Offset, Source}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

class MyDataStreamSource(sqlContext: SQLContext,
                         override val schema: StructType,
                         numPartitions: Int,
                         numRowsPerPartition: Int) extends Source {

  // Report the latest available offset (here simply the current time).
  override def getOffset: Option[Offset] =
    Some(new MyDataStreamOffset(offset = System.currentTimeMillis()))

  override def commit(end: Offset): Unit = {}

  override def stop(): Unit = {}

  // Produce the DataFrame for the batch ending at the given offset.
  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    val batchStreamTime = System.currentTimeMillis() // end.asInstanceOf[MyDataStreamOffset].value
    val rdd: RDD[Row] = new MyDataStreamRDD(sqlContext.sparkContext, batchStreamTime, numPartitions, numRowsPerPartition)
    val internalRow = rdd.map(row => InternalRow(UTF8String.fromString(row.get(0).asInstanceOf[String])))
    // internalCreateDataFrame is private[sql], hence the need to live under org.apache.spark.sql.
    sqlContext.internalCreateDataFrame(internalRow, schema, isStreaming = true)
  }
}

From: Ryan Blue <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Thursday, March 22, 2018 at 1:45 PM
To: "Thakrar, Jayesh" <[email protected]>
Cc: "[email protected]" <[email protected]>
Subject: Re: Any reason for not exposing internalCreateDataFrame or isStreaming beyond sql package?

Jayesh,

We're working on a new API for building sources, DataSourceV2. That API allows you to produce UnsafeRow, and we are very likely going to change that to InternalRow (SPARK-23325). There's an experimental version in the latest 2.3.0 release if you'd like to try it out.

Here's an example implementation from the Iceberg table format:
https://github.com/Netflix/iceberg/blob/master/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java

rb

On Thu, Mar 22, 2018 at 7:24 AM, Thakrar, Jayesh <[email protected]> wrote:

Because these are not exposed in the usual API, it is not possible (or at least difficult) to create custom structured streaming sources. Consequently, one has to create streaming sources in packages under org.apache.spark.sql. Any pointers or info is greatly appreciated.

--
Ryan Blue
Software Engineer
Netflix
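To make the DataSourceV2 pointer concrete, a bare-bones V2 read path looks roughly like the sketch below. The class names (MyDataSourceV2 etc.) and the single string column are purely illustrative, and since the API is experimental the interface names have been shifting between releases, so treat this as a sketch rather than the final API. The streaming side (e.g. MicroBatchReadSupport) follows the same pattern with offset management added:

import java.util.{List => JList}
import scala.collection.JavaConverters._

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Entry point: Spark instantiates this class and asks it for a reader.
class MyDataSourceV2 extends DataSourceV2 with ReadSupport {
  override def createReader(options: DataSourceOptions): DataSourceReader = new MyReader
}

// Plans the read: exposes the schema and splits the work into factories (one per partition).
class MyReader extends DataSourceReader {
  override def readSchema(): StructType = StructType(Seq(StructField("value", StringType)))

  override def createDataReaderFactories(): JList[DataReaderFactory[Row]] =
    Seq[DataReaderFactory[Row]](new MyReaderFactory(numRows = 5)).asJava
}

// Serializable factory shipped to executors; creates the per-partition reader.
class MyReaderFactory(numRows: Int) extends DataReaderFactory[Row] {
  override def createDataReader(): DataReader[Row] = new MyDataReader(numRows)
}

// Iterator-style reader that produces plain Rows - no internalCreateDataFrame needed.
class MyDataReader(numRows: Int) extends DataReader[Row] {
  private var i = 0
  override def next(): Boolean = i < numRows
  override def get(): Row = { val row = Row(s"row-$i"); i += 1; row }
  override def close(): Unit = {}
}

Loading it then looks like spark.read.format("mypackage.MyDataSourceV2").load(), and a MicroBatchReadSupport-based variant is picked up via spark.readStream in the same way.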
