comphead commented on issue #11555:
URL: https://github.com/apache/datafusion/issues/11555#issuecomment-2307863276

   I think this local test may cover lots of cases 
   
   ```
   #[tokio::test]
   async fn test_cross_1() {
       let left: Vec<RecordBatch> = make_staggered_batches(1);
   
       let left = vec![
           RecordBatch::try_new(
               left[0].schema().clone(),
               vec![
                   Arc::new(Int32Array::from(vec![0, 0, 0, 0, 0, 0, 0])),
                   Arc::new(Int32Array::from(vec![10, 11, 15, 20, 30, 30, 30])),
                   Arc::new(Int32Array::from(vec![110, 111, 115, 120, 129, 130, 
131])),
                   Arc::new(Int32Array::from(vec![1100, 1110, 1150, 1200, 1290, 
1300, 1310])),
               ],
           ).unwrap(),
           RecordBatch::try_new(
               left[0].schema().clone(),
               vec![
                   Arc::new(Int32Array::from(vec![0, 0, 0, 0])),
                   Arc::new(Int32Array::from(vec![30, 40, 50, 70])),
                   Arc::new(Int32Array::from(vec![132, 140, 150, 170])),
                   Arc::new(Int32Array::from(vec![1320, 1400, 1500, 1700])),
               ],
           ).unwrap()
       ];
   
       let right = vec![
           RecordBatch::try_new(
               left[0].schema().clone(),
               vec![
                   Arc::new(Int32Array::from(vec![0])),
                   Arc::new(Int32Array::from(vec![10])),
                   Arc::new(Int32Array::from(vec![1100])),
                   Arc::new(Int32Array::from(vec![11011])),
               ],
           ).unwrap(),
           RecordBatch::try_new(
               left[0].schema().clone(),
               vec![
                   Arc::new(Int32Array::from(vec![0])),
                   Arc::new(Int32Array::from(vec![20])),
                   Arc::new(Int32Array::from(vec![2100])),
                   Arc::new(Int32Array::from(vec![21011])),
               ],
           ).unwrap(),
           RecordBatch::try_new(
               left[0].schema().clone(),
               vec![
                   Arc::new(Int32Array::from(vec![0, 0, 0])),
                   Arc::new(Int32Array::from(vec![30, 30, 30])),
                   Arc::new(Int32Array::from(vec![3100, 3101, 3102])),
                   Arc::new(Int32Array::from(vec![31001, 31011, 31021])),
               ],
           ).unwrap(),
           RecordBatch::try_new(
               left[0].schema().clone(),
               vec![
                   Arc::new(Int32Array::from(vec![0, 0, 0, 0])),
                   Arc::new(Int32Array::from(vec![30, 30, 30, 40])),
                   Arc::new(Int32Array::from(vec![3110, 3111, 3112, 4099])),
                   Arc::new(Int32Array::from(vec![31101, 31111, 31121, 40991])),
               ],
           ).unwrap(),
           RecordBatch::try_new(
               left[0].schema().clone(),
               vec![
                   Arc::new(Int32Array::from(vec![0, 0])),
                   Arc::new(Int32Array::from(vec![40, 49])),
                   Arc::new(Int32Array::from(vec![4100, 6100])),
                   Arc::new(Int32Array::from(vec![41011, 61011])),
               ],
           ).unwrap(),
       ];
   
       JoinFuzzTestCase::new(
           left,
           right,
           JoinType::LeftAnti,
           Some(Box::new(col_lt_col_filter)),
       )
           .run_test(&[JoinTestType::HjSmj], false)
           .await;
   }
   ```
   
   My initial thought was to do something like this:
   - Get all matched batches for a particular streamed row
   - Increase a counter each time `freeze_streamed` is called until it reaches
     the number of matched batches, at which point presumably all the data has
     been processed.
   
   But this approach has a flaw: for a given streamed row, such as the (0, 30)
   key in the test above, there are 2 matched batches, but the way
   `freeze_streamed` is called is not deterministic.
   
   So for (0, 30) there are 2 batches with 3 matched rows each.
   Sometimes they are processed in 6 calls, with 1 buffered row per call;
   sometimes in 1 call for 3 rows, followed by 3 calls with 1 buffered row each.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to