viirya commented on code in PR #9080:
URL: https://github.com/apache/arrow-datafusion/pull/9080#discussion_r1478785328


##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1142,12 +1294,49 @@ impl SMJStream {
         let record_batch = concat_batches(&self.schema, &self.output_record_batches)?;
         self.join_metrics.output_batches.add(1);
         self.join_metrics.output_rows.add(record_batch.num_rows());
-        self.output_size -= record_batch.num_rows();
+        // If join filter exists, `self.output_size` is not accurate as we don't know the exact

Review Comment:
   The logic of `output_size` assumes that each row put into the buffer produces exactly one output row. It is incremented when we put rows into the buffer and decremented after we actually output batches.
   
   So it tracks the number of rows currently in the buffers. We compare it against `self.batch_size` (the target output batch size) and output batches from the buffers once it reaches that size.
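
   A minimal sketch of that buffering decision (names are illustrative, not the real `SMJStream` fields):
   
   ```rust
   /// Hypothetical stand-in for the buffering bookkeeping described above.
   struct Output {
       output_size: usize, // rows currently buffered, assuming 1 row in == 1 row out
       batch_size: usize,  // target output batch size
   }

   impl Output {
       /// Called when rows are put into the buffer: increment the counter.
       fn buffer_rows(&mut self, n: usize) {
           self.output_size += n;
       }
       /// Output batches from the buffers once the counter reaches the target.
       fn should_flush(&self) -> bool {
           self.output_size >= self.batch_size
       }
   }

   fn main() {
       let mut out = Output { output_size: 0, batch_size: 4 };
       out.buffer_rows(3);
       assert!(!out.should_flush()); // below target, keep buffering
       out.buffer_rows(2);
       assert!(out.should_flush()); // reached target, time to output
       println!("flush: {}", out.should_flush());
   }
   ```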
   
   For joins with a join filter, that assumption behind `output_size` is broken: one row put into the buffer may produce more than one output row. For example, when a joined row under a FULL join fails the join filter, it produces two output rows, i.e., the streamed row joined with a null row, and a null row joined with the buffered row.
   
   So the actual number of output rows, `record_batch.num_rows()`, may be larger than `self.output_size`, and `self.output_size -= record_batch.num_rows()` will overflow (a `usize` subtraction below zero panics in debug builds).
   
   For such cases, we can simply reset `output_size`, since it is no longer an accurate count anyway.
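   
   A self-contained sketch of the failure mode (illustrative values, not the actual `SMJStream` code; `saturating_sub` shown as one way to avoid the panic):
   
   ```rust
   fn main() {
       // `output_size` counted 1 row going into the buffer, assuming
       // one buffered row == one output row.
       let mut output_size: usize = 1;

       // Under a FULL join with a filter, that single joined row failing
       // the filter yields TWO output rows: (streamed, NULL) and (NULL, buffered).
       let actual_output_rows: usize = 2;

       // A plain `output_size -= actual_output_rows` would panic in debug
       // builds ("attempt to subtract with overflow"). Saturating the
       // subtraction (or resetting to zero) avoids that:
       output_size = output_size.saturating_sub(actual_output_rows);
       assert_eq!(output_size, 0);
       println!("output_size after flush: {output_size}");
   }
   ```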
   
    



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
