Alright, so the original is a big project which uses a SQL store underneath. I extracted the minimal code out into a smaller project, and it is still creating multiple instances.
Here is my project layout:

├── my-datasource.iml
├── pom.xml
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── shubham
│   │   │           ├── MyDataSource.java
│   │   │           └── reader
│   │   │               └── MyDataSourceReader.java

MyDataSource.java
-------------------------------------------------
package com.shubham;

import com.shubham.reader.MyDataSourceReader;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.types.StructType;

import java.util.Optional;

public class MyDataSource implements DataSourceV2, ReadSupport, WriteSupport {

  @Override
  public DataSourceReader createReader(DataSourceOptions options) {
    System.out.println("MyDataSource.createReader: Going to create a new MyDataSourceReader");
    return new MyDataSourceReader(options.asMap());
  }

  @Override
  public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema,
      SaveMode mode, DataSourceOptions options) {
    return Optional.empty();
  }
}

MyDataSourceReader.java
-------------------------------------------------
package com.shubham.reader;

import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  private Map<String, String> options;
  private StructType schema;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
    this.options = options;
  }

  @Override
  public StructType readSchema() {
    this.schema = (new StructType())
        .add("col1", "int")
        .add("col2", "string");
    System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
    return this.schema;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
    return new ArrayList<>();
  }
}

----------------------------------------
spark-shell output
----------------------------------------

scala> spark.read.format("com.shubham.MyDataSource").option("query", "select * from some_table").load.show

MyDataSource.createReader: Going to create a new MyDataSourceReader
MyDataSourceReader.MyDataSourceReader: Instantiated....com.shubham.reader.MyDataSourceReader@69fa5536
MyDataSourceReader.readSchema: com.shubham.reader.MyDataSourceReader@69fa5536 schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
MyDataSource.createReader: Going to create a new MyDataSourceReader
MyDataSourceReader.MyDataSourceReader: Instantiated....com.shubham.reader.MyDataSourceReader@3095c449
MyDataSourceReader.planBatchInputPartitions: com.shubham.reader.MyDataSourceReader@3095c449 schema: null
+----+----+
|col1|col2|
+----+----+
+----+----+

Here two instances of the reader, MyDataSourceReader@69fa5536 and MyDataSourceReader@3095c449, are being created. Consequently, schema is null in MyDataSourceReader@3095c449.

Am I not doing it the correct way?

Thanks,
Shubham

On Tue, Oct 9, 2018 at 4:43 PM Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:

> I am using v2.4.0-RC2
>
> The code as is wouldn't run (e.g. planBatchInputPartitions returns null).
> How are you calling it?
>
> When I do:
>
> val df = spark.read.format(mypackage).load().show()
>
> I am getting a single creation, how are you creating the reader?
>
> Thanks,
> Assaf
>
> From: Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
> Sent: Tuesday, October 9, 2018 2:02 PM
> To: Mendelson, Assaf; user@spark.apache.org
> Subject: Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state
>
> [EXTERNAL EMAIL]
> Please report any suspicious attachments, links, or requests for sensitive information.
>
> Thanks Assaf, you tried with tags/v2.4.0-rc2?
>
> Full Code:
>
> MyDataSource is the entry point which simply creates Reader and Writer
>
> public class MyDataSource implements DataSourceV2, WriteSupport, ReadSupport, SessionConfigSupport {
>
>   @Override
>   public DataSourceReader createReader(DataSourceOptions options) {
>     return new MyDataSourceReader(options.asMap());
>   }
>
>   @Override
>   public Optional<DataSourceWriter> createWriter(String jobId, StructType schema,
>       SaveMode mode, DataSourceOptions options) {
>     // creates a dataSourcewriter here..
>     return Optional.of(dataSourcewriter);
>   }
>
>   @Override
>   public String keyPrefix() {
>     return "myprefix";
>   }
> }
>
> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>
>   StructType schema = null;
>   Map<String, String> options;
>
>   public MyDataSourceReader(Map<String, String> options) {
>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>     this.options = options;
>   }
>
>   @Override
>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>     // variable this.schema is null here since readSchema() was called on a different instance
>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>     // more logic......
>     return null;
>   }
>
>   @Override
>   public StructType readSchema() {
>     // some logic to discover schema
>     this.schema = (new StructType())
>         .add("col1", "int")
>         .add("col2", "string");
>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>     return this.schema;
>   }
> }
>
> Thanks,
> Shubham
>
> On Tue, Oct 9, 2018 at 3:59 PM Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:
>
> Could you add a fuller code example? I tried to reproduce it in my environment and I am getting just one instance of the reader…
>
> Thanks,
> Assaf
>
> From: Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
> Sent: Tuesday, October 9, 2018 9:31 AM
> To: user@spark.apache.org
> Subject: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state
>
> Hi All,
>
> --Spark built with tags/v2.4.0-rc2
>
> Consider following DataSourceReader implementation:
>
> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>
>   StructType schema = null;
>   Map<String, String> options;
>
>   public MyDataSourceReader(Map<String, String> options) {
>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>     this.options = options;
>   }
>
>   @Override
>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>     // variable this.schema is null here since readSchema() was called on a different instance
>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>     // more logic......
>     return null;
>   }
>
>   @Override
>   public StructType readSchema() {
>     // some logic to discover schema
>     this.schema = (new StructType())
>         .add("col1", "int")
>         .add("col2", "string");
>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>     return this.schema;
>   }
> }
>
> 1) First, readSchema() is called on MyDataSourceReader@instance1, which sets the class variable schema.
>
> 2) Now when planBatchInputPartitions() is called, it is called on a different instance of MyDataSourceReader, and hence I am not getting the value of schema in planBatchInputPartitions().
>
> How can I get the value of schema, which was set in the readSchema() method, in the planBatchInputPartitions() method?
>
> Console Logs:
>
> scala> mysource.executeQuery("select * from movie").show
>
> MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@59ea8f1b
> MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
> MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@a3cd3ff
> MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff schema: null
>
> Thanks,
> Shubham
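[Editor's note on the pattern under discussion] Whatever the cause of the double instantiation in this Spark build, a defensive pattern is to make the schema a lazily computed function of the constructor options, so that any reader instance can rebuild it on demand and no method depends on readSchema() having run on the same instance. The sketch below illustrates only that pattern, with plain Strings standing in for StructType so it compiles without the Spark 2.4 jars; the class name LazySchemaReader and the stand-in schema string are hypothetical, not part of the thread's code:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for MyDataSourceReader: every method that needs the
// schema goes through lazySchema(), so each freshly created instance can
// recompute the schema from its own options instead of relying on state
// set by a call made on a *different* instance.
public class LazySchemaReader {
    private final Map<String, String> options;
    private String schema;  // stand-in for StructType; null until first use

    public LazySchemaReader(Map<String, String> options) {
        this.options = options;
    }

    private String lazySchema() {
        if (schema == null) {
            // In the real reader this is where the schema-discovery logic
            // would run against the SQL store, driven by options.get("query").
            schema = "col1:int,col2:string";
        }
        return schema;
    }

    public String readSchema() {
        return lazySchema();
    }

    public String planBatchInputPartitions() {
        // Never sees a null schema, even when readSchema() ran on another instance.
        return "planning with schema " + lazySchema();
    }

    public static void main(String[] args) {
        Map<String, String> opts = new HashMap<>();
        opts.put("query", "select * from some_table");

        LazySchemaReader first = new LazySchemaReader(opts);
        System.out.println(first.readSchema());

        // A second, independent instance -- mirroring what the spark-shell
        // output above shows Spark doing -- still has a usable schema.
        LazySchemaReader second = new LazySchemaReader(opts);
        System.out.println(second.planBatchInputPartitions());
    }
}
```

The trade-off is that schema discovery may run once per instance; if discovery is expensive, the result could be cached externally (keyed by the options), but that is a separate design decision.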