+1 for this.

Actually, this is a headache for Flink SQL too.

There is certainly a lot of update data (CDC changelogs) in real-world
stream processing. The semantics needed here are that order must be
preserved within each key, while different keys can be processed out of
order.

I'm very happy that the community has a similar need, and I think it's
worth refining it in Flink.

Best,
Jingsong

On Tue, Jun 20, 2023 at 10:20 PM Juho Autio
<juho.au...@rovio.com.invalid> wrote:
>
> Thank you very much! It seems like you have quite a similar goal. However,
> could you clarify: do you maintain the stream order on key level, or do you
> just limit the parallel requests per key to one without caring about the
> order?
>
> I'm not 100% sure how your implementation with futures is done. If you are
> able to share a code snippet that would be much appreciated!
>
> I'm also wondering what kind of memory implication that implementation has:
> would the futures be queued inside the operator without any limit? Would it
> be a problem if the same key has too many records within the same time
> window? But I suppose the function can be made blocking to protect against
> that.
>
> On Tue, Jun 20, 2023 at 3:34 PM Galen Warren
> <ga...@cvillewarrens.com.invalid> wrote:
>
> > Hi Juho -- I'm doing something similar. In my case, I want to execute async
> > requests concurrently for inputs associated with different keys but issue
> > them sequentially for any given key. The way I do it is to create a keyed
> > stream and use it as an input to an async function. In this arrangement,
> > all the inputs for a given key are handled by a single instance of the
> > async function; inside that function instance, I use a map to keep track of
> > any in-flight requests for a given key. When a new input comes in for a
> > key, if there is an existing in-flight request for that key, the future
> > that is constructed for the new request is constructed as [existing
> > request].then([new request]) so that the new one is only executed once the
> > in-flight request completes. The futures are constructed in such a way that
> > they maintain the map properly after completing.
> >
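
The chaining described above could look roughly like the sketch below. It is
plain Java with CompletableFuture and has no Flink dependency. The class name
PerKeySequencer and its methods submit and pendingKeys are hypothetical names
chosen for illustration; this is not Galen's actual code. It also assumes
that a failed request should not block later requests for the same key, which
may not be what every use case wants.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical helper: runs requests sequentially per key, concurrently across keys. */
final class PerKeySequencer<K> {
    // Last future queued for each key that still has work in flight.
    private final Map<K, CompletableFuture<?>> tails = new HashMap<>();

    /** Queues the request behind any in-flight request for the same key. */
    public synchronized <T> CompletableFuture<T> submit(
            K key, Supplier<CompletableFuture<T>> request) {
        CompletableFuture<?> prev =
                tails.getOrDefault(key, CompletableFuture.completedFuture(null));
        // Assumption: ignore the predecessor's result or failure, wait only for it to finish.
        CompletableFuture<Void> gate = prev.handle((v, e) -> null);
        // The supplier is invoked only once the predecessor has completed.
        CompletableFuture<T> next = gate.thenCompose(ignored -> request.get());
        tails.put(key, next);
        // Clean up the map entry, but only if no newer request replaced it.
        next.whenComplete((v, e) -> {
            synchronized (this) {
                tails.remove(key, next);
            }
        });
        return next;
    }

    /** Number of keys that currently have at least one request in flight. */
    public synchronized int pendingKeys() {
        return tails.size();
    }
}
```

Inside a RichAsyncFunction, asyncInvoke would call submit with the record's
key and complete the ResultFuture when the returned future completes. On the
memory question: the chain for a single key grows with every queued record,
so the sketch itself has no limit. As far as I know, the capacity argument of
AsyncDataStream.orderedWait bounds the number of in-flight elements per
operator instance, which should also bound the number of queued futures.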
> >
> > On Mon, Jun 19, 2023 at 10:55 AM Juho Autio <juho.au...@rovio.com.invalid>
> > wrote:
> >
> > > I need to make some slower external requests in parallel, so Async I/O
> > > helps greatly with that. However, I also need to make the requests in a
> > > certain order per key. Is that possible with Async I/O?
> > >
> > > > The documentation[1] talks about preserving the stream order of
> > > > results, but it doesn't discuss the order of the async requests. I
> > > > tried AsyncDataStream.orderedWait, but the async requests still seem
> > > > to be issued in random order.
> > >
> > > > If that is by design, would there be any suggestion on how to work
> > > > around that? I was thinking of collecting all events of the same key
> > > > into a List, so that the async operator gets a list instead of
> > > > individual events. There are of course some downsides to using a
> > > > List, so I would rather have something better.
> > >
> > > In a nutshell my code is:
> > >
> > > AsyncDataStream.orderedWait(stream.keyBy(key), asyncFunction)
> > >
> > > The asyncFunction extends RichAsyncFunction.
> > >
> > > Thanks!
> > >
> > > [1]
> > >
> > >
> > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#order-of-results
> > >
> > > (Sorry if it's not appropriate to post this type of question to the dev
> > > mailing list. I tried the Flink users list with no luck.)
> > >
