I believe this issue was fixed in Spark 2.4.

Spark DataSource V2 is still being actively and radically developed, and it is
not complete yet.
So I think the feasible options to work around this at the moment are:
  1. upgrade to a higher Spark version
  2. disable filter pushdown in your DataSource V2 implementation (see the sketch below)
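
For option 2, a minimal sketch, assuming you keep the SupportsPushDownFilters
mixin on MyDataSourceReader: report every filter back to Spark as not pushed,
so Spark evaluates the filters itself and the reader never carries filter
state across queries.

  @Override
  public Filter[] pushFilters(Filter[] filters) {
    // Hand every filter back to Spark; nothing is pushed down,
    // so Spark applies the filters itself on top of the scan.
    return filters;
  }

  @Override
  public Filter[] pushedFilters() {
    // Nothing was accepted, so report no pushed filters.
    return new Filter[0];
  }

Alternatively, removing the SupportsPushDownFilters mixin from the reader
entirely has the same effect.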

I don't think the Spark community will backport or fix things in branch-2.3,
which will reach its EOL release soon.
DataSource V2 has substantially different code in each branch, so fixing this
specifically in each branch would bring considerable overhead.
I believe that is usually the case for internal Spark forks as well.



On Fri, Sep 6, 2019 at 3:25 PM Shubham Chaurasia <shubh.chaura...@gmail.com> wrote:

> Hi,
>
> I am using Spark v2.3.2 and have an implementation of DSV2. Here is what is
> happening:
>
> 1) Obtained a dataframe using MyDataSource
>
> scala> val df1 = spark.read.format("com.shubham.MyDataSource").load
>> MyDataSource.MyDataSource
>> MyDataSource.createReader: Going to create a new MyDataSourceReader
>> MyDataSourceReader.MyDataSourceReader:
>> Instantiated....com.shubham.reader.MyDataSourceReader@2b85edc7
>> MyDataSourceReader.readSchema:
>> com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema:
>> StructType(StructField(c1,IntegerType,true),
>> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
>> df1: org.apache.spark.sql.DataFrame = [c1: int, c2: int ... 1 more field]
>>
>
> 2) show() on df1
>
>> scala> df1.show
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pruneColumns:
>> StructType(StructField(c1,IntegerType,true),
>> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
>> MyDataSourceReader.readSchema:
>> com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema:
>> StructType(StructField(c1,IntegerType,true),
>> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> =======MyDataSourceReader.createBatchDataReaderFactories=======
>> prunedSchema = StructType(StructField(c1,IntegerType,true),
>> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
>> pushedFilters = []
>> =======MyDataSourceReader.createBatchDataReaderFactories=======
>> +---+---+---+
>> | c1| c2| c3|
>> +---+---+---+
>> +---+---+---+
>>
>
> 3) val df2 = df1.filter($"c3" > 1)
>
>>
>> scala> df2.show
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pruneColumns:
>> StructType(StructField(c1,IntegerType,true),
>> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
>> MyDataSourceReader.readSchema:
>> com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema:
>> StructType(StructField(c1,IntegerType,true),
>> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> =======MyDataSourceReader.createBatchDataReaderFactories=======
>> prunedSchema = StructType(StructField(c1,IntegerType,true),
>> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
>> pushedFilters = [IsNotNull(c3), GreaterThan(c3,1)]
>> =======MyDataSourceReader.createBatchDataReaderFactories=======
>> +---+---+---+
>> | c1| c2| c3|
>> +---+---+---+
>> +---+---+---+
>
>
> 4) Again df1.show() <=== Since df2 is derived from df1 (and shares the same
> instance of MyDataSourceReader), step 3 also modified pushedFilters for df1
>
>> scala> df1.show
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pruneColumns:
>> StructType(StructField(c1,IntegerType,true),
>> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
>> MyDataSourceReader.readSchema:
>> com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema:
>> StructType(StructField(c1,IntegerType,true),
>> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> =======MyDataSourceReader.createBatchDataReaderFactories=======
>> prunedSchema = StructType(StructField(c1,IntegerType,true),
>> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
>> *pushedFilters = [IsNotNull(c3), GreaterThan(c3,1)]*
>> =======MyDataSourceReader.createBatchDataReaderFactories=======
>> +---+---+---+
>> | c1| c2| c3|
>> +---+---+---+
>> +---+---+---+
>>
>
> *pushedFilters = [IsNotNull(c3), GreaterThan(c3,1)]* is not correct in
> step 4), as no filters were specified for df1.
>
> This happens because I maintain a pushedFilters variable in MyDataSourceReader,
> which gets modified by df2.show() in step 3.
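>
> The readSchema logs above already show this sharing: both df1 and df2 print
> the same instance, com.shubham.reader.MyDataSourceReader@2b85edc7. As a small
> debugging sketch (not part of the minimal code below), logging the instance in
> pushFilters as well makes the carry-over obvious:
>
>>   @Override
>>   public Filter[] pushFilters(Filter[] filters) {
>>     // Sketch: also log which reader instance receives the filters, to show
>>     // that df1 and df2 are planned against the same object.
>>     System.out.println("MyDataSourceReader.pushFilters on " + this + ": " + Arrays.toString(filters));
>>     this.pushedFilters = filters;
>>     return new Filter[0];
>>   }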
>
> Questions:
> Q1: How should this state be maintained in DataSourceReader implementations?
> Q2: Shouldn't Spark call the pushFilters() method every time an action is
> invoked (regardless of whether .filter() is present or not), in the same
> manner as it calls pruneColumns()?
>
> I understand that pushFilters() is only invoked when a .filter() is present on
> the dataframe, but as we saw in the scenario above, that leaves the state of
> MyDataSourceReader inconsistent, hence question Q2.
>
> Minimal Code:
>
> public class MyDataSource implements DataSourceRegister, DataSourceV2, ReadSupport, WriteSupport {
>>
>>   public MyDataSource() {
>>     System.out.println("MyDataSource.MyDataSource");
>>   }
>>
>>   @Override
>>   public DataSourceReader createReader(DataSourceOptions options) {
>>     System.out.println("MyDataSource.createReader: Going to create a new 
>> MyDataSourceReader");
>>     return new MyDataSourceReader(options.asMap());
>>   }
>>
>>   @Override
>>   public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options) {
>>     System.out.println("MyDataSource.createWriter: Going to create a new 
>> MyDataSourceWriter");
>>     return Optional.of(new MyDataSourceWriter(schema));
>>   }
>>
>>   @Override
>>   public String shortName() {
>>     return "com.shubham.MyDataSource";
>>   }
>> }
>>
>>
> public class MyDataSourceReader implements DataSourceReader, SupportsPushDownRequiredColumns, SupportsScanColumnarBatch, SupportsPushDownFilters {
>>
>>   private Map<String, String> options;
>>   private StructType baseSchema;
>>   private StructType prunedSchema;
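>>   // NOTE: mutable scan state held on the reader instance; since df1 and df2
>>   // share this reader, filters pushed while planning df2 stay visible to df1.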
>>   private Filter[] pushedFilters = new Filter[0];
>>
>>   public MyDataSourceReader(Map<String, String> options) {
>>     System.out.println("MyDataSourceReader.MyDataSourceReader: 
>> Instantiated...." + this);
>>     this.options = options;
>>   }
>>
>>   @Override
>>   public StructType readSchema() {
>>     this.baseSchema = (new StructType())
>>         .add("c1", "int")
>>         .add("c2", "int")
>>         .add("c3", "int");
>>     System.out.println("MyDataSourceReader.readSchema: " + this + " 
>> baseSchema: " + this.baseSchema);
>>     return this.baseSchema;
>>   }
>>
>>
>>   @Override
>>   public Filter[] pushFilters(Filter[] filters) {
>>     System.out.println("MyDataSourceReader.pushFilters: " + 
>> Arrays.toString(filters));
>>     // filters that can be pushed down.
>>     // for this example, let's assume all the filters can be pushed down.
>>     this.pushedFilters = filters;
>>
>>     // filters that can't be pushed down.
>>     return new Filter[0];
>>   }
>>
>>   @Override
>>   public Filter[] pushedFilters() {
>>     //System.out.println("MyDataSourceReader.pushedFilters: " + Arrays.toString(pushedFilters));
>>     return this.pushedFilters;
>>   }
>>
>>   @Override
>>   public void pruneColumns(StructType requiredSchema) {
>>     System.out.println("MyDataSourceReader.pruneColumns: " + requiredSchema);
>>     this.prunedSchema = requiredSchema;
>>   }
>>
>>   @Override
>>   public List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories() {
>>     System.out.println("=======MyDataSourceReader.createBatchDataReaderFactories=======");
>>     // do the actual operation with baseSchema, prunedSchema, pushedFilters
>>
>>     System.out.println("prunedSchema = " + prunedSchema);
>>     System.out.println("pushedFilters = " + Arrays.toString(pushedFilters));
>>
>>     System.out.println("=======MyDataSourceReader.createBatchDataReaderFactories=======");
>>
>>     return new ArrayList<>();
>>   }
>>
>> }
>>
>>
> Thanks,
> Shubham
>
>
>
>
