Hi David,

> It seems to me that at least with the heap-based state backend, readRange
> is going to have to do a lot of unnecessary work to implement this
> isEmpty() operation, since it will have to consider the entire range from
> MIN_VALUE to MAX_VALUE. (Maybe we should add an explicit isEmpty method?
> I'm not convinced we need it, but it would be cheaper to implement. Or
> perhaps this join can be rewritten to not need this operation; I haven't
> thought enough about that alternative.)
>

I think this really boils down to how the returned iterable is going to be
implemented. Basically for checking whether state is empty, you need to do
something along the lines of:

Iterables.isEmpty(state.readRange(Long.MIN_VALUE, Long.MAX_VALUE));
// i.e. checking `hasNext() == false`, or `isEmpty()` in the case of a `Collection`

A few notes:
1) It could be lazy (the underlying collection doesn't have to be
materialized, e.g. in the case of RocksDB);
2) For HeapStateBackend it depends on the underlying implementation. I'd
probably go with some kind of sorted tree (e.g. SortedMap / NavigableMap),
which allows efficient range scans / range deletes. Then you could simply
do something like this (off the top of my head):

@Value
class TimestampedKey<K> {
  K key;
  long timestamp;
}

SortedMap<TimestampedKey<K>, V> internalState;

Iterable<TimestampedValue<V>> readRange(long min, long max) {
  // subMap returns a view, so nothing is copied; its upper bound is exclusive
  return toIterable(internalState.subMap(
      new TimestampedKey<>(currentKey(), min),
      new TimestampedKey<>(currentKey(), max)));
}

This should be fairly cheap. The important bit is that the returned
iterable is always non-null, but may be empty.
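
To make the idea above a bit more concrete, here is a self-contained version
of the same sketch in plain Java. It is only an illustration under my own
assumptions, not the FLIP-220 implementation: HeapTemporalStateSketch,
setCurrentKey, put and isEmpty are hypothetical names, it returns plain values
instead of TimestampedValue<V>, and it treats both range bounds as inclusive.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical heap-backed sketch; none of these names are Flink APIs. */
final class HeapTemporalStateSketch<K extends Comparable<K>, V> {

    /** Composite key: entries sort by key first, then by timestamp. */
    static final class TimestampedKey<K extends Comparable<K>>
            implements Comparable<TimestampedKey<K>> {
        final K key;
        final long timestamp;

        TimestampedKey(K key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public int compareTo(TimestampedKey<K> other) {
            int byKey = key.compareTo(other.key);
            return byKey != 0 ? byKey : Long.compare(timestamp, other.timestamp);
        }
    }

    private final NavigableMap<TimestampedKey<K>, V> internalState = new TreeMap<>();
    private K currentKey;

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    void put(long timestamp, V value) {
        internalState.put(new TimestampedKey<>(currentKey, timestamp), value);
    }

    /**
     * Values of the current key with min <= timestamp <= max.
     * The result is a view over the tree, so nothing is copied.
     * TreeMap.subMap throws IllegalArgumentException if min > max.
     */
    Iterable<V> readRange(long min, long max) {
        return internalState.subMap(
                new TimestampedKey<>(currentKey, min), true,
                new TimestampedKey<>(currentKey, max), true).values();
    }

    /** Emptiness check: asks only whether the range has a first entry. */
    boolean isEmpty() {
        return !readRange(Long.MIN_VALUE, Long.MAX_VALUE).iterator().hasNext();
    }
}
```

With a TreeMap, the hasNext() call only has to locate the first entry at or
after (currentKey, Long.MIN_VALUE), so the check does not walk the whole
range for that key.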

Does that answer your question?

D.

On Wed, Apr 13, 2022 at 12:21 PM David Anderson <da...@alpinegizmo.com>
wrote:

> Yun Tang and Jingsong,
>
> Some flavor of OrderedMapState is certainly feasible, and I do see some
> appeal in supporting Binary**State.
>
> However, I haven't seen a motivating use case for this generalization, and
> would rather keep this as simple as possible. By handling Longs we can
> already optimize a wide range of use cases.
>
> David
>
>
> On Tue, Apr 12, 2022 at 9:21 AM Yun Tang <myas...@live.com> wrote:
>
> >  Hi David,
> >
> > Could you explain in detail why SortedMapState cannot work? I don't
> > quite follow what the statement below means:
> >
> > This was rejected as being overly difficult to implement in a way that
> > would cleanly leverage RocksDB’s iterators.
> >
> >
> > Best
> > Yun Tang
> > ________________________________
> > From: Aitozi <gjying1...@gmail.com>
> > Sent: Tuesday, April 12, 2022 15:00
> > To: dev@flink.apache.org <dev@flink.apache.org>
> > Subject: Re: [DISCUSS] FLIP-220: Temporal State
> >
> > Hi David
> >      I have looked through the doc. I think it will be a good improvement
> > for this usage pattern, and I'm interested in it. Do you have some POC
> > work to share for a closer look?
> > Besides, I have one question: can we expose the namespace in the other
> > state types as well, not only in `TemporalState`? That way, users could
> > specify the namespace themselves, and TemporalState would be one special
> > case that uses the timestamp as the namespace. I think that would be more
> > extensible.
> >     What do you think about this?
> >
> > Best,
> > Aitozi.
> >
> > David Anderson <dander...@apache.org> wrote on Mon, Apr 11, 2022 at 20:54:
> >
> > > Greetings, Flink developers.
> > >
> > > I would like to open up a discussion of a proposal [1] to add a new kind
> > > of state to Flink.
> > >
> > > The goal here is to optimize a fairly common pattern, which is using
> > >
> > > MapState<Long, List<Event>>
> > >
> > > to store lists of events associated with timestamps. This pattern is used
> > > internally in quite a few operators that implement sorting and joins, and
> > > it also shows up in user code, for example, when implementing custom
> > > windowing in a KeyedProcessFunction.
> > >
> > > Nico Kruber, Seth Wiesman, and I have implemented a POC that achieves a
> > > more than 2x improvement in throughput when performing these operations
> > > on RocksDB by better leveraging the capabilities of the RocksDB state
> > > backend.
> > >
> > > See FLIP-220 [1] for details.
> > >
> > > Best,
> > > David
> > >
> > > [1] https://cwiki.apache.org/confluence/x/Xo_FD
> > >
> >
>
