I am using v2.4.0-RC2

The code as-is wouldn't run (e.g. planBatchInputPartitions returns null). How
are you calling it?

When I do:
val df = spark.read.format("mypackage").load()
df.show()
I am getting a single instantiation of the reader. How are you creating it?

Thanks,
        Assaf

From: Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
Sent: Tuesday, October 9, 2018 2:02 PM
To: Mendelson, Assaf; user@spark.apache.org
Subject: Re: DataSourceV2 APIs creating multiple instances of DataSourceReader 
and hence not preserving the state


Thanks Assaf, did you try with tags/v2.4.0-rc2?

Full Code:

MyDataSource is the entry point; it simply creates the Reader and Writer.

import java.util.Optional;

import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.SessionConfigSupport;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.types.StructType;

public class MyDataSource implements DataSourceV2, WriteSupport, ReadSupport,
    SessionConfigSupport {

  @Override
  public DataSourceReader createReader(DataSourceOptions options) {
    return new MyDataSourceReader(options.asMap());
  }

  @Override
  public Optional<DataSourceWriter> createWriter(String jobId, StructType schema,
      SaveMode mode, DataSourceOptions options) {
    // creates a DataSourceWriter here..
    return Optional.of(dataSourceWriter);
  }

  @Override
  public String keyPrefix() {
    return "myprefix";
  }
}
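
For reference, this is roughly how I invoke it (the package name and config key
below are placeholders, not my real ones; Spark resolves format() to the fully
qualified class name, and with SessionConfigSupport, session configs under
spark.datasource.myprefix.* are forwarded into DataSourceOptions):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MyDataSourceExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("MyDataSourceExample")
        .master("local[*]")
        .getOrCreate();

    // With keyPrefix() == "myprefix", session configs named
    // spark.datasource.myprefix.* are forwarded to the source's options.
    spark.conf().set("spark.datasource.myprefix.someKey", "someValue");  // placeholder key

    Dataset<Row> df = spark.read()
        .format("com.example.MyDataSource")  // placeholder fully qualified class name
        .load();
    df.show();

    spark.stop();
  }
}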

import java.util.List;
import java.util.Map;

import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  StructType schema = null;
  Map<String, String> options;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
    this.options = options;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    // variable this.schema is null here since readSchema() was called on a
    // different instance
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this
        + " schema: " + this.schema);
    // more logic......
    return null;
  }

  @Override
  public StructType readSchema() {
    // some logic to discover schema
    this.schema = (new StructType())
        .add("col1", "int")
        .add("col2", "string");
    System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
    return this.schema;
  }
}
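
(Side note: as posted, planBatchInputPartitions returns null, so the scan itself
won't execute; for this repro it can simply return an empty list, which plans
and runs as a zero-row scan. A minimal drop-in sketch, assuming an empty result
is acceptable:)

// Additional import needed in MyDataSourceReader:
import java.util.Collections;

// Drop-in replacement: returning an empty list (rather than null) lets the
// query plan and run, producing zero rows.
@Override
public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
  System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this
      + " schema: " + this.schema);
  return Collections.emptyList();
}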

Thanks,
Shubham

On Tue, Oct 9, 2018 at 3:59 PM Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:
Could you add a fuller code example? I tried to reproduce it in my environment 
and I am getting just one instance of the reader…

Thanks,
        Assaf

From: Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
Sent: Tuesday, October 9, 2018 9:31 AM
To: user@spark.apache.org
Subject: DataSourceV2 APIs creating multiple instances of DataSourceReader and 
hence not preserving the state


Hi All,

--Spark built with tags/v2.4.0-rc2

Consider the following DataSourceReader implementation:


import java.util.List;
import java.util.Map;

import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  StructType schema = null;
  Map<String, String> options;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
    this.options = options;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    // variable this.schema is null here since readSchema() was called on a
    // different instance
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this
        + " schema: " + this.schema);
    // more logic......
    return null;
  }

  @Override
  public StructType readSchema() {
    // some logic to discover schema
    this.schema = (new StructType())
        .add("col1", "int")
        .add("col2", "string");
    System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
    return this.schema;
  }
}

1) First, readSchema() is called on MyDataSourceReader@instance1, which sets the
instance variable schema.

2) Then planBatchInputPartitions() is called on a different instance of
MyDataSourceReader, so the schema set in step 1 is not visible there.



How can I access, in planBatchInputPartitions(), the schema that was set in the
readSchema() method?
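
One workaround I can think of is to make schema discovery lazy and idempotent,
so that whichever instance the planner happens to use can recompute the schema
on demand instead of relying on a field set on another instance. A sketch
(schema discovery is the same placeholder as above, and the empty partition
list is just for the repro):

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  private StructType schema;
  private final Map<String, String> options;

  public MyDataSourceReader(Map<String, String> options) {
    this.options = options;
  }

  // Discover the schema on first use and cache it per instance; safe to
  // call on any instance, in any order.
  private StructType discoverSchema() {
    if (schema == null) {
      // some logic to discover schema (placeholder)
      schema = new StructType()
          .add("col1", "int")
          .add("col2", "string");
    }
    return schema;
  }

  @Override
  public StructType readSchema() {
    return discoverSchema();
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    // No longer depends on readSchema() having been called on THIS instance.
    StructType s = discoverSchema();
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + s);
    return Collections.emptyList();  // placeholder; real partitions go here
  }
}

This makes each instance self-sufficient, but it doesn't explain (or reduce) the
multiple instantiations, which is what I'd like to understand.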



Console Logs:

scala> mysource.executeQuery("select * from movie").show

MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@59ea8f1b

MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))

MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@a3cd3ff

MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff schema: null

Thanks,
Shubham

