I believe this issue was fixed in Spark 2.4. Spark DataSource V2 is still under radical development and is not yet complete. So I think the feasible options at the moment are:

1. upgrade to a higher Spark version, or
2. disable filter pushdown in your DataSource V2 implementation (see the sketch below).
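For option 2, one way is to keep implementing SupportsPushDownFilters but reject every filter, so Spark evaluates all of them itself on top of the scan (or drop the SupportsPushDownFilters mixin from the reader entirely). A minimal sketch, assuming these overrides live in your MyDataSourceReader:

```java
@Override
public Filter[] pushFilters(Filter[] filters) {
  // Return every filter as "not pushed"; Spark will then apply them all
  // itself after scanning, so no filter state needs to be kept here.
  return filters;
}

@Override
public Filter[] pushedFilters() {
  // Report that nothing was pushed down.
  return new Filter[0];
}
```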
I don't think the Spark community will backport or fix things on branch-2.3, which will reach EOL soon. The DataSource V2 code is substantially different in each branch, so fixing this separately per branch would bring considerable overhead. I believe the same usually holds for internal Spark forks as well.

On Fri, Sep 6, 2019 at 3:25 PM, Shubham Chaurasia <shubh.chaura...@gmail.com> wrote:

> Hi,
>
> I am using Spark v2.3.2 and have an implementation of DSV2. Here is what is happening:
>
> 1) Obtained a dataframe using MyDataSource
>
>> scala> val df1 = spark.read.format("com.shubham.MyDataSource").load
>> MyDataSource.MyDataSource
>> MyDataSource.createReader: Going to create a new MyDataSourceReader
>> MyDataSourceReader.MyDataSourceReader: Instantiated....com.shubham.reader.MyDataSourceReader@2b85edc7
>> MyDataSourceReader.readSchema: com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema: StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
>> df1: org.apache.spark.sql.DataFrame = [c1: int, c2: int ... 1 more field]
>
> 2) show() on df1
>
>> scala> df1.show
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pruneColumns: StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
>> MyDataSourceReader.readSchema: com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema: StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> =======MyDataSourceReader.createBatchDataReaderFactories=======
>> prunedSchema = StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
>> pushedFilters = []
>> =======MyDataSourceReader.createBatchDataReaderFactories=======
>> +---+---+---+
>> | c1| c2| c3|
>> +---+---+---+
>> +---+---+---+
>
> 3) val df2 = df1.filter($"c3" > 1)
>
>> scala> df2.show
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushedFilters: []
>> MyDataSourceReader.pushFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pruneColumns: StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
>> MyDataSourceReader.readSchema: com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema: StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> =======MyDataSourceReader.createBatchDataReaderFactories=======
>> prunedSchema = StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
>> pushedFilters = [IsNotNull(c3), GreaterThan(c3,1)]
>> =======MyDataSourceReader.createBatchDataReaderFactories=======
>> +---+---+---+
>> | c1| c2| c3|
>> +---+---+---+
>> +---+---+---+
>
> 4) Again df1.show() <=== As df2 is derived from df1 (and shares the same instance of MyDataSourceReader), this modifies pushedFilters even for df1
>
>> scala> df1.show
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pruneColumns: StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
>> MyDataSourceReader.readSchema: com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema: StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
>> =======MyDataSourceReader.createBatchDataReaderFactories=======
>> prunedSchema = StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
>> *pushedFilters = [IsNotNull(c3), GreaterThan(c3,1)]*
>> =======MyDataSourceReader.createBatchDataReaderFactories=======
>> +---+---+---+
>> | c1| c2| c3|
>> +---+---+---+
>> +---+---+---+
>
> *pushedFilters = [IsNotNull(c3), GreaterThan(c3,1)]* is not correct in step 4), as no filters were specified for df1.
>
> This is because I am maintaining a pushedFilters variable in MyDataSourceReader, which is modified by df2.filter().show.
>
> Questions:
> Q1: How should this state be maintained in DataSourceReader implementations?
> Q2: Shouldn't Spark call the pushFilters() method every time we invoke some action (regardless of whether .filter() is present), in the same manner as it calls pruneColumns()?
>
> I understand that pushFilters() is only invoked when .filter() is present in the dataframe, but as the above scenario shows, this makes the state of MyDataSourceReader inconsistent; hence question Q2.
> Minimal Code:
>
>> import java.util.Optional;
>>
>> import org.apache.spark.sql.SaveMode;
>> import org.apache.spark.sql.sources.DataSourceRegister;
>> import org.apache.spark.sql.sources.v2.DataSourceOptions;
>> import org.apache.spark.sql.sources.v2.DataSourceV2;
>> import org.apache.spark.sql.sources.v2.ReadSupport;
>> import org.apache.spark.sql.sources.v2.WriteSupport;
>> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
>> import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
>> import org.apache.spark.sql.types.StructType;
>>
>> public class MyDataSource implements DataSourceRegister, DataSourceV2, ReadSupport, WriteSupport {
>>
>>   public MyDataSource() {
>>     System.out.println("MyDataSource.MyDataSource");
>>   }
>>
>>   @Override
>>   public DataSourceReader createReader(DataSourceOptions options) {
>>     System.out.println("MyDataSource.createReader: Going to create a new MyDataSourceReader");
>>     return new MyDataSourceReader(options.asMap());
>>   }
>>
>>   @Override
>>   public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options) {
>>     System.out.println("MyDataSource.createWriter: Going to create a new MyDataSourceWriter");
>>     return Optional.of(new MyDataSourceWriter(schema));
>>   }
>>
>>   @Override
>>   public String shortName() {
>>     return "com.shubham.MyDataSource";
>>   }
>> }
>
>> import java.util.ArrayList;
>> import java.util.Arrays;
>> import java.util.List;
>> import java.util.Map;
>>
>> import org.apache.spark.sql.sources.Filter;
>> import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
>> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
>> import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
>> import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
>> import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
>> import org.apache.spark.sql.types.StructType;
>> import org.apache.spark.sql.vectorized.ColumnarBatch;
>>
>> public class MyDataSourceReader implements DataSourceReader, SupportsPushDownRequiredColumns, SupportsScanColumnarBatch, SupportsPushDownFilters {
>>
>>   private Map<String, String> options;
>>   private StructType baseSchema;
>>   private StructType prunedSchema;
>>   private Filter[] pushedFilters = new Filter[0];
>>
>>   public MyDataSourceReader(Map<String, String> options) {
>>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>>     this.options = options;
>>   }
>>
>>   @Override
>>   public StructType readSchema() {
>>     this.baseSchema = (new StructType())
>>         .add("c1", "int")
>>         .add("c2", "int")
>>         .add("c3", "int");
>>     System.out.println("MyDataSourceReader.readSchema: " + this + " baseSchema: " + this.baseSchema);
>>     return this.baseSchema;
>>   }
>>
>>   @Override
>>   public Filter[] pushFilters(Filter[] filters) {
>>     System.out.println("MyDataSourceReader.pushFilters: " + Arrays.toString(filters));
>>     // Filters that can be pushed down.
>>     // For this example, assume all the filters can be pushed down.
>>     this.pushedFilters = filters;
>>
>>     // Filters that can't be pushed down.
>>     return new Filter[0];
>>   }
>>
>>   @Override
>>   public Filter[] pushedFilters() {
>>     //System.out.println("MyDataSourceReader.pushedFilters: " + Arrays.toString(pushedFilters));
>>     return this.pushedFilters;
>>   }
>>
>>   @Override
>>   public void pruneColumns(StructType requiredSchema) {
>>     System.out.println("MyDataSourceReader.pruneColumns: " + requiredSchema);
>>     this.prunedSchema = requiredSchema;
>>   }
>>
>>   @Override
>>   public List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories() {
>>     System.out.println("=======MyDataSourceReader.createBatchDataReaderFactories=======");
>>     // Do the actual operation with baseSchema, prunedSchema, pushedFilters.
>>     System.out.println("prunedSchema = " + prunedSchema);
>>     System.out.println("pushedFilters = " + Arrays.toString(pushedFilters));
>>     System.out.println("=======MyDataSourceReader.createBatchDataReaderFactories=======");
>>     return new ArrayList<>();
>>   }
>> }
>
> Thanks,
> Shubham
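Regarding Q1 specifically: until you can upgrade, one possible workaround on 2.3 is to treat the pushed filters as per-scan state, capturing them when the scan is actually created and then resetting the shared field. A rough, untested sketch; it assumes (as your logs above suggest) that Spark re-invokes pushFilters() during planning whenever the query really does contain filters, so nothing is lost by clearing:

```java
@Override
public List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories() {
  // Capture the filters that were pushed for THIS scan.
  Filter[] filtersForThisScan = this.pushedFilters;

  // Reset the shared state. If the next query has filters, planning calls
  // pushFilters() again and repopulates the field; if it has none (like a
  // plain df1.show after df2.show), that scan now correctly sees no filters.
  this.pushedFilters = new Filter[0];

  System.out.println("pushedFilters for this scan = " + Arrays.toString(filtersForThisScan));
  // Build the reader factories from prunedSchema and filtersForThisScan.
  return new ArrayList<>();
}
```

Alternatively, note from your own logs that every spark.read.format(...).load goes through MyDataSource.createReader and builds a fresh MyDataSourceReader, so obtaining df2 via a second load() instead of deriving it from df1 also keeps the two readers' state independent.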