viirya commented on code in PR #9080:
URL: https://github.com/apache/arrow-datafusion/pull/9080#discussion_r1478785328
##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1142,12 +1294,49 @@ impl SMJStream {
let record_batch = concat_batches(&self.schema,
&self.output_record_batches)?;
self.join_metrics.output_batches.add(1);
self.join_metrics.output_rows.add(record_batch.num_rows());
- self.output_size -= record_batch.num_rows();
+ // If join filter exists, `self.output_size` is not accurate as we don't know the exact
Review Comment:
The logic of `output_size` assumes that each row put into the buffer will
produce exactly one output row. It is increased when we put rows into the
buffer and decreased after we actually output batches.
So it tracks the number of rows in the buffers. We compare it with
`self.batch_size` (the target output batch size) and output batches from the
buffers once it reaches that size.
For joins with a join filter, this assumption is broken. One row put into the
buffer may produce more than one output row. For example, if a joined row under
a full join doesn't pass the join filter, it produces two output rows: the
streamed row joined with a null row, and a null row joined with the buffered
row.
So the actual number of output rows, `record_batch.num_rows()`, may be larger
than `self.output_size`, and `self.output_size -= record_batch.num_rows()` will
then overflow.
In that case, we can simply reset `output_size`.
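To make the accounting concrete, here is a minimal sketch of the idea. It is not the actual `SMJStream` code: `OutputTracker`, `buffer_rows`, `should_flush`, `on_batch_output`, and `naive_decrement` are hypothetical names used only for illustration, and it assumes `output_size` is an unsigned counter.

```rust
/// Hypothetical stand-in for the row accounting described above
/// (not the real `SMJStream` struct).
struct OutputTracker {
    /// Rows counted as buffered, assuming one output row per buffered row.
    output_size: usize,
    /// Whether the join has a join filter.
    has_filter: bool,
}

impl OutputTracker {
    fn new(has_filter: bool) -> Self {
        Self { output_size: 0, has_filter }
    }

    /// Called when `n` rows are put into the buffer.
    fn buffer_rows(&mut self, n: usize) {
        self.output_size += n;
    }

    /// Buffers are flushed once the counter reaches the target batch size.
    fn should_flush(&self, batch_size: usize) -> bool {
        self.output_size >= batch_size
    }

    /// Called after a batch of `emitted` rows has actually been output.
    fn on_batch_output(&mut self, emitted: usize) {
        if self.has_filter {
            // With a join filter, `emitted` may exceed `output_size`,
            // so the counter is reset instead of decremented.
            self.output_size = 0;
        } else {
            // Without a filter, one buffered row yields one output row.
            self.output_size -= emitted;
        }
    }
}

/// The unguarded decrement, written with `checked_sub` so the failure
/// shows up as `None` instead of a panic.
fn naive_decrement(output_size: usize, emitted: usize) -> Option<usize> {
    output_size.checked_sub(emitted)
}
```

With one buffered row that fails the filter under a full join, two rows are emitted: `naive_decrement(1, 2)` returns `None`, whereas `on_batch_output(2)` on a tracker with `has_filter` set leaves `output_size` at 0.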