westonpace commented on PR #43632:
URL: https://github.com/apache/arrow/pull/43632#issuecomment-2284669419

   > @westonpace your last example confuses me a bit, particularly because the 
entire purpose of this is to create a push-based approach for async handling, 
rather than a pull-based approach.
   
   I read the purpose as "create an async interface".  There are both 
push-based and pull-based asynchronous interfaces.  Is there a reason you 
specifically want a push-based interface?
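
   For concreteness, here's roughly how I picture the two shapes (a sketch only; `on_next`, `get_next`, the waker, and `EWOULDBLOCK` are illustrative stand-ins, not the proposed ABI):

   ```
   import threading

   EWOULDBLOCK = object()  # stand-in sentinel for "no data ready yet"

   # Push-based: the producer drives the loop and calls into consumer code.
   def push_consume(producer):
     producer.start(on_next=lambda batch: print("got", batch))

   # Pull-based: the consumer drives the loop, parking itself when data isn't
   # ready and handing the producer a waker to call when that changes.
   def pull_consume(producer):
     ready = threading.Event()
     while True:
       res = producer.get_next(waker=ready.set)
       if res is EWOULDBLOCK:
         ready.wait()   # park until the producer calls the waker
         ready.clear()
         continue
       if res is None:
         break          # end of stream
       print("got", res)
   ```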
   
   > I don't understand signalling to the producer that data is available, the 
producer knows when data is available right? it's producing the data in the 
first place. Isn't the point that the producer needs to signal the consumer 
that more data is available? Waker would need to be consumer created, with the 
producer calling wake when data is available. Was your comment just a typo, or 
am I missing something?
   
   Typo.  The waker is for the producer to signal the consumer that data is 
available.
   
   > Also in the case of this being able to handle concurrency, the 
get_last_error function becomes mostly meaningless, right? Depending on when it 
is called, you end up with a race condition as far as what the error might be.
   
   Good point.  The error handling would have to move to the task.
   
   > All in all, i'm not completely sold on the idea of this re-inversion of 
the paradigm. It honestly feels sorta hacky, but that might just be me.
   
   I'm basically modeling this on the current lance file reader.  The file 
reader knows (from the metadata) how many tasks there will be.  `get_next_task` 
is a synchronous method that creates and returns an asynchronous task.  
Therefore:
   
   > 
   >    There is only one record in the stream, it is delayed
   >    Consumer calls get_next_task 5 times in a loop to get the next 5 tasks
   
   This would never happen.  The first call to `get_next_task` returns the 
first task.  The second call returns null because the reader knows all tasks 
are done.  However, if the producer didn't know ahead of time how many tasks 
there were, then calls 2-5 would all just return null.
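
   In other words, the consumer loop is just (a sketch; `reader` and `schedule` are stand-ins):

   ```
   while True:
     task = reader.get_next_task()  # synchronous call
     if task is None:
       break                        # producer signals there are no more tasks
     schedule(task)                 # the task itself completes asynchronously
   ```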
   
   > The original idea and semantics here were that the 
ArrowAsyncDeviceArrayStream is constructed by the consumer and given to the 
producer, given the comments you've put here it seems like this is the other 
way around now? The producer still provides the struct, and the consumer calls 
get_next providing the waker. Do I have that right? What happens if get_next is 
called again before wake is called on the waker? Are we defining that get_next 
must not be called until wake is called? The semantics are a bit confusing to 
me here as far as what you're proposing.
   
   Yeah, sorry for the typo, but you got it.  The producer creates the 
`ArrowAsyncDeviceArrayStream`.  The consumer creates the `Waker` and passes it 
into the call to `get_next`.
   
   >  What happens if get_next is called again before wake is called on the 
waker?
   
   There are ways this could work, but I don't see any advantage in allowing 
it, so it would be easier to say that it must not happen.
   
   > Are we defining that get_next must not be called until wake is called?
   
   Sure, if this simplifies things there is no harm in defining that.
   
   > The semantics are a bit confusing to me here as far as what you're 
proposing.
   
   This is based on Rust's asynchronous model, which I've found to work well.  
The consumer in this case is running some kind of event-loop-based coroutine 
logic:
   
   ```
   # event loop: repeatedly run whichever tasks are currently ready
   while True:
     for task in ready_tasks:
       task.run()
   ```
   
   When calling an asynchronous method, the logic looks something like this 
(taking many liberties here):
   
   ```
   cur_task = get_current_task()
   # calling the waker moves this task back onto the ready list
   waker = lambda: tasks.make_task_active(cur_task.id)
   res = call_async_method(..., waker=waker)
   if res == EWOULDBLOCK:
     return yield_current_task() # makes the task inactive until the waker fires
   ```
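
   The producer's side of that contract is small: if `get_next` returned `EWOULDBLOCK`, call the stored waker once when data becomes ready (again a sketch; `on_io_complete` and `pending_waker` are stand-ins):

   ```
   def on_io_complete(stream, decoded_data):
     stream.buffered_data = decoded_data
     waker, stream.pending_waker = stream.pending_waker, None
     if waker is not None:
       waker()  # re-activates the consumer's yielded task on its event loop
   ```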
   
   ## Resetting the conversation
   
   My goals here are based on the approach I'm taking with the lance file 
reader, where I go pretty far out of my way to avoid data being transferred 
from one core to another (e.g. from one core's L1/L2 cache to another's).  It 
may be that the complexity of achieving these goals is not worth the benefit.  
Currently I'm using the task-based approach described above (sketched in 
pseudocode after this list):
   
    * Consumer calls `get_next_task` (synchronous, single-threaded)
      * Producer figures out what I/O is needed to fulfill the task
      * Producer schedules the I/O
      * Producer creates a task to decode the data and returns it
    * Consumer checks whether the task is ready; if not, it adds it to the 
inactive task list.
    * I/O finishes on producer.  Producer marks task ready.
    * Once consumer has a free thread it assigns it the ready task.  Consumer 
runs task, decodes data (the decode logic is producer logic), and then 
processes the decoded data (consumer logic) all without a CPU core transfer 
(data stays in L2 cache).
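
   Putting that list into pseudocode (names like `reader`, `thread_pool`, and `process` are stand-ins, and thread-safety is elided):

   ```
   def consume(reader, inactive_tasks, thread_pool):
     while (task := reader.get_next_task()) is not None:  # synchronous, single-threaded
       if task.is_ready():
         thread_pool.submit(run_task, task)
       else:
         inactive_tasks.add(task)  # the producer's I/O callback marks it ready later

   def run_task(task):
     batch = task.decode()  # producer logic, but running on a consumer thread
     process(batch)         # consumer logic, same core: data stays in L2 cache
   ```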
   
   So my main goal here is to have some way for the handoff from producer to 
consumer to happen without forcing a CPU core transfer.
   
   > As described, the intent was that a consumer doesn't need to handle 
concurrency by default because a producer should only call the on_next when the 
previous one returns. But in the scenario you describe, where the processing 
inside of on_next might be lengthy, I think it's fine for the consumer to have 
to make the decision as to whether or not it should create a new thread task 
(if it wants to handle things in parallel) or do it inside of on_next (and 
potentially tie up the producer, but that's basically just backpressure 
essentially).
   
   This doesn't meet my goal because, by this point, the producer has already 
done the decoding.  If the consumer decides to make a new thread task, it 
incurs the cost of a CPU core transfer.

