I don't have any further suggestions, but want to call out how this hits a
lot of interesting points.

The point about snapshotting is great. We have BEAM-2975 [1] and BEAM-2980
[2] where we debated things a bit. I think the strongest case is for what
you describe - it should be a snapshot. Perhaps they should both be closed
as fixed...

And you also bring up long blocking calls - we have also deliberately
decided that long synchronous blocking calls in @ProcessElement can be
embraced for simple programming and compensated with autoscaling smarts
(e.g. expand the thread pool by noticing poor utilization). The alternative
is a more futuristic API where the calls can be explicitly asynchronous.
We've had some interesting dev@ list discussions about that, too.

Is another possibility to perhaps have read() return a
ReadableState<Iterator> instead? We could, of course, have two methods with
different names, one for an iterator and one for a snapshot iterable. But wouldn't
the Iterator also require a snapshot? Doesn't a native RocksDb iterator
require a snapshot to have well-defined contents? As you can tell, I don't
know enough about RocksDb details to be sure of my suggestions.
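
To make the question concrete, here is a minimal Java sketch of an iterator
that reads from a snapshot and has to be closed to release it. Everything in
it is an assumption for illustration: ToyStore is an in-memory stand-in, not a
real RocksDb binding; snapshotValues() is a hypothetical method name; and
CloseableIterator is defined locally rather than taken from Beam. Copying the
values only emulates a snapshot.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.TreeMap;

/** Hypothetical iterator that must be closed to release its snapshot. */
interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
  @Override
  void close(); // narrowed so callers need no catch block
}

/** Toy in-memory stand-in for a RocksDb-like store (assumption, not a real API). */
class ToyStore {
  private final TreeMap<String, String> data = new TreeMap<>();
  private int openSnapshots = 0;

  void put(String key, String value) {
    data.put(key, value);
  }

  int openSnapshots() {
    return openSnapshots;
  }

  /** Returns an iterator over the values as they were at the time of this call. */
  CloseableIterator<String> snapshotValues() {
    // The copy emulates a snapshot; a real store would pin a version instead.
    final Iterator<String> it = new ArrayList<>(data.values()).iterator();
    openSnapshots++;
    return new CloseableIterator<String>() {
      private boolean closed = false;

      @Override
      public boolean hasNext() {
        return !closed && it.hasNext();
      }

      @Override
      public String next() {
        if (closed) {
          throw new NoSuchElementException("iterator is closed");
        }
        return it.next();
      }

      @Override
      public void close() {
        if (!closed) {
          closed = true;
          openSnapshots--; // release the emulated snapshot exactly once
        }
      }
    };
  }
}

class SnapshotIteratorDemo {
  /** Reads all values while the store is modified in the middle of iteration. */
  static List<String> readWhileWriting(ToyStore store) {
    List<String> seen = new ArrayList<>();
    try (CloseableIterator<String> iter = store.snapshotValues()) {
      while (iter.hasNext()) {
        seen.add(iter.next());
        store.put("z-late", "late"); // written after the snapshot, so not visible
      }
    } // close() runs here even if the loop exits early
    return seen;
  }
}
```

In this sketch the loop sees only the values that existed when the iterator
was created, and the snapshot count returns to zero once the try block exits.
Iterating the TreeMap's live values() view instead would typically fail with a
ConcurrentModificationException on the next() after the first write, which is
one way to see why the iterator needs a snapshot too.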

Kenn

[1] https://issues.apache.org/jira/browse/BEAM-2975
[2] https://issues.apache.org/jira/browse/BEAM-2980

On Sat, May 12, 2018 at 2:58 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:

> Thanks for the ideas, Kenn, Luke and Eugene. Before I posted the question
> here, we discussed internally about releasing the underlying resources
> after consuming the whole iterator. This probably covers quite a lot of use
> cases. For the special cases where the user consumes only part of the
> iterator, Luke and Kenn's suggestion about releasing after processElement()
> might work (I need to confirm this against our use cases). So based on
> what we discussed so far, we might have a good way to automatically close
> an iterator for the store.
>
> There is another issue though: right now the state API returns an iterable
> for entries(), keys() and values(), and we can create iterators from it.
> From my understanding, the iterable holds a snapshot of the underlying
> store. In the case of rocksDb, it's going to be a db.snapshot(). Then when
> can we release the snapshot? It's not like an iterator, where we can use some
> heuristics to automatically release it. The user can hold on to the
> iterable and create iterators throughout the whole processElement(). But if
> we only close the iterable after processElement(), I am quite concerned
> about the limitations this will bring. If the user is doing some remote
> call during the process, then the snapshot might be held for a long time
> before releasing, and might cause performance problems. And if the user
> happens to create multiple iterables, then there will be multiple snapshots
> loaded during process. Luke suggested being aggressive at closing the
> resources and recreating when needed again. But in this case it might not
> work since we won't be able to recreate the same snapshot given the store
> might have been updated (and creating a rocksDb snapshot is not cheap
> either). I am running out of ideas other than exposing the iterator itself
> somehow (and adding close() if needed?). Any further suggestions?
>
> @Kenn: btw, I have the same impl you posted earlier (CloseableIterator)
> in an internal interface. I wrapped it in some sort of StateResource for
> fear that people might reject the proposal immediately after seeing the
> close() on the iterator. I guess our users are familiar with rocksDb state,
> so for them it's pretty normal to close the iterator/snapshot after using it.
>
> Thanks,
> Xinyu
>
> On Fri, May 11, 2018 at 11:08 AM, Lukasz Cwik <lc...@google.com> wrote:
>
>> The iterator going out of scope is the idiomatic way that resources are
>> freed for Java developers (hence the weak/phantom reference suggestion).
>> Explicitly requiring users to deal with 'handles' (like file streams)
>> leads to leaked resources.
>>
>> On Fri, May 11, 2018 at 10:55 AM Kenneth Knowles <k...@google.com> wrote:
>>
>>> Thanks Xinyu,
>>>
>>> I actually had first sketched out just what you wrote. But then I
>>> realized a few things:
>>>
>>>  - usually an Iterable does not allocate resources, only its Iterators
>>>  - if you consume the whole iterator, I hope the user would not have to
>>> do any extra work
>>>  - you can also automatically free it up at the end of the call to
>>> @ProcessElement so that is easy too (but you might not want to)
>>>  - so the main use is when the iterator terminates early and is not
>>> fully consumed and you can't wait to finish the method
>>>  - the scoping in Java will force a bunch of uninitialized declarations
>>> outside the try-with-resources block, kind of a lot of boilerplate LOC
>>>
>>> One thing that is good about your proposal is that the iterable could
>>> have some transparent caches that all free up together.
>>>
>>> Kenn
>>>
>>> On Fri, May 11, 2018 at 9:51 AM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>
>>>> Thanks for drafting the details about the two approaches, Kenn. Now I
>>>> understand Luke's proposal better. The approach looks neat, but the
>>>> uncertainty of *when* GC is going to kick in will make users' lives
>>>> hard. If the user happens to configure a large JVM heap size, and since
>>>> rocksDb uses off-heap memory, GC might start very late and less
>>>> frequently than what we want. If we don't have a *definitive* way to let
>>>> the user close the underlying resources, then there is no good way to
>>>> handle such failures of a critical application in production.
>>>>
>>>> Having a close method in the iterator might be a little unorthodox :).
>>>> To some degree, this is actually a resource we are holding underneath, and
>>>> I think it's pretty common to have close() for a resource in Java, e.g.
>>>> BufferedReader and BufferedWriter. So I would imagine that we also define a
>>>> resource for the state iterator and make the interface implement
>>>> *AutoCloseable*. Here is my sketch:
>>>>
>>>> // StateResource MUST be closed after use.
>>>> try (StateResource<Iterator<SomeType>> st = bagState.iteratorResource()) {
>>>>     Iterator<SomeType> iter = st.iterator();
>>>>     while (iter.hasNext()) {
>>>>        .. do stuff ...
>>>>     }
>>>> } catch (Exception e) {
>>>>     ... user code
>>>> }
>>>>
>>>> The type and method names are just for illustration, so please don't
>>>> laugh at them. Please feel free to comment and let me know if you have
>>>> thoughts about the programming patterns here.
>>>>
>>>> Thanks,
>>>> Xinyu
>>>>
>>>> On Thu, May 10, 2018 at 8:59 PM, Kenneth Knowles <k...@google.com>
>>>> wrote:
>>>>
>>>>> It is too soon to argue whether an API is complex or not. There has
>>>>> been no specific API proposed.
>>>>>
>>>>> I think the problem statement is real - you need to be able to read
>>>>> and write bigger-than-memory state. It seems we have multiple runners that
>>>>> don't support it, perhaps because of our API. You might be able to build
>>>>> something good enough with phantom references, but you might not.
>>>>>
>>>>> If I understand the idea, it might look something like this:
>>>>>
>>>>>     new DoFn<>() {
>>>>>        @StateId("foo")
>>>>>        private final StateSpec<BagState<Whatever>> myBagSpec = ...
>>>>>
>>>>>        @ProcessElement
>>>>>        public void proc(@StateId("foo") BagState<Whatever> myBag, ...) {
>>>>>          CloseableIterator<Whatever> iterator = myBag.get().iterator();
>>>>>          while(iterator.hasNext() && ... special condition ...) {
>>>>>            ... do stuff ...
>>>>>          }
>>>>>          iterator.close();
>>>>>        }
>>>>>      }
>>>>>
>>>>> So it has no impact on users who don't choose to close() since they
>>>>> iterate with for ( : ) as usual. And a runner that has the 10x funding to
>>>>> try out a ReferenceQueue can be resilient to users that forget. On the
>>>>> other hand, I haven't seen this pattern much in the wild, so I think it is
>>>>> valuable to discuss other methods.
>>>>>
>>>>> While Luke's proposal is something like this if I understand his
>>>>> sketch (replacing WeakReference with PhantomReference seems to be what you
>>>>> really want):
>>>>>
>>>>>     ... in RocksDb state implementation ...
>>>>>     class RocksDbBagState {
>>>>>       static ReferenceQueue rocksDbIteratorQueue = new ReferenceQueue();
>>>>>
>>>>>       class Iterator {
>>>>>          PhantomReference<RocksDbJniIterator> cIter;
>>>>>          .next() {
>>>>>            return cIter.next();
>>>>>          }
>>>>>       }
>>>>>
>>>>>       class Iterable {
>>>>>         .iterator() {
>>>>>           return new Iterator(
>>>>>               new PhantomReference<>(rocksDbJniIterator, rocksDbIteratorQueue));
>>>>>         }
>>>>>       }
>>>>>     }
>>>>>
>>>>>     ... on another thread ...
>>>>>     while(true) {
>>>>>       RocksDbIterator deadRef = (RocksDbIterator) rocksDbIteratorQueue.remove();
>>>>>       deadRef.close();
>>>>>     }
>>>>>
>>>>> When the iterator is GC'd, the phantom reference will pop onto the
>>>>> queue for being closed. This might not be too bad. You'll have delayed
>>>>> resource release, and potentially masked errors that are hard to debug. It
>>>>> is less error-prone than WeakReference, which is asking for trouble when
>>>>> objects are collected en masse. Anecdotally I have heard that performance
>>>>> of this kind of approach is poor, but I haven't experienced it myself and
>>>>> I can't find good data.
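
A minimal runnable sketch of the reference-queue idea, under stated
assumptions: NativeHandle, WrappedIterator, HandleReference and PhantomCleanup
are hypothetical names, and NativeHandle only stands in for a JNI iterator.
Because get() on a PhantomReference always returns null, the reference object
itself carries the handle that has to be released.

```java
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a native (JNI) iterator handle. */
class NativeHandle {
  final AtomicBoolean closed = new AtomicBoolean(false);

  void close() {
    closed.set(true);
  }
}

/** Java-side wrapper that user code holds; it deliberately has no close(). */
class WrappedIterator {
  final NativeHandle handle;

  WrappedIterator(NativeHandle handle) {
    this.handle = handle;
  }
}

/** Phantom reference that remembers which native handle to release. */
class HandleReference extends PhantomReference<WrappedIterator> {
  final NativeHandle handle;

  HandleReference(WrappedIterator referent, ReferenceQueue<WrappedIterator> queue) {
    super(referent, queue);
    // Store the handle, not the wrapper: holding the wrapper here would keep
    // it reachable forever, and get() cannot be used to recover it later.
    this.handle = referent.handle;
  }
}

class PhantomCleanup {
  static final ReferenceQueue<WrappedIterator> QUEUE = new ReferenceQueue<>();

  /** Closes the handle of every reference currently on the queue. */
  static int drain() {
    int released = 0;
    Reference<? extends WrappedIterator> ref;
    while ((ref = QUEUE.poll()) != null) {
      ((HandleReference) ref).handle.close();
      released++;
    }
    return released;
  }
}
```

A background cleanup thread would block on QUEUE.remove() instead of polling,
and a real implementation would also have to keep every HandleReference
strongly reachable until it has been processed. When the collector enqueues a
reference is outside the program's control, which is the timing concern raised
elsewhere in this thread; calling enqueue() on the reference by hand is a way
to exercise drain() deterministically.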
>>>>>
>>>>> Kenn
>>>>>
>>>>>
>>>>>
>>>>> On Thu, May 10, 2018 at 7:41 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> If I understand correctly, using weak references will help clean up
>>>>>> the Java objects once GC kicks in. In the case of a kv-store like
>>>>>> rocksDb, the Java iterator is just a JNI interface to the underlying C
>>>>>> iterator, so we need to explicitly invoke close to release the
>>>>>> in-memory snapshot data, which can be large and accumulate quickly if
>>>>>> it's not released when not in use. Maybe I am missing something as you
>>>>>> suggested here, but it looks to me like using weak references might not
>>>>>> help in this case.
>>>>>>
>>>>>> I understand your concern, and I think you might have misunderstood
>>>>>> what I meant. I am totally for working hard for the best user
>>>>>> experience, and I think the current API provides a good example of
>>>>>> that. That's also the reason I am implementing a runner here. I am just
>>>>>> proposing an extra API to expose an iterator that can be closed when
>>>>>> not needed, so that users can use this feature to iterate through large
>>>>>> state that doesn't fit into memory. I believe this is also a pretty
>>>>>> general use case and it's better to have support for it. I am actually
>>>>>> arguing that adding this extra API will be a better user experience,
>>>>>> since more users can benefit from it.
>>>>>>
>>>>>> Thanks,
>>>>>> Xinyu
>>>>>>
>>>>>> On Thu, May 10, 2018 at 5:25 PM, Lukasz Cwik <lc...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I don't agree. I believe you can track the iterators/iterables that
>>>>>>> are created and freed by using weak references and reference queues
>>>>>>> (or other methods). Having a few people work 10x as hard to provide a
>>>>>>> good implementation is much better than having 100s or 1000s of users
>>>>>>> suffering through a more complicated API.
>>>>>>>
>>>>>>> On Thu, May 10, 2018 at 3:44 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Load/evict blocks will help reduce the cache memory footprint, but
>>>>>>>> we still won't be able to release the underlying resources. We can
>>>>>>>> definitely add heuristics to help release the resources as you
>>>>>>>> mentioned, but there is no accurate way to track all the
>>>>>>>> iterators/iterables created and free them up once not needed. I
>>>>>>>> think while the API is aimed at a nice user experience, we should
>>>>>>>> have the option to let users optimize their performance if they
>>>>>>>> choose to. Do you agree?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Xinyu
>>>>>>>>
>>>>>>>> On Thu, May 10, 2018 at 3:25 PM, Lukasz Cwik <lc...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Users won't reliably close/release the resources and forcing them
>>>>>>>>> to will make the user experience worse.
>>>>>>>>> It will make a lot more sense to use a file format which allows
>>>>>>>>> random access and use a cache to load/evict blocks of the state
>>>>>>>>> from memory.
>>>>>>>>> If that is not possible, use an iterable which frees the resource
>>>>>>>>> after a certain amount of inactivity or uses weak references.
>>>>>>>>>
>>>>>>>>> On Thu, May 10, 2018 at 3:07 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi, folks,
>>>>>>>>>>
>>>>>>>>>> I'm in the middle of implementing the MapState and SetState in
>>>>>>>>>> our Samza runner. We noticed that the state returns the Java
>>>>>>>>>> Iterable for reading entries, keys, etc. For state backed by a
>>>>>>>>>> file-based kv store like rocksDb, we need to be able to let users
>>>>>>>>>> explicitly close the iterator/iterable to release the resources.
>>>>>>>>>> Otherwise we have to load the iterable into memory so we can
>>>>>>>>>> safely close the underlying rocksDb iterator, similar to Flink's
>>>>>>>>>> implementation. But this won't work for states that don't fit
>>>>>>>>>> into memory. I chatted with Kenn and he also agrees we need this
>>>>>>>>>> capability to avoid bulk read/write. This seems to be a general
>>>>>>>>>> use case and I'm wondering if we can add support for it. I am
>>>>>>>>>> happy to contribute to this if needed. Any feedback is highly
>>>>>>>>>> appreciated.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Xinyu
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>
>
