rtpsw commented on PR #34392: URL: https://github.com/apache/arrow/pull/34392#issuecomment-1528985676
I used a macOS host, courtesy of @westonpace, to fairly reliably get a failure when running under `stress -c`. After debugging with a few added debug messages, I think I finally figured out the as-of-join issue here.

The pasted output below, taken from a failure repro, shows the following. `input 0`, which is the left table of the as-of-join, received 3 batches with different content, as expected. However, all these batches have the same pointer address (as seen in the `input 0 push` messages), suggesting that each batch is allocated at the address of the previous one, which had just been freed. This is an unusual condition, which appears to occur only when certain runtime resources are restricted (such as under stress and in certain CI jobs). The `key hasher 0` message shows that the left table's key hasher hashes only once, which indicates it is seeing a single batch instead of 3. This is because it [invalidates its cached hashes only when the batch address changes](https://github.com/apache/arrow/blob/0ea1a103dfc2fc4ab5a5d839369a805e2ae657a2/cpp/src/arrow/acero/asof_join_node.cc#L359-L362), which isn't the case here. Consequently, the same key hashes (here, one hash for a single-row batch) are used for the second batch of `input 0` as for its first and third batches. This is incorrect given [the left table's batches](https://github.com/apache/arrow/pull/34392/files#diff-dd19a151cdbe71c43bad130c0dc2d6d00db89e57b6588d1597ee0011608e3299R980), since the second batch has 2 in the `key` column whereas the first and third batches have 1 there. The resulting output table is incorrect, although it would have been correct for a test case in which all batches share the same key.

The [recent commit](https://github.com/apache/arrow/pull/34392/commits/34134f0b40ea241fa3c686df06c487d0539b9c99) fixes the key hasher to invalidate when a new batch is received on the input it is associated with. With this fix, I was able to run 100 repetitions on the macOS host under stress without observing any failure.
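To illustrate the failure mode described above, here is a minimal standalone sketch. It is not Arrow's code: `Batch`, `KeyHasher`, `HashKey`, and `HashLeftBatches` are hypothetical names, and the hash function is an arbitrary stand-in. Address reuse is simulated deterministically by overwriting a single `Batch` object, on the assumption that this mirrors a new batch landing at a freed batch's address.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a record batch: only the key column.
struct Batch {
  std::vector<int32_t> keys;
};

// Arbitrary stand-in hash. Multiplying by an odd constant is a bijection
// modulo 2^64, so distinct keys always get distinct hashes.
inline uint64_t HashKey(int32_t key) {
  return static_cast<uint64_t>(key) * 0x9E3779B97F4A7C15ULL;
}

// Hypothetical key hasher that caches the hashes of the last batch it saw
// and recomputes them only when the batch address differs.
class KeyHasher {
 public:
  // Explicit invalidation, meant to be called when a new batch arrives.
  void Invalidate() { cached_batch_ = nullptr; }

  const std::vector<uint64_t>& HashesFor(const Batch* batch) {
    if (cached_batch_ != batch) {  // address-based cache check
      hashes_.clear();
      for (int32_t key : batch->keys) hashes_.push_back(HashKey(key));
      cached_batch_ = batch;
    }
    return hashes_;
  }

 private:
  const Batch* cached_batch_ = nullptr;
  std::vector<uint64_t> hashes_;
};

// Feeds three single-row batches with keys 1, 2, 1 through one hasher and
// returns the hash observed for each batch. All three batches live at the
// same address, simulating an allocator that reuses freed memory.
std::vector<uint64_t> HashLeftBatches(bool invalidate_on_push) {
  KeyHasher hasher;
  Batch slot;
  std::vector<uint64_t> observed;
  for (int32_t key : {1, 2, 1}) {
    slot.keys = {key};                            // new content, same address
    if (invalidate_on_push) hasher.Invalidate();  // the fix, in spirit
    observed.push_back(hasher.HashesFor(&slot)[0]);
  }
  return observed;
}
```

Calling `HashLeftBatches(false)` yields the same hash for all three batches, matching the single `key hasher 0` message in the log, while `HashLeftBatches(true)` yields a different hash for the second batch. The design point is that a pointer comparison is only a valid cache key while the cached object is still alive; tying invalidation to the "new batch received" event removes that dependency on allocator behavior.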
The CI job failures for this commit seem to be unrelated. Note that I still do not have an explanation for (and wasn't able to reproduce) [the AMD64 Conda job's behavior](https://github.com/apache/arrow/pull/34392#issuecomment-1519077885). In particular, the key hasher invalidation issue does not occur there, since the `key hasher 0` messages show that hashes are computed for each batch of the left table. However, the above may be sufficient to accept this PR.

<details>
<summary>Pasted output:</summary>

```
AsofjoinNode(0x103e04eb0): received batch from input 2: time: [ 1970-01-01 00:00:00.000000, 1970-01-01 00:00:00.001500, 1970-01-01 00:00:00.002500 ] key2: [ 0, 0, 0 ] key: [ 1, 1, 1 ] r1_v0: [ 100, 101, 102 ]
AsofjoinNode(0x103e04eb0): input 2 push batch=0x60000291ca98
AsofjoinNode(0x103e04eb0): received batch from input 1: time: [ 1970-01-01 00:00:00.000000, 1970-01-01 00:00:00.001500, 1970-01-01 00:00:00.002500 ] key2: [ 0, 0, 0 ] key: [ 1, 1, 1 ] r0_v0: [ 10, 11, 12 ]
AsofjoinNode(0x103e04eb0): input 1 push batch=0x60000291c7f8
AsofjoinNode(0x103e04eb0): received batch from input 0: time: [ 1970-01-01 00:00:00.000000 ] key2: [ 0 ] key: [ 1 ] l_v0: [ 1 ]
AsofjoinNode(0x103e04eb0): input 0 push batch=0x60000290c2b8
AsofjoinNode(0x103e04eb0): Advancing input 1
AsofjoinNode(0x103e04eb0): key hasher 1 got hashes [10621589963589458110, 10621589963589458110, 10621589963589458110]
AsofjoinNode(0x103e04eb0): memo 1 store: for_time=0 row=0 time=0 key=10621589963589458110
AsofjoinNode(0x103e04eb0): Advancing input 1 hit distant time=1500 at=0
AsofjoinNode(0x103e04eb0): memo 1 remove: ts=0
AsofjoinNode(0x103e04eb0): Advancing input 1 updated=0
AsofjoinNode(0x103e04eb0): Advancing input 2
AsofjoinNode(0x103e04eb0): key hasher 2 got hashes [10621589963589458110, 10621589963589458110, 10621589963589458110]
AsofjoinNode(0x103e04eb0): memo 2 store: for_time=0 row=0 time=0 key=10621589963589458110
AsofjoinNode(0x103e04eb0): Advancing input 2 hit distant time=1500 at=0
AsofjoinNode(0x103e04eb0): memo 2 remove: ts=0
AsofjoinNode(0x103e04eb0): Advancing input 2 updated=0
AsofjoinNode(0x103e04eb0): key hasher 0 got hashes [10621589963589458110]
AsofjoinNode(0x103e04eb0): Emplace: key=10621589963589458110 lhs_latest_row=0 lhs_latest_time=0
AsofjoinNode(0x103e04eb0): i=1 has_entry=1 time=0 row=0 accepted=1
AsofjoinNode(0x103e04eb0): i=2 has_entry=1 time=0 row=0 accepted=1
AsofjoinNode(0x103e04eb0): input 0 try-pop batch=0x0
AsofjoinNode(0x103e04eb0): produce batch 0: time: [ 1970-01-01 00:00:00.000000 ] key2: [ 0 ] key: [ 1 ] l_v0: [ 1 ] r0_v0: [ 10 ] r1_v0: [ 100 ]
AsofjoinNode(0x103e04eb0): received batch from input 0: time: [ 1970-01-01 00:00:00.001000 ] key2: [ 0 ] key: [ 2 ] l_v0: [ 2 ]
AsofjoinNode(0x103e04eb0): input 0 push batch=0x60000290c2b8
AsofjoinNode(0x103e04eb0): Advancing input 1
AsofjoinNode(0x103e04eb0): memo 1 store: for_time=1000 row=1 time=1500 key=10621589963589458110
AsofjoinNode(0x103e04eb0): Advancing input 1 hit distant time=2500 at=1000
AsofjoinNode(0x103e04eb0): memo 1 store: for_time=1000 row=2 time=2500 key=10621589963589458110
AsofjoinNode(0x103e04eb0): input 1 try-pop batch=0x0
AsofjoinNode(0x103e04eb0): memo 1 remove: ts=1000
AsofjoinNode(0x103e04eb0): Advancing input 1 updated=1
AsofjoinNode(0x103e04eb0): Advancing input 2
AsofjoinNode(0x103e04eb0): memo 2 store: for_time=1000 row=1 time=1500 key=10621589963589458110
AsofjoinNode(0x103e04eb0): Advancing input 2 hit distant time=2500 at=1000
AsofjoinNode(0x103e04eb0): memo 2 store: for_time=1000 row=2 time=2500 key=10621589963589458110
AsofjoinNode(0x103e04eb0): input 2 try-pop batch=0x0
AsofjoinNode(0x103e04eb0): memo 2 remove: ts=1000
AsofjoinNode(0x103e04eb0): Advancing input 2 updated=1
AsofjoinNode(0x103e04eb0): Emplace: key=10621589963589458110 lhs_latest_row=0 lhs_latest_time=1000
AsofjoinNode(0x103e04eb0): i=1 has_entry=1 time=1500 row=1 accepted=1
AsofjoinNode(0x103e04eb0): i=2 has_entry=1 time=1500 row=1 accepted=1
AsofjoinNode(0x103e04eb0): input 0 try-pop batch=0x0
AsofjoinNode(0x103e04eb0): produce batch 1: time: [ 1970-01-01 00:00:00.001000 ] key2: [ 0 ] key: [ 2 ] l_v0: [ 2 ] r0_v0: [ 11 ] r1_v0: [ 101 ]
AsofjoinNode(0x103e04eb0): received batch from input 0: time: [ 1970-01-01 00:00:00.002000 ] key2: [ 0 ] key: [ 1 ] l_v0: [ 3 ]
AsofjoinNode(0x103e04eb0): input 0 push batch=0x60000290c2b8
AsofjoinNode(0x103e04eb0): Advancing input 1
AsofjoinNode(0x103e04eb0): memo 1 remove: ts=2000
AsofjoinNode(0x103e04eb0): Advancing input 2
AsofjoinNode(0x103e04eb0): memo 2 remove: ts=2000
AsofjoinNode(0x103e04eb0): Emplace: key=10621589963589458110 lhs_latest_row=0 lhs_latest_time=2000
AsofjoinNode(0x103e04eb0): i=1 has_entry=1 time=2500 row=2 accepted=1
AsofjoinNode(0x103e04eb0): i=2 has_entry=1 time=2500 row=2 accepted=1
AsofjoinNode(0x103e04eb0): input 0 try-pop batch=0x0
AsofjoinNode(0x103e04eb0): produce batch 2: time: [ 1970-01-01 00:00:00.002000 ] key2: [ 0 ] key: [ 1 ] l_v0: [ 3 ] r0_v0: [ 12 ] r1_v0: [ 102 ]
Comparing flattened expected table:
time: timestamp[us, tz=UTC] key2: int32 key: int32 l_v0: int32 r0_v0: uint64 r1_v0: int32
----
time: [ [ 1970-01-01 00:00:00.000000, 1970-01-01 00:00:00.001000, 1970-01-01 00:00:00.002000 ] ]
key2: [ [ 0, 0, 0 ] ]
key: [ [ 1, 2, 1 ] ]
l_v0: [ [ 1, 2, 3 ] ]
r0_v0: [ [ 10, null, 12 ] ]
r1_v0: [ [ 100, null, 102 ] ]
with flattened result table:
time: timestamp[us, tz=UTC] key2: int32 key: int32 l_v0: int32 r0_v0: uint64 r1_v0: int32
----
time: [ [ 1970-01-01 00:00:00.000000 ], [ 1970-01-01 00:00:00.001000 ], [ 1970-01-01 00:00:00.002000 ] ]
key2: [ [ 0 ], [ 0 ], [ 0 ] ]
key: [ [ 1 ], [ 2 ], [ 1 ] ]
l_v0: [ [ 1 ], [ 2 ], [ 3 ] ]
r0_v0: [ [ 10 ], [ 11 ], [ 12 ] ]
r1_v0: [ [ 100 ], [ 101 ], [ 102 ] ]
```

</details>
