I think this is expected behavior, though not what I think is reasonable in
the long term. To my knowledge, this is how the v1 sources behave, and v2
just reuses the same mechanism to instantiate sources and uses a new
interface for v2 features.

I think that the right approach is to use catalogs, which I've proposed in
#21306 <https://github.com/apache/spark/pull/21306>. A catalog would be
loaded by reflection just once and then configured. After that, the same
instance for a given Spark SQL session would be reused.

Because the catalog instantiates table instances that expose read and write
capabilities (ReadSupport, WriteSupport), it can choose how to manage the
life-cycle of those tables and can also cache instances to control how
table state changes after a table is loaded. (Iceberg does this to use a
fixed snapshot for all reads until the table is written to or is garbage
collected.)

rb

On Tue, Oct 9, 2018 at 8:30 PM Hyukjin Kwon <gurwls...@gmail.com> wrote:

> I took a look for the codes.
>
> val source = classOf[MyDataSource].getCanonicalName
> spark.read.format(source).load().collect()
>
> Looks indeed it calls twice.
>
> First all: Looks it creates it first to read the schema for a logical plan
>
> test.org.apache.spark.sql.sources.v2.MyDataSourceReader.<init>(MyDataSourceReader.java:36)
> test.org.apache.spark.sql.sources.v2.MyDataSource.createReader(MyDataSource.java:35)
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$SourceHelpers.createReader(DataSourceV2Relation.scala:155)
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:172)
> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:204)
> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
>
> Second call: it creates another for its actual partitions in a physcal plan
>
> test.org.apache.spark.sql.sources.v2.MyDataSourceReader.<init>(MyDataSourceReader.java:36)
> test.org.apache.spark.sql.sources.v2.MyDataSource.createReader(MyDataSource.java:35)
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$SourceHelpers.createReader(DataSourceV2Relation.scala:155)
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation.newReader(DataSourceV2Relation.scala:61)
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$.apply(DataSourceV2Strategy.scala:103)
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
> scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> scala.collection.Iterator$class.foreach(Iterator.scala:891)
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
> scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
> org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
> org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
> org.apache.spark.sql.Dataset.withAction(Dataset.scala:3360)
> org.apache.spark.sql.Dataset.collect(Dataset.scala:2783)
>
>
> Skimming the API doc at DataSourceReader at branch-2.4, I haven’t found
> the guarantee that the readers are created only once. If that’s documented
> somewhere, we should fix it in 2.4.0. If not, I think it fine since both
> calls are in driver side and it’s something able to work around for
> instance static class or thread local in this case.
>
> Forwarding to dev mailing list in case that this is something we haven't
> foreseen.
>
> 2018년 10월 9일 (화) 오후 9:39, Shubham Chaurasia <shubh.chaura...@gmail.com>님이
> 작성:
>
>> Alright, so it is a big project which uses a SQL store underneath.
>> I extracted out the minimal code and made a smaller project out of it and
>> still it is creating multiple instances.
>>
>> Here is my project:
>>
>> ├── my-datasource.iml
>> ├── pom.xml
>> ├── src
>> │   ├── main
>> │   │   ├── java
>> │   │   │   └── com
>> │   │   │       └── shubham
>> │   │   │           ├── MyDataSource.java
>> │   │   │           └── reader
>> │   │   │               └── MyDataSourceReader.java
>>
>>
>> MyDataSource.java
>> -------------------------------------------------
>>
>> package com.shubham;
>>
>> import com.shubham.reader.MyDataSourceReader;
>> import org.apache.spark.sql.SaveMode;
>> import org.apache.spark.sql.sources.v2.DataSourceOptions;
>> import org.apache.spark.sql.sources.v2.DataSourceV2;
>> import org.apache.spark.sql.sources.v2.ReadSupport;
>> import org.apache.spark.sql.sources.v2.WriteSupport;
>> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
>> import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
>> import org.apache.spark.sql.types.StructType;
>>
>> import java.util.Optional;
>>
>> public class MyDataSource implements DataSourceV2, ReadSupport, WriteSupport 
>> {
>>
>>   public DataSourceReader createReader(DataSourceOptions options) {
>>     System.out.println("MyDataSource.createReader: Going to create a new 
>> MyDataSourceReader");
>>     return new MyDataSourceReader(options.asMap());
>>   }
>>
>>   public Optional<DataSourceWriter> createWriter(String writeUUID, 
>> StructType schema, SaveMode mode, DataSourceOptions options) {
>>     return Optional.empty();
>>   }
>> }
>>
>>
>> MyDataSourceReader.java
>> -------------------------------------------------
>>
>> package com.shubham.reader;
>>
>> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
>> import org.apache.spark.sql.sources.v2.reader.InputPartition;
>> import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
>> import org.apache.spark.sql.types.StructType;
>> import org.apache.spark.sql.vectorized.ColumnarBatch;
>>
>> import java.util.ArrayList;
>> import java.util.List;
>> import java.util.Map;
>>
>> public class MyDataSourceReader implements DataSourceReader, 
>> SupportsScanColumnarBatch {
>>
>>   private Map<String, String> options;
>>   private StructType schema;
>>
>>   public MyDataSourceReader(Map<String, String> options) {
>>     System.out.println("MyDataSourceReader.MyDataSourceReader: 
>> Instantiated...." + this);
>>     this.options = options;
>>   }
>>
>>   @Override
>>   public StructType readSchema() {
>>     this.schema = (new StructType())
>>         .add("col1", "int")
>>         .add("col2", "string");
>>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: 
>> " + this.schema);
>>     return this.schema;
>>   }
>>
>>   @Override
>>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + 
>> this + " schema: " + this.schema);
>>     return new ArrayList<>();
>>   }
>> }
>>
>>
>> ----------------------------------------
>> spark-shell output
>> ----------------------------------------
>> scala> spark.read.format("com.shubham.MyDataSource").option("query",
>> "select * from some_table").load.show
>>
>> MyDataSource.createReader: Going to create a new MyDataSourceReader
>> MyDataSourceReader.MyDataSourceReader:
>> Instantiated....com.shubham.reader.MyDataSourceReader@69fa5536
>> MyDataSourceReader.readSchema:
>> com.shubham.reader.MyDataSourceReader@69fa5536 schema:
>> StructType(StructField(col1,IntegerType,true),
>> StructField(col2,StringType,true))
>> MyDataSource.createReader: Going to create a new MyDataSourceReader
>> MyDataSourceReader.MyDataSourceReader:
>> Instantiated....com.shubham.reader.MyDataSourceReader@3095c449
>> MyDataSourceReader.planBatchInputPartitions:
>> com.shubham.reader.MyDataSourceReader@3095c449 schema: null
>> +----+----+
>> |col1|col2|
>> +----+----+
>> +----+----+
>>
>>
>> Here 2 instances of reader, MyDataSourceReader@69fa5536 and
>> MyDataSourceReader@3095c449 are being created. Consequently schema is
>> null in MyDataSourceReader@3095c449.
>>
>> Am I not doing it the correct way?
>>
>> Thanks,
>> Shubham
>>
>> On Tue, Oct 9, 2018 at 4:43 PM Mendelson, Assaf <assaf.mendel...@rsa.com>
>> wrote:
>>
>>> I am using v2.4.0-RC2
>>>
>>>
>>>
>>> The code as is wouldn’t run (e.g. planBatchInputPartitions returns
>>> null). How are you calling it?
>>>
>>>
>>>
>>> When I do:
>>>
>>> Val df = spark.read.format(mypackage).load().show()
>>>
>>> I am getting a single creation, how are you creating the reader?
>>>
>>>
>>>
>>> Thanks,
>>>
>>>         Assaf
>>>
>>>
>>>
>>> *From:* Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
>>> *Sent:* Tuesday, October 9, 2018 2:02 PM
>>> *To:* Mendelson, Assaf; user@spark.apache.org
>>> *Subject:* Re: DataSourceV2 APIs creating multiple instances of
>>> DataSourceReader and hence not preserving the state
>>>
>>>
>>>
>>> [EXTERNAL EMAIL]
>>> Please report any suspicious attachments, links, or requests for
>>> sensitive information.
>>>
>>> Thanks Assaf, you tried with *tags/v2.4.0-rc2?*
>>>
>>>
>>>
>>> Full Code:
>>>
>>>
>>>
>>> MyDataSource is the entry point which simply creates Reader and Writer
>>>
>>>
>>>
>>> public class MyDataSource implements DataSourceV2, WriteSupport,
>>> ReadSupport, SessionConfigSupport {
>>>
>>>
>>>
>>>   @Override public DataSourceReader createReader(DataSourceOptions
>>> options) {
>>>
>>>     return new MyDataSourceReader(options.asMap());
>>>
>>>   }
>>>
>>>
>>>
>>>   @Override
>>>
>>>   public Optional<DataSourceWriter> createWriter(String jobId,
>>> StructType schema,
>>>
>>>       SaveMode mode, DataSourceOptions options) {
>>>
>>>     // creates a dataSourcewriter here..
>>>
>>>     return Optional.of(dataSourcewriter);
>>>
>>>   }
>>>
>>>
>>>
>>>   @Override public String keyPrefix() {
>>>
>>>     return "myprefix";
>>>
>>>   }
>>>
>>>
>>>
>>> }
>>>
>>>
>>>
>>> public class MyDataSourceReader implements DataSourceReader,
>>> SupportsScanColumnarBatch {
>>>
>>>
>>>
>>>   StructType schema = null;
>>>
>>>   Map<String, String> options;
>>>
>>>
>>>
>>>   public MyDataSourceReader(Map<String, String> options) {
>>>
>>>     System.out.println("MyDataSourceReader.MyDataSourceReader:
>>> Instantiated...." + this);
>>>
>>>     this.options = options;
>>>
>>>   }
>>>
>>>
>>>
>>>   @Override
>>>
>>>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>>>
>>>     //variable this.schema is null here since readSchema() was called on
>>> a different instance
>>>
>>>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " +
>>> this + " schema: " + this.schema);
>>>
>>>     //more logic......
>>>
>>>     return null;
>>>
>>>   }
>>>
>>>
>>>
>>>   @Override
>>>
>>>   public StructType readSchema() {
>>>
>>>     //some logic to discover schema
>>>
>>>     this.schema = (new StructType())
>>>
>>>         .add("col1", "int")
>>>
>>>         .add("col2", "string");
>>>
>>>     System.out.println("MyDataSourceReader.readSchema: " + this + "
>>> schema: " + this.schema);
>>>
>>>     return this.schema;
>>>
>>>   }
>>>
>>> }
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Shubham
>>>
>>>
>>>
>>> On Tue, Oct 9, 2018 at 3:59 PM Mendelson, Assaf <assaf.mendel...@rsa.com>
>>> wrote:
>>>
>>> Could you add a fuller code example? I tried to reproduce it in my
>>> environment and I am getting just one instance of the reader…
>>>
>>>
>>>
>>> Thanks,
>>>
>>>         Assaf
>>>
>>>
>>>
>>> *From:* Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
>>> *Sent:* Tuesday, October 9, 2018 9:31 AM
>>> *To:* user@spark.apache.org
>>> *Subject:* DataSourceV2 APIs creating multiple instances of
>>> DataSourceReader and hence not preserving the state
>>>
>>>
>>>
>>> [EXTERNAL EMAIL]
>>> Please report any suspicious attachments, links, or requests for
>>> sensitive information.
>>>
>>> Hi All,
>>>
>>>
>>>
>>> --Spark built with *tags/v2.4.0-rc2*
>>>
>>>
>>>
>>> Consider following DataSourceReader implementation:
>>>
>>>
>>>
>>> *public class *MyDataSourceReader *implements *DataSourceReader, 
>>> SupportsScanColumnarBatch {
>>>
>>>   StructType *schema *= *null*;
>>>   Map<String, String> *options*;
>>>
>>>   *public *MyDataSourceReader(Map<String, String> options) {
>>>     System.*out*.println(*"MyDataSourceReader.MyDataSourceReader: 
>>> Instantiated...." *+ *this*);
>>>     *this*.*options *= options;
>>>   }
>>>
>>>   @Override
>>>   *public *List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>>>
>>> *//variable this.schema is null here since readSchema() was called on a 
>>> different instance    
>>> *System.*out*.println(*"MyDataSourceReader.planBatchInputPartitions: " *+ 
>>> *this *+ *" schema: " *+ *this*.*schema*);
>>>
>>> *//more logic......    **return null*;
>>>   }
>>>
>>>   @Override
>>>   *public *StructType readSchema() {
>>>
>>> *//some logic to discover schema    **this*.*schema *= (*new *StructType())
>>>         .add(*"col1"*, *"int"*)
>>>         .add(*"col2"*, *"string"*);
>>>     System.*out*.println(*"MyDataSourceReader.readSchema: " *+ *this *+ *" 
>>> schema: " *+ *this*.*schema*);
>>>     *return this*.*schema*;
>>>   }
>>> }
>>>
>>> 1) First readSchema() is called on MyDataSourceReader@instance1 which sets 
>>> class variable schema.
>>>
>>> 2) Now when planBatchInputPartitions() is called, it is being called on a 
>>> different instance of MyDataSourceReader and hence I am not getting the 
>>> value of schema in method planBatchInputPartitions().
>>>
>>>
>>>
>>> How can I get value of schema which was set in readSchema() method, in 
>>> planBatchInputPartitions() method?
>>>
>>>
>>>
>>> Console Logs:
>>>
>>>
>>>
>>> scala> mysource.executeQuery("select * from movie").show
>>>
>>>
>>>
>>> MyDataSourceReader.MyDataSourceReader: 
>>> Instantiated....MyDataSourceReader@59ea8f1b
>>>
>>> MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: 
>>> StructType(StructField(col1,IntegerType,true), 
>>> StructField(col2,StringType,true))
>>>
>>> MyDataSourceReader.MyDataSourceReader: 
>>> Instantiated....MyDataSourceReader@a3cd3ff
>>>
>>> MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff 
>>> schema: null
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Shubham
>>>
>>>
>>>
>>>

-- 
Ryan Blue
Software Engineer
Netflix

Reply via email to