icexelloss opened a new pull request, #13028:
URL: https://github.com/apache/arrow/pull/13028
## Overview
This is a work-in-progress implementation of the AsofJoin node in Arrow C++
compute. The code still needs quite a bit of cleanup, but I have worked on it long
enough that I think I would benefit from input from Arrow maintainers
on the high-level design.
All credit to @stmrtn (Steven Martin), who is the original author of the code.
## Implementation
There is quite a bit of code; here is how it works at a high level:
Classes:
* `InputState`: A class that handles queuing of input batches and purging of
batches that are no longer needed. There is one input state per input table.
* `MemoStore`: A class that is responsible for advancing the row index and getting
the latest row for each key given a timestamp (i.e., the row with the latest
timestamp that is <= the given timestamp).
* `CompositeReferenceTable`: A class that is responsible for storing
temporary output rows and producing RecordBatches from those rows.
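To make the "latest row per key" idea concrete, here is a minimal sketch of the kind of bookkeeping `MemoStore` is described as doing. It is not the code in this PR: the names `LatestByKey`, `Entry`, `Store` and `Find` are hypothetical, keys and timestamps are assumed to be plain `int64_t`, and a row is represented only by its index.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <unordered_map>

// Hypothetical stand-in for a row reference. The real node would point
// into a RecordBatch; here we keep only a timestamp and a row index.
struct Entry {
  int64_t time;
  int64_t row;
};

// Remembers, for every key, the most recent row stored so far.
class LatestByKey {
 public:
  // Rows are assumed to arrive in non-decreasing time order, so a later
  // Store() for the same key always replaces an older (or equal) entry.
  void Store(int64_t key, int64_t time, int64_t row) {
    assert(time >= last_time_);
    last_time_ = time;
    entries_[key] = Entry{time, row};
  }

  // Returns the latest stored entry for `key`, or nullptr if the key has
  // not been seen yet.
  const Entry* Find(int64_t key) const {
    auto it = entries_.find(key);
    return it == entries_.end() ? nullptr : &it->second;
  }

 private:
  std::unordered_map<int64_t, Entry> entries_;
  int64_t last_time_ = std::numeric_limits<int64_t>::min();
};
```

If the caller only stores right side rows whose timestamp is <= the current left row timestamp, then `Find(key)` returns exactly the "latest timestamp that is <= the given timestamp" row for that key.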
Algorithm:
* The node takes one left side table and n right side tables, and produces a
joined table
* It is currently assumed that `InputReceived` is called with time-ordered
batches for each input table. `InputReceived` only queues the batches inside
`InputState` (it doesn't do any other work). There is a separate process thread that
wakes up when there are new inputs and attempts to produce an output batch. If
the current data is not enough to produce the output batch (i.e., we have not
received all the potential right side rows that could be a match for the
current left batch), it will wait for new inputs.
* The process thread works as follows:
1. Advance the left row index for the current batch. Then advance the right tables
to get the latest right row (i.e., the latest right row with timestamp <= left row
timestamp)
2. Once the advances are done, go on to produce the output
row for the current left row
3. Go to 1 until all rows in the current left batch are processed
4. Output batch for the current left batch
5. Purge batches that are no longer needed
6. Wait until enough batches are received to process the next left batch
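Steps 1 to 4 can be sketched for the simplest case of one left table and one right table that are already fully in memory. This is only an illustration of the advance-then-emit loop, not the implementation in this PR: `Row`, `Joined` and `AsofJoin` are hypothetical names, keys are assumed to be a single `int64_t`, and unmatched left rows are assumed to be kept with a `has_right` flag (the description above does not say how they are handled).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Hypothetical row layout: timestamp, join key, one value column.
struct Row {
  int64_t time;
  int64_t key;
  double value;
};

struct Joined {
  int64_t time;
  int64_t key;
  double left_value;
  bool has_right;      // false if no right row matched (assumed behavior)
  double right_value;  // only meaningful when has_right is true
};

// Both inputs are assumed to be sorted by time (non-decreasing).
std::vector<Joined> AsofJoin(const std::vector<Row>& left,
                             const std::vector<Row>& right) {
  std::vector<Joined> out;
  out.reserve(left.size());
  std::unordered_map<int64_t, std::size_t> latest;  // key -> index in `right`
  std::size_t r = 0;
  for (const Row& l : left) {
    // Step 1: advance the right side up to the left row's timestamp,
    // remembering the latest right row seen for each key.
    while (r < right.size() && right[r].time <= l.time) {
      latest[right[r].key] = r;
      ++r;
    }
    // Step 2: produce the output row for the current left row.
    auto it = latest.find(l.key);
    if (it == latest.end()) {
      out.push_back({l.time, l.key, l.value, false, 0.0});
    } else {
      out.push_back({l.time, l.key, l.value, true, right[it->second].value});
    }
    // Step 3: the loop continues with the next left row.
  }
  // Step 4: the accumulated rows form the output for this left batch.
  return out;
}
```

Because both sides are scanned once, the loop is linear in the total number of rows. With n right tables, the same advance step would be repeated per right table before emitting the output row.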
The entry point for the algorithm is `process()`.
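The waiting condition in the algorithm ("we have not received all the potential right side rows that could be a match for the current left batch") can be sketched as a readiness check. This is one possible formulation under stated assumptions, not the check used in this PR: `Batch`, `InputQueue` and `CanProcessLeftBatch` are hypothetical names, a batch is reduced to its sorted timestamps, and a right input is assumed to be "far enough along" only once it has delivered a timestamp strictly greater than the last left timestamp (a later batch could otherwise still carry rows with an equal timestamp) or has finished.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <utility>
#include <vector>

// Hypothetical, simplified batch: only the sorted timestamps are kept.
struct Batch {
  std::vector<int64_t> times;
};

// Hypothetical per-input queue, loosely modeled on the role of InputState.
struct InputQueue {
  std::deque<Batch> batches;
  bool finished = false;  // set once the input has no more batches

  // Queue a batch; batches must be non-empty and arrive in time order.
  void Push(Batch b) {
    assert(!b.times.empty());
    assert(batches.empty() || batches.back().times.back() <= b.times.front());
    batches.push_back(std::move(b));
  }

  bool HasData() const { return !batches.empty(); }

  // Largest timestamp received so far on this input.
  int64_t LatestTime() const {
    assert(HasData());
    return batches.back().times.back();
  }
};

// True if every right input has either finished or already delivered a
// row newer than the last row of `left`, so no further match can arrive.
bool CanProcessLeftBatch(const Batch& left,
                         const std::vector<InputQueue>& rights) {
  assert(!left.times.empty());
  const int64_t left_end = left.times.back();
  for (const InputQueue& q : rights) {
    if (q.finished) continue;
    if (!q.HasData() || q.LatestTime() <= left_end) return false;
  }
  return true;
}
```

In a threaded design, the process thread would re-evaluate a check like this each time it is woken by new input, and go back to waiting when it returns false.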
## TODO
- [ ] More Tests
- [ ] Decide if we can replace `CompositeReferenceTable` with something that
already exists (perhaps `RowEncoder`?)
- [ ] Handle more datatypes (both key and value)
- [ ] Handle multiple keys
- [ ] Life cycle management for the process thread (or whether or not we
should have it)
- [ ] Lint & Code Style