rtpsw commented on PR #34392:
URL: https://github.com/apache/arrow/pull/34392#issuecomment-1528985676

   I used a macOS host, courtesy of @westonpace, to reproduce the failure fairly 
reliably while running under `stress -c`. After debugging with a few added 
debug messages, I think I finally figured out the as-of-join issue here.
   
   The pasted output below, taken from a failure repro, shows the following. 
`input 0`, which is the left table of the as-of-join, received 3 batches with 
different content, as expected. However, all these batches have the same 
pointer address (as seen in the `input 0 push` messages), suggesting that each new 
batch is allocated at the same address as the previous one, which had been freed; 
this is an unusual condition, which appears to occur only when certain runtime 
resources are restricted (such as when under stress and in certain CI jobs). The 
`key hasher 0` message shows that the left table's key hasher hashes only once, 
which indicates it is seeing a single batch instead of 3. This is because it 
[invalidates its cached hashes only when the batch address 
changes](https://github.com/apache/arrow/blob/0ea1a103dfc2fc4ab5a5d839369a805e2ae657a2/cpp/src/arrow/acero/asof_join_node.cc#L359-L362),
 which isn't the case here. Consequently, the same key hashes (here, one hash 
for a single-row batch) are used for the second batch of `input 0` as for its 
first and third batches; 
this is incorrect given [the left table's 
batches](https://github.com/apache/arrow/pull/34392/files#diff-dd19a151cdbe71c43bad130c0dc2d6d00db89e57b6588d1597ee0011608e3299R980),
 since the second batch has 2 in the `key` column whereas the first and third 
batches have 1 there. This leads to an incorrect output table, one that would 
have been correct only for a test case in which all batches share the same key.
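   
   To make the failure mode concrete, here is a minimal sketch of a hash cache that is invalidated only when the batch address changes. It is not the Acero code: `Batch`, `HashKey`, `AddressCachedHasher`, and `HashThreeBatchesAtOneAddress` are hypothetical names introduced for illustration, and address reuse is simulated deterministically by overwriting a single object in place rather than by relying on the allocator.

```cpp
#include <cassert>
#include <cstdint>
#include <initializer_list>
#include <vector>

// Hypothetical stand-in for a record batch: a single column of integer keys.
struct Batch {
  std::vector<int32_t> keys;
};

// Illustrative hash (not Arrow's): multiplying by an odd constant is a
// bijection on uint64_t, so distinct keys always get distinct hashes.
inline uint64_t HashKey(int32_t key) {
  return static_cast<uint64_t>(key) * 0x9E3779B97F4A7C15ULL;
}

// Hypothetical hasher whose cache is keyed by the batch *address* only.
class AddressCachedHasher {
 public:
  const std::vector<uint64_t>& HashesFor(const Batch* batch) {
    if (batch != cached_batch_) {  // recompute only when the address changes
      cached_batch_ = batch;
      hashes_.clear();
      for (int32_t key : batch->keys) hashes_.push_back(HashKey(key));
      ++compute_count_;
    }
    return hashes_;
  }
  int compute_count() const { return compute_count_; }

 private:
  const Batch* cached_batch_ = nullptr;
  std::vector<uint64_t> hashes_;
  int compute_count_ = 0;
};

struct ReuseResult {
  int compute_count;               // how many times hashes were computed
  bool second_batch_hash_is_stale; // true if batch 2 got the hash of key 1
};

// Feeds three single-row "batches" with keys 1, 2, 1 through one address.
inline ReuseResult HashThreeBatchesAtOneAddress() {
  AddressCachedHasher hasher;
  Batch slot;  // the same object, hence the same address, for every batch
  std::vector<uint64_t> seen;
  for (int32_t key : {1, 2, 1}) {
    slot.keys = {key};  // new content, unchanged address
    seen.push_back(hasher.HashesFor(&slot)[0]);
  }
  return {hasher.compute_count(), seen[1] == HashKey(1)};
}
```

   In this sketch the hashes are computed once for three batches, and the second batch (key 2) is handed the cached hash of key 1, which mirrors the single `key hasher 0` message in the pasted output.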
   
   The [recent 
commit](https://github.com/apache/arrow/pull/34392/commits/34134f0b40ea241fa3c686df06c487d0539b9c99)
 fixes the key-hasher to invalidate when a new batch is received on the input 
it is associated with. With this fix, I was able to run 100 repetitions on the 
macOS host under stress without observing any failure. The CI job failures for 
this commit seem to be unrelated.
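   
   The idea behind the fix can be sketched as follows, under the assumption that the input explicitly invalidates its hasher whenever a batch is pushed. This is an illustration only, not the code in the commit: `LeftBatch`, `HashLeftKey`, `InvalidatingHasher`, `InputState`, and `HashThreeBatchesWithInvalidation` are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <initializer_list>
#include <vector>

// Hypothetical stand-in for a record batch: a single column of integer keys.
struct LeftBatch {
  std::vector<int32_t> keys;
};

// Illustrative hash (not Arrow's); distinct keys get distinct hashes.
inline uint64_t HashLeftKey(int32_t key) {
  return static_cast<uint64_t>(key) * 0x9E3779B97F4A7C15ULL;
}

// Hypothetical hasher with an explicit invalidation hook.
class InvalidatingHasher {
 public:
  void Invalidate() { cached_batch_ = nullptr; }

  const std::vector<uint64_t>& HashesFor(const LeftBatch* batch) {
    if (batch != cached_batch_) {
      cached_batch_ = batch;
      hashes_.clear();
      for (int32_t key : batch->keys) hashes_.push_back(HashLeftKey(key));
      ++compute_count_;
    }
    return hashes_;
  }
  int compute_count() const { return compute_count_; }

 private:
  const LeftBatch* cached_batch_ = nullptr;
  std::vector<uint64_t> hashes_;
  int compute_count_ = 0;
};

// Hypothetical input state: pushing a batch always invalidates the hasher,
// so a reused address can no longer be mistaken for the previous batch.
class InputState {
 public:
  explicit InputState(InvalidatingHasher* hasher) : hasher_(hasher) {}

  void Push(const LeftBatch* batch) {
    hasher_->Invalidate();
    current_ = batch;
  }
  const std::vector<uint64_t>& CurrentHashes() {
    return hasher_->HashesFor(current_);
  }

 private:
  InvalidatingHasher* hasher_;
  const LeftBatch* current_ = nullptr;
};

struct FixResult {
  int compute_count;                 // how many times hashes were computed
  bool second_batch_hash_is_correct; // true if batch 2 got the hash of key 2
};

// Feeds keys 1, 2, 1 through one address, pushing each batch via InputState.
inline FixResult HashThreeBatchesWithInvalidation() {
  InvalidatingHasher hasher;
  InputState input(&hasher);
  LeftBatch slot;  // the same address for every batch, as in the failing run
  std::vector<uint64_t> seen;
  for (int32_t key : {1, 2, 1}) {
    slot.keys = {key};
    input.Push(&slot);
    seen.push_back(input.CurrentHashes()[0]);
  }
  return {hasher.compute_count(), seen[1] == HashLeftKey(2)};
}
```

   With invalidation tied to the push rather than to the address, the sketch computes hashes once per batch even though all three batches share one address.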
   
   Note that I still do not have an explanation for (and wasn't able to 
reproduce) [the AMD64 Conda job's 
behavior](https://github.com/apache/arrow/pull/34392#issuecomment-1519077885). 
In particular, the key hasher invalidation issue does not occur there, since 
`key hasher 0` messages show hashes are computed for each batch of the left 
table. However, the fix above may be sufficient to accept this PR.
   
   <details>
   
   <summary>Pasted output:</summary>
   
   ```
   AsofjoinNode(0x103e04eb0): received batch from input 2:
   time:   [
       1970-01-01 00:00:00.000000,
       1970-01-01 00:00:00.001500,
       1970-01-01 00:00:00.002500
     ]
   key2:   [
       0,
       0,
       0
     ]
   key:   [
       1,
       1,
       1
     ]
   r1_v0:   [
       100,
       101,
       102
     ]
   
   AsofjoinNode(0x103e04eb0): input 2 push batch=0x60000291ca98
   AsofjoinNode(0x103e04eb0): received batch from input 1:
   time:   [
       1970-01-01 00:00:00.000000,
       1970-01-01 00:00:00.001500,
       1970-01-01 00:00:00.002500
     ]
   key2:   [
       0,
       0,
       0
     ]
   key:   [
       1,
       1,
       1
     ]
   r0_v0:   [
       10,
       11,
       12
     ]
   
   AsofjoinNode(0x103e04eb0): input 1 push batch=0x60000291c7f8
   AsofjoinNode(0x103e04eb0): received batch from input 0:
   time:   [
       1970-01-01 00:00:00.000000
     ]
   key2:   [
       0
     ]
   key:   [
       1
     ]
   l_v0:   [
       1
     ]
   
   AsofjoinNode(0x103e04eb0): input 0 push batch=0x60000290c2b8
   AsofjoinNode(0x103e04eb0): Advancing input 1
   AsofjoinNode(0x103e04eb0): key hasher 1 got hashes [10621589963589458110, 
10621589963589458110, 10621589963589458110]
   AsofjoinNode(0x103e04eb0): memo 1 store: for_time=0 row=0 time=0 
key=10621589963589458110
   AsofjoinNode(0x103e04eb0): Advancing input 1 hit distant time=1500 at=0
   AsofjoinNode(0x103e04eb0): memo 1 remove: ts=0
   AsofjoinNode(0x103e04eb0): Advancing input 1 updated=0
   AsofjoinNode(0x103e04eb0): Advancing input 2
   AsofjoinNode(0x103e04eb0): key hasher 2 got hashes [10621589963589458110, 
10621589963589458110, 10621589963589458110]
   AsofjoinNode(0x103e04eb0): memo 2 store: for_time=0 row=0 time=0 
key=10621589963589458110
   AsofjoinNode(0x103e04eb0): Advancing input 2 hit distant time=1500 at=0
   AsofjoinNode(0x103e04eb0): memo 2 remove: ts=0
   AsofjoinNode(0x103e04eb0): Advancing input 2 updated=0
   AsofjoinNode(0x103e04eb0): key hasher 0 got hashes [10621589963589458110]
   AsofjoinNode(0x103e04eb0): Emplace: key=10621589963589458110 
lhs_latest_row=0 lhs_latest_time=0
   AsofjoinNode(0x103e04eb0):   i=1 has_entry=1 time=0 row=0 accepted=1
   AsofjoinNode(0x103e04eb0):   i=2 has_entry=1 time=0 row=0 accepted=1
   AsofjoinNode(0x103e04eb0): input 0 try-pop batch=0x0
   AsofjoinNode(0x103e04eb0): produce batch 0:
   time:   [
       1970-01-01 00:00:00.000000
     ]
   key2:   [
       0
     ]
   key:   [
       1
     ]
   l_v0:   [
       1
     ]
   r0_v0:   [
       10
     ]
   r1_v0:   [
       100
     ]
   
   AsofjoinNode(0x103e04eb0): received batch from input 0:
   time:   [
       1970-01-01 00:00:00.001000
     ]
   key2:   [
       0
     ]
   key:   [
       2
     ]
   l_v0:   [
       2
     ]
   
   AsofjoinNode(0x103e04eb0): input 0 push batch=0x60000290c2b8
   AsofjoinNode(0x103e04eb0): Advancing input 1
   AsofjoinNode(0x103e04eb0): memo 1 store: for_time=1000 row=1 time=1500 
key=10621589963589458110
   AsofjoinNode(0x103e04eb0): Advancing input 1 hit distant time=2500 at=1000
   AsofjoinNode(0x103e04eb0): memo 1 store: for_time=1000 row=2 time=2500 
key=10621589963589458110
   AsofjoinNode(0x103e04eb0): input 1 try-pop batch=0x0
   AsofjoinNode(0x103e04eb0): memo 1 remove: ts=1000
   AsofjoinNode(0x103e04eb0): Advancing input 1 updated=1
   AsofjoinNode(0x103e04eb0): Advancing input 2
   AsofjoinNode(0x103e04eb0): memo 2 store: for_time=1000 row=1 time=1500 
key=10621589963589458110
   AsofjoinNode(0x103e04eb0): Advancing input 2 hit distant time=2500 at=1000
   AsofjoinNode(0x103e04eb0): memo 2 store: for_time=1000 row=2 time=2500 
key=10621589963589458110
   AsofjoinNode(0x103e04eb0): input 2 try-pop batch=0x0
   AsofjoinNode(0x103e04eb0): memo 2 remove: ts=1000
   AsofjoinNode(0x103e04eb0): Advancing input 2 updated=1
   AsofjoinNode(0x103e04eb0): Emplace: key=10621589963589458110 
lhs_latest_row=0 lhs_latest_time=1000
   AsofjoinNode(0x103e04eb0):   i=1 has_entry=1 time=1500 row=1 accepted=1
   AsofjoinNode(0x103e04eb0):   i=2 has_entry=1 time=1500 row=1 accepted=1
   AsofjoinNode(0x103e04eb0): input 0 try-pop batch=0x0
   AsofjoinNode(0x103e04eb0): produce batch 1:
   time:   [
       1970-01-01 00:00:00.001000
     ]
   key2:   [
       0
     ]
   key:   [
       2
     ]
   l_v0:   [
       2
     ]
   r0_v0:   [
       11
     ]
   r1_v0:   [
       101
     ]
   
   AsofjoinNode(0x103e04eb0): received batch from input 0:
   time:   [
       1970-01-01 00:00:00.002000
     ]
   key2:   [
       0
     ]
   key:   [
       1
     ]
   l_v0:   [
       3
     ]
   
   AsofjoinNode(0x103e04eb0): input 0 push batch=0x60000290c2b8
   AsofjoinNode(0x103e04eb0): Advancing input 1
   AsofjoinNode(0x103e04eb0): memo 1 remove: ts=2000
   AsofjoinNode(0x103e04eb0): Advancing input 2
   AsofjoinNode(0x103e04eb0): memo 2 remove: ts=2000
   AsofjoinNode(0x103e04eb0): Emplace: key=10621589963589458110 
lhs_latest_row=0 lhs_latest_time=2000
   AsofjoinNode(0x103e04eb0):   i=1 has_entry=1 time=2500 row=2 accepted=1
   AsofjoinNode(0x103e04eb0):   i=2 has_entry=1 time=2500 row=2 accepted=1
   AsofjoinNode(0x103e04eb0): input 0 try-pop batch=0x0
   AsofjoinNode(0x103e04eb0): produce batch 2:
   time:   [
       1970-01-01 00:00:00.002000
     ]
   key2:   [
       0
     ]
   key:   [
       1
     ]
   l_v0:   [
       3
     ]
   r0_v0:   [
       12
     ]
   r1_v0:   [
       102
     ]
   
   Comparing flattened expected table:
   time: timestamp[us, tz=UTC]
   key2: int32
   key: int32
   l_v0: int32
   r0_v0: uint64
   r1_v0: int32
   ----
   time:
     [
       [
         1970-01-01 00:00:00.000000,
         1970-01-01 00:00:00.001000,
         1970-01-01 00:00:00.002000
       ]
     ]
   key2:
     [
       [
         0,
         0,
         0
       ]
     ]
   key:
     [
       [
         1,
         2,
         1
       ]
     ]
   l_v0:
     [
       [
         1,
         2,
         3
       ]
     ]
   r0_v0:
     [
       [
         10,
         null,
         12
       ]
     ]
   r1_v0:
     [
       [
         100,
         null,
         102
       ]
     ]
   
   with flattened result table:
   time: timestamp[us, tz=UTC]
   key2: int32
   key: int32
   l_v0: int32
   r0_v0: uint64
   r1_v0: int32
   ----
   time:
     [
       [
         1970-01-01 00:00:00.000000
       ],
       [
         1970-01-01 00:00:00.001000
       ],
       [
         1970-01-01 00:00:00.002000
       ]
     ]
   key2:
     [
       [
         0
       ],
       [
         0
       ],
       [
         0
       ]
     ]
   key:
     [
       [
         1
       ],
       [
         2
       ],
       [
         1
       ]
     ]
   l_v0:
     [
       [
         1
       ],
       [
         2
       ],
       [
         3
       ]
     ]
   r0_v0:
     [
       [
         10
       ],
       [
         11
       ],
       [
         12
       ]
     ]
   r1_v0:
     [
       [
         100
       ],
       [
         101
       ],
       [
         102
       ]
     ]
   ```
   
   </details>
   

