comphead commented on PR #12082: URL: https://github.com/apache/datafusion/pull/12082#issuecomment-2319492383
> > If I have a left table
> >
> > a b
> > 10 20
> >
> > and right table
> >
> > a b
> > 5 20
> > 10 20
> > 10 21
> > 10 21
> > 10 22
> > 15 22
> >
> > And join key is A and Filter is on column B
> >
> > In `freeze_streamed` I can observe the right table comes as 3 batches
> >
> > 1 Batch. join_array [10] Range 1..3 - which is correct as rownumbers 1 and 2 related to join key 10
> > 2 Batch. join_array[10] Range 0..2 - which is correct as rownumbers 0 and 1 related to join key 10
> > 3 Batch. join_array[15] Range 0..1 - which is weird, why this batch associated ?
>
> Would you let me know how do you cut the 3 batches among the 6 buffered rows?

I believe it depends on batch_size and output_size. What I have observed is that the buffered batch of 6 rows can be processed differently: as 3 + 1 + 1 + 1, as 1 + 1 + 1 + 1 + 1 + 1, or as 1 batch of 6 rows.

I think @korowa mentioned it here

> filtered anti join should return only the records for which buffered-side scanning is completed (as freeze_streamed may be called in the middle of buffered-data scanning, due to output batch size), and there were no true filters for them (from p.1) -- so, maybe we should split filter evaluation and output emission in freeze_streamed (since the filters should be checked for all matched indices, but in the same time, the current streamed index can be filtered out of output because it has further buffered batches to be joined with)?
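To make the batch cut concrete, here is a rough sketch of how per-batch ranges for one join key could fall out of a fixed chunk size. It is not DataFusion code: `key_ranges` is a hypothetical helper, the keys are plain `i32` values instead of Arrow arrays, and it assumes the sorted right side is simply split into chunks of `batch_size` rows, which may not be how the operator actually buffers data.

```rust
use std::ops::Range;

/// Hypothetical helper, not part of DataFusion.
/// Splits sorted `keys` into chunks of at most `batch_size` rows and, for each
/// chunk, returns the half-open range of rows whose key equals `key`.
/// Panics if `batch_size` is 0 (that is how `slice::chunks` behaves).
fn key_ranges(keys: &[i32], batch_size: usize, key: i32) -> Vec<Range<usize>> {
    keys.chunks(batch_size)
        .map(|batch| {
            // The input is sorted, so rows with the same key are contiguous
            // and both boundaries can be found with a binary search.
            let start = batch.partition_point(|&k| k < key);
            let end = batch.partition_point(|&k| k <= key);
            start..end
        })
        .collect()
}
```

With the right-table keys from the quote and a chunk size of 3, `key_ranges(&[5, 10, 10, 10, 10, 15], 3, 10)` gives `[1..3, 0..2]`, which agrees with the first two batches reported above. The sketch does not reproduce the third observation (`join_array[15]`, `Range 0..1`), so that one still needs an explanation.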
For simplicity, let's consider the test in https://github.com/apache/datafusion/pull/12082#issuecomment-2319361185

When I debug `freeze_streamed` I can see the buffered data coming in as

```
[datafusion/physical-plan/src/joins/sort_merge_join.rs:1500:25] &self.buffered_data.batches = [
    BufferedBatch {
        batch: Some(
            RecordBatch {
                columns: [
                    PrimitiveArray<Int32> [0, 1, 1, 2],
                    PrimitiveArray<Int32> [0, 10, 11, 20],
                    PrimitiveArray<Int32> [0, 1100, 0, 2100],
                    PrimitiveArray<Int32> [0, 11000, 0, 21000],
                ],
                row_count: 4,
            },
        ),
        range: 3..4,
        join_arrays: [
            PrimitiveArray<Int32> [0, 1, 1, 2],
            PrimitiveArray<Int32> [0, 10, 11, 20],
        ],
    },
    BufferedBatch {
        batch: Some(
            RecordBatch {
                columns: [
                    PrimitiveArray<Int32> [2, 2],
                    PrimitiveArray<Int32> [20, 21],
                    PrimitiveArray<Int32> [2101, 0],
                    PrimitiveArray<Int32> [21001, 0],
                ],
                row_count: 2,
            },
        ),
        range: 0..1,
        join_arrays: [
            PrimitiveArray<Int32> [2, 2],
            PrimitiveArray<Int32> [20, 21],
        ],
    },
]
```

What are the ranges here? The doc says

```
/// The range in which the rows share the same join key
pub range: Range<usize>,
```

but how do `range: 3..4` in the first batch and `range: 0..1` in the second match the join key at all? They point to non-matched rows.
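One way to read the dump is to list the join-key values of the rows that each `range` selects. The sketch below does that with plain `Vec<i32>` columns standing in for the Arrow arrays; `keys_in_range` is a made-up helper for illustration, not a DataFusion function.

```rust
use std::ops::Range;

/// Hypothetical helper, not part of DataFusion.
/// `join_arrays` holds one column per join key; the result has one entry per
/// row inside `range`, each entry being that row's composite join key.
/// Panics if `range` reaches past the end of any column.
fn keys_in_range(join_arrays: &[Vec<i32>], range: Range<usize>) -> Vec<Vec<i32>> {
    range
        .map(|row| {
            // Pick the value at `row` from every join-key column.
            join_arrays
                .iter()
                .map(|col| col[row])
                .collect::<Vec<i32>>()
        })
        .collect()
}
```

Applied to the values printed above, `range: 3..4` over `[0, 1, 1, 2]` and `[0, 10, 11, 20]` selects the key `[2, 20]`, and `range: 0..1` over `[2, 2]` and `[20, 21]` also selects `[2, 20]`. This only shows which keys the ranges cover; the dump does not include the streamed row, so it does not settle whether that key is the expected match.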