[ 
https://issues.apache.org/jira/browse/SPARK-25390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17178052#comment-17178052
 ] 

Rafael edited comment on SPARK-25390 at 8/14/20, 8:42 PM:
----------------------------------------------------------

Hey guys, 

I'm trying to migrate the package where I was using *import 
org.apache.spark.sql.sources.v2._ into Spark3.0.0*

and haven't found a good guide so may I ask my questions here.

Here my migration plan can you highlight what interfaces should I use now

1. Cannot find what should I use instead of ReadSupport, ReadSupport, 
DataSourceReader,

if instead of ReadSupport we have to use now Scan then what happened to method 
createReader? 
{code:java}
class DefaultSource extends ReadSupport { 
  override def createReader(options: DataSourceOptions): DataSourceReader = new 
GeneratingReader() 
}
{code}
2. 

Here instead of
{code:java}
import org.apache.spark.sql.sources.v2.reader.partitioning.{Distribution, 
Partitioning}
import org.apache.spark.sql.sources.v2.reader.{InputPartition, 
SupportsReportPartitioning}
{code}
I should use
{code:java}
import org.apache.spark.sql.connector.read.partitioning.{Distribution, 
Partitioning}
import org.apache.spark.sql.connector.read.{InputPartition, 
SupportsReportPartitioning}
{code}
right?
{code:java}
class GeneratingReader() extends DataSourceReader {
  override def readSchema(): StructType = {...}
  override def planInputPartitions(): util.List[InputPartition[InternalRow]] = {
    val partitions = new util.ArrayList[InputPartition[InternalRow]]()
    ...
    partitions.add(new GeneratingInputPartition(...))
  }
  override def outputPartitioning(): Partitioning = {...}
}
{code}
3.

Haven't found what should I use instead of
{code:java}
import org.apache.spark.sql.sources.v2.reader.InputPartition
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader{code}
Interface like it has totally different contract
{code:java}
import org.apache.spark.sql.connector.read.InputPartition{code}
{code:java}
class GeneratingInputPartition() extends InputPartition[InternalRow] {
  override def createPartitionReader(): InputPartitionReader[InternalRow] = new 
 GeneratingInputPartitionReader(...)
}

class GeneratingInputPartitionReader() extends 
InputPartitionReader[InternalRow] {
   override def next(): Boolean = ...
   override def get(): InternalRow = ...
   override def close(): Unit = ...
}
{code}


was (Author: kyrdan):
Hey guys, 

I'm trying to migrate the package where I was using *import 
org.apache.spark.sql.sources.v2._ into Spark3.0.0*

and haven't found a good guide so may I ask my questions here.

 

Here my migration plan can you highlight what interfaces should I use now

 

1. Cannot find what should I use instead of ReadSupport, ReadSupport, 
DataSourceReader,

if instead of ReadSupport we have to use now Scan then what happened to method 
createReader? 
{code:java}

class DefaultSource extends ReadSupport { 
  override def createReader(options: DataSourceOptions): DataSourceReader = new 
GeneratingReader() 
}
{code}
2. 

Here instead of

 
{code:java}
import org.apache.spark.sql.sources.v2.reader.partitioning.{Distribution, 
Partitioning}
import org.apache.spark.sql.sources.v2.reader.{InputPartition, 
SupportsReportPartitioning}
{code}
 

I should use

 
{code:java}
import org.apache.spark.sql.connector.read.partitioning.{Distribution, 
Partitioning}
import org.apache.spark.sql.connector.read.{InputPartition, 
SupportsReportPartitioning}
{code}
 

right?
{code:java}
class GeneratingReader() extends DataSourceReader {
  override def readSchema(): StructType = {...}
  override def planInputPartitions(): util.List[InputPartition[InternalRow]] = {
    val partitions = new util.ArrayList[InputPartition[InternalRow]]()
    ...
    partitions.add(new GeneratingInputPartition(...))
  }
  override def outputPartitioning(): Partitioning = {...}
}
{code}
3.

Haven't found what should I use instead of
{code:java}
import org.apache.spark.sql.sources.v2.reader.InputPartition
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader{code}
Interface like it has totally different contract
{code:java}
import org.apache.spark.sql.connector.read.InputPartition{code}
{code:java}
class GeneratingInputPartition() extends InputPartition[InternalRow] {
  override def createPartitionReader(): InputPartitionReader[InternalRow] = new 
 GeneratingInputPartitionReader(...)
}

class GeneratingInputPartitionReader() extends 
InputPartitionReader[InternalRow] {
   override def next(): Boolean = ...
   override def get(): InternalRow = ...
   override def close(): Unit = ...
}
{code}

> Data source V2 API refactoring
> ------------------------------
>
>                 Key: SPARK-25390
>                 URL: https://issues.apache.org/jira/browse/SPARK-25390
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Wenchen Fan
>            Assignee: Wenchen Fan
>            Priority: Major
>             Fix For: 3.0.0
>
>
> Currently it's not very clear how we should abstract data source v2 API. The 
> abstraction should be unified between batch and streaming, or similar but 
> have a well-defined difference between batch and streaming. And the 
> abstraction should also include catalog/table.
> An example of the abstraction:
> {code}
> batch: catalog -> table -> scan
> streaming: catalog -> table -> stream -> scan
> {code}
> We should refactor the data source v2 API according to the abstraction



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to