Thanks for the ideas, Kenn, Luke and Eugene. Before I posted the question
here, we discussed internally about releasing the underlying resources
after consuming the whole iterator. This probably covers quite a lot of use
cases. For some special cases that the user only consume part of the
iterator, Luke and Kenn's suggestion about releasing after processElement()
might work (I need to confirm about this with our use cases). So based on
what we discussed so far, we might have a good way to automatically close
an iterator for the store.

There is another issue though: right now the state API returns an iterable
for entries(), keys() and values(), and we can create iterator from it.
>From my understanding, the iterable holds a snapshot of the underlying
store. In case of rocksDb, it's going to be a db.snapshot(). Then when can
we release the snapshot? It's not like iterator where we can use some
heuristics to automatically release it. The user can hold on to the
iterable and create iterators throughout the whole processElement(). But if
we only close the iterable after processElement(), I am quite concerned
about the limitations this will bring. If the user is doing some remote
call during the process, then the snapshot might be held for a long time
before releasing, and might cause performance problems. And if the user
happen to create multiple iterables, then there will be multiple snapshots
loaded during process. Luke suggested being aggressive at closing the
resources and recreating when needed again. But in this case it might not
work since we won't be able to recreate the same snapshot given the store
might have been updated (and creating rocksDb snapshot is not cheap too). I
am running out of ideas other than exposing the iterator itself somehow
(and add close() if needed?). Any further suggestions?

@Kenn: btw, I have the same impl you posted earlier (CloseableIterator) in
an internal interface. I wrapped it in some sort of StateResource in the
fear that people might reject the proposal immediately after seeing the
close() on the iterator. I guess our users are familiar with rocksDb state,
it's pretty normal to close the iterator/snapshot after using it.

Thanks,
Xinyu

On Fri, May 11, 2018 at 11:08 AM, Lukasz Cwik <lc...@google.com> wrote:

> The iterator going out of scope is the idiomatic way that resources are
> freed for Java developers (hence the weak/phantom reference suggestion).
> Explicitly requiring users to deal with 'handles' (like file streams) lead
> to leaked resources.
>
> On Fri, May 11, 2018 at 10:55 AM Kenneth Knowles <k...@google.com> wrote:
>
>> Thanks Xinyu,
>>
>> I actually had first sketched out just what you wrote. But then I
>> realized a few things:
>>
>>  - usually an Iterable does not allocate resources, only its Iterators
>>  - if you consume the whole iterator, I hope the user would not have to
>> do any extra work
>>  - you can also automatically free it up at the end of the call to
>> @ProcessElement so that is easy too (but you might not want to)
>>  - so the main use is when the iterator terminates early and is not fully
>> consumed and you can't wait to finish the method
>>  - the scoping in Java will force a bunch of uninitialized declarations
>> outside the try-with-resources block, kind of a lot of boilerplate LOC
>>
>> One thing that is good about your proposal is that the iterable could
>> have some transparent caches that all free up together.
>>
>> Kenn
>>
>> On Fri, May 11, 2018 at 9:51 AM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>
>>> Thanks for drafting the details about the two approaches, Kenn. Now I
>>> understand Luke's proposal better. The approach looks neat, but the
>>> uncertainty of *when* GC is going to kick in will make users' life
>>> hard. If the user happens to configure a large JVM heap size, and since
>>> rocksDb uses off-heap memory, GC might start very late and less frequent
>>> than what we want. If we don't have a *definitive* way to let user
>>> close the underlying resources, then there is no good way to handle such
>>> failures of a critical application in production.
>>>
>>> Having a close method in the iterator might be a little unorthodox :).
>>> To some degree, this is actually a resource we are holding underneath, and
>>> I think it's pretty common to have close() for a resource in Java, e.g.
>>> BufferedReader and BufferedWriter. So I would imagine that we also define a
>>> resource for the state iterator and make the interface implements
>>> *AutoCloseable*. Here is my sketch:
>>>
>>> // StateResource MUST be closed after use.
>>> try (StateResource<Iterator<SomeType>>> st =
>>> bagState.iteratorResource()) {
>>>     Iterator<SomeType> iter = st.iterator();
>>>     while (iter.hasNext() {
>>>        .. do stuff ...
>>>     }
>>> } catch (Exception e) {
>>>     ... user code
>>> }
>>>
>>> The type/method name are just for illustrating here, so please don't
>>> laugh at them. Please feel free to comment and let me know if you have
>>> thoughts about the programming patterns here.
>>>
>>> Thanks,
>>> Xinyu
>>>
>>> On Thu, May 10, 2018 at 8:59 PM, Kenneth Knowles <k...@google.com> wrote:
>>>
>>>> It is too soon to argue whether an API is complex or not. There has
>>>> been no specific API proposed.
>>>>
>>>> I think the problem statement is real - you need to be able to read and
>>>> write bigger-than-memory state. It seems we have multiple runners that
>>>> don't support it, perhaps because of our API. You might be able to build
>>>> something good enough with phantom references, but you might not.
>>>>
>>>> If I understand the idea, it might look something like this:
>>>>
>>>>     new DoFn<>() {
>>>>        @StateId("foo")
>>>>        private final StateSpec<BagState<Whatever>> myBagSpec = ...
>>>>
>>>>        @ProcessElement
>>>>        public void proc(@StateId("foo") BagState<Whatever> myBag, ...) {
>>>>          CloseableIterator<Whatever> iterator = myBag.get().iterator();
>>>>          while(iterator.hasNext() && ... special condition ...) {
>>>>            ... do stuff ...
>>>>          }
>>>>          iterator.close();
>>>>        }
>>>>      }
>>>>
>>>> So it has no impact on users who don't choose to close() since they
>>>> iterate with for ( : ) as usual. And a runner that has the 10x funding to
>>>> try out a ReferenceQueue can be resilient to users that forget. On the
>>>> other hand, I haven't seen this pattern much in the wild, so I think it is
>>>> valuable to discuss other methods.
>>>>
>>>> While Luke's proposal is something like this if I understand his sketch
>>>> (replacing WeakReference with PhantomReference seems to be what you really
>>>> want):
>>>>
>>>>     ... in RocksDb state implementation ...
>>>>     class RocksDbBagState {
>>>>       static ReferenceQueue rocksDbIteratorQueue = new ReferenceQueue();
>>>>
>>>>       class Iterator {
>>>>          PhantomReference<RocksDbJniIterator> cIter;
>>>>          .next() {
>>>>            return cIter.next();
>>>>          }
>>>>       }
>>>>
>>>>      class Iterable {
>>>>         .iterator() {
>>>>           return new Iterator(new PhantomReference<>(rocksDbJniIterator,
>>>> rocksDbIteratorQueue));
>>>>         }
>>>>       }
>>>>     }
>>>>
>>>>     ... on another thread ...
>>>>     while(true) {
>>>>       RocksDbIterator deadRef = (RocksDbIterator)
>>>> rocksDbIteratorQueue.remove();
>>>>       deadRef.close();
>>>>     }
>>>>
>>>> When the iterator is GC'd, the phantom reference will pop onto the
>>>> queue for being closed. This might not be too bad. You'll have delayed
>>>> resource release, and potentially masked errors that are hard to debug. It
>>>> is less error-prone than WeakReference, which is asking for trouble when
>>>> objects are collected en masse. Anecdotally I have heard that performance
>>>> of this kind of approach is poor, but I haven't experienced it myself and I
>>>> can't find good data.
>>>>
>>>> Kenn
>>>>
>>>>
>>>>
>>>> On Thu, May 10, 2018 at 7:41 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>> wrote:
>>>>
>>>>> If I understand correctly, using weak references will help clean up
>>>>> the Java objects once GC kicks in. In case of kv-store likes rocksDb, the
>>>>> Java iterator is just a JNI interface to the underlying C iterator, so we
>>>>> need to explicitly invoke close to release the in-memory snapshot data,
>>>>> which can be large and accumulated quickly if it's not released when not 
>>>>> in
>>>>> use. Maybe I am missing something as you suggested here, but looks to me
>>>>> using weak references might not help in this case.
>>>>>
>>>>> I understand your concern, and I think you might misunderstood what I
>>>>> meant. I am totally for working hard for best user experience, and I think
>>>>> the current API provides a good example of that. That's also the reason I
>>>>> am implementing a runner here. I am just proposing an extra API to expose
>>>>> an iterator that can be closed when not needed, that way the users can use
>>>>> this feature to iterate through large state that doesn't fit into memory. 
>>>>> I
>>>>> believe this is also a pretty general use case and it's better to have
>>>>> support for it. I am actually arguing this will be a better user 
>>>>> experience
>>>>> to add this extra API since more users can benefit from it.
>>>>>
>>>>> Thanks,
>>>>> Xinyu
>>>>>
>>>>> On Thu, May 10, 2018 at 5:25 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> I don't agree. I believe you can track the iterators/iterables that
>>>>>> are created and freed by using weak references and reference queues (or
>>>>>> other methods). Having a few people work 10x as hard to provide a good
>>>>>> implementation is much better then having 100s or 1000s of users 
>>>>>> suffering
>>>>>> through a more complicated API.
>>>>>>
>>>>>> On Thu, May 10, 2018 at 3:44 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Load/evict blocks will help reduce the cache memory footprint, but
>>>>>>> we still won't be able to release the underlying resources. We can add
>>>>>>> definitely heuristics to help release the resources as you mentioned, 
>>>>>>> but
>>>>>>> there is no accurate way to track all the iterators/iterables created 
>>>>>>> and
>>>>>>> free them up once not needed. I think while the API is aimed at nice 
>>>>>>> user
>>>>>>> experience, we should have the option to let users optimize their
>>>>>>> performance if they choose to. Do you agree?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Xinyu
>>>>>>>
>>>>>>> On Thu, May 10, 2018 at 3:25 PM, Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Users won't reliably close/release the resources and forcing them
>>>>>>>> to will make the user experience worse.
>>>>>>>> It will make a lot more sense to use a file format which allows
>>>>>>>> random access and use a cache to load/evict blocks of the state from 
>>>>>>>> memory.
>>>>>>>> If that is not possible, use an iterable which frees the resource
>>>>>>>> after a certain amount of inactivity or uses weak references.
>>>>>>>>
>>>>>>>> On Thu, May 10, 2018 at 3:07 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi, folks,
>>>>>>>>>
>>>>>>>>> I'm in the middle of implementing the MapState and SetState in our
>>>>>>>>> Samza runner. We noticed that the state returns the Java Iterable for
>>>>>>>>> reading entries, keys, etc. For state backed by file-based kv store 
>>>>>>>>> like
>>>>>>>>> rocksDb, we need to be able to let users explicitly close 
>>>>>>>>> iterator/iterable
>>>>>>>>> to release the resources.Otherwise we have to load the iterable into 
>>>>>>>>> memory
>>>>>>>>> so we can safely close the underlying rocksDb iterator, similar to 
>>>>>>>>> Flink's
>>>>>>>>> implementation. But this won't work for states that don't fit
>>>>>>>>> into memory. I chatted with Kenn and he also agrees we need this 
>>>>>>>>> capability
>>>>>>>>> to avoid bulk read/write. This seems to be a general use case and
>>>>>>>>> I'm wondering if we can add the support to it? I am happy to 
>>>>>>>>> contribute to
>>>>>>>>> this if needed. Any feedback is highly appreciated.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Xinyu
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>

Reply via email to