I don't have any further suggestions, but want to call out how this hits a lot of interesting points.
The point about snapshotting is great. We have BEAM-2975 [1] and BEAM-2980 [2]
where we debated things a bit. I think the strongest case is for what you
describe: it should be a snapshot. Perhaps they should both be closed as
fixed...

You also bring up long blocking calls. We have deliberately decided that long
synchronous blocking calls in @ProcessElement can be embraced for simple
programming and compensated for with autoscaling smarts (e.g. expanding the
thread pool upon noticing poor utilization). The alternative is a more
futuristic API where the calls can be explicitly asynchronous. We've had some
interesting dev@ list discussions about that, too.

Is another possibility to have read() return a ReadableState<Iterator>
instead? We could, of course, have two methods with different names, one for
an iterator and one for a snapshot iterable. But wouldn't the Iterator also
require a snapshot? Doesn't a native RocksDb iterator require a snapshot to
have well-defined contents? As you can tell, I don't know enough about RocksDb
details to be sure of my suggestions.

Kenn

[1] https://issues.apache.org/jira/browse/BEAM-2975
[2] https://issues.apache.org/jira/browse/BEAM-2980

On Sat, May 12, 2018 at 2:58 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:

> Thanks for the ideas, Kenn, Luke and Eugene. Before I posted the question
> here, we discussed internally releasing the underlying resources
> after consuming the whole iterator. This probably covers quite a lot of use
> cases. For some special cases where the user only consumes part of the
> iterator, Luke and Kenn's suggestion about releasing after processElement()
> might work (I need to confirm this against our use cases). So based on
> what we discussed so far, we might have a good way to automatically close
> an iterator for the store.
>
> There is another issue though: right now the state API returns an iterable
> for entries(), keys() and values(), and we can create an iterator from it.
> From my understanding, the iterable holds a snapshot of the underlying
> store. In the case of rocksDb, it's going to be a db.snapshot(). Then when
> can we release the snapshot? It's not like an iterator, where we can use
> some heuristics to automatically release it. The user can hold on to the
> iterable and create iterators throughout the whole processElement(). But if
> we only close the iterable after processElement(), I am quite concerned
> about the limitations this will bring. If the user is doing some remote
> call during processing, then the snapshot might be held for a long time
> before being released, and might cause performance problems. And if the
> user happens to create multiple iterables, then there will be multiple
> snapshots loaded during processing. Luke suggested being aggressive about
> closing the resources and recreating them when needed again. But in this
> case it might not work, since we won't be able to recreate the same
> snapshot given that the store might have been updated (and creating a
> rocksDb snapshot is not cheap either). I am running out of ideas other than
> exposing the iterator itself somehow (and adding close() if needed?). Any
> further suggestions?
>
> @Kenn: btw, I have the same impl you posted earlier (CloseableIterator)
> in an internal interface. I wrapped it in some sort of StateResource for
> fear that people might reject the proposal immediately after seeing the
> close() on the iterator. I guess our users are familiar with rocksDb state,
> so it's pretty normal for them to close the iterator/snapshot after using
> it.
>
> Thanks,
> Xinyu
>
> On Fri, May 11, 2018 at 11:08 AM, Lukasz Cwik <lc...@google.com> wrote:
>
>> The iterator going out of scope is the idiomatic way that resources are
>> freed for Java developers (hence the weak/phantom reference suggestion).
>> Explicitly requiring users to deal with 'handles' (like file streams)
>> leads to leaked resources.
>>
>> On Fri, May 11, 2018 at 10:55 AM Kenneth Knowles <k...@google.com> wrote:
>>
>>> Thanks Xinyu,
>>>
>>> I actually had first sketched out just what you wrote. But then I
>>> realized a few things:
>>>
>>> - usually an Iterable does not allocate resources, only its Iterators
>>> - if you consume the whole iterator, I hope the user would not have to
>>>   do any extra work
>>> - you can also automatically free it up at the end of the call to
>>>   @ProcessElement so that is easy too (but you might not want to)
>>> - so the main use is when the iterator terminates early and is not
>>>   fully consumed and you can't wait to finish the method
>>> - the scoping in Java will force a bunch of uninitialized declarations
>>>   outside the try-with-resources block, kind of a lot of boilerplate LOC
>>>
>>> One thing that is good about your proposal is that the iterable could
>>> have some transparent caches that all free up together.
>>>
>>> Kenn
>>>
>>> On Fri, May 11, 2018 at 9:51 AM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>
>>>> Thanks for drafting the details about the two approaches, Kenn. Now I
>>>> understand Luke's proposal better. The approach looks neat, but the
>>>> uncertainty of *when* GC is going to kick in will make users' lives
>>>> hard. If the user happens to configure a large JVM heap size, then,
>>>> since rocksDb uses off-heap memory, GC might start very late and run
>>>> less frequently than we want. If we don't have a *definitive* way to
>>>> let the user close the underlying resources, then there is no good way
>>>> to handle such failures of a critical application in production.
>>>>
>>>> Having a close method in the iterator might be a little unorthodox :).
>>>> To some degree, this is actually a resource we are holding underneath,
>>>> and I think it's pretty common to have close() for a resource in Java,
>>>> e.g. BufferedReader and BufferedWriter.
>>>> So I would imagine that we also define a
>>>> resource for the state iterator and make the interface implement
>>>> *AutoCloseable*. Here is my sketch:
>>>>
>>>> // StateResource MUST be closed after use.
>>>> try (StateResource<Iterator<SomeType>> st =
>>>>     bagState.iteratorResource()) {
>>>>   Iterator<SomeType> iter = st.iterator();
>>>>   while (iter.hasNext()) {
>>>>     .. do stuff ...
>>>>   }
>>>> } catch (Exception e) {
>>>>   ... user code
>>>> }
>>>>
>>>> The type/method names are just for illustration here, so please don't
>>>> laugh at them. Please feel free to comment and let me know if you have
>>>> thoughts about the programming patterns here.
>>>>
>>>> Thanks,
>>>> Xinyu
>>>>
>>>> On Thu, May 10, 2018 at 8:59 PM, Kenneth Knowles <k...@google.com>
>>>> wrote:
>>>>
>>>>> It is too soon to argue whether an API is complex or not. There has
>>>>> been no specific API proposed.
>>>>>
>>>>> I think the problem statement is real - you need to be able to read
>>>>> and write bigger-than-memory state. It seems we have multiple runners
>>>>> that don't support it, perhaps because of our API. You might be able
>>>>> to build something good enough with phantom references, but you might
>>>>> not.
>>>>>
>>>>> If I understand the idea, it might look something like this:
>>>>>
>>>>> new DoFn<>() {
>>>>>   @StateId("foo")
>>>>>   private final StateSpec<BagState<Whatever>> myBagSpec = ...
>>>>>
>>>>>   @ProcessElement
>>>>>   public void proc(@StateId("foo") BagState<Whatever> myBag, ...) {
>>>>>     CloseableIterator<Whatever> iterator = myBag.get().iterator();
>>>>>     while (iterator.hasNext() && ... special condition ...) {
>>>>>       ... do stuff ...
>>>>>     }
>>>>>     iterator.close();
>>>>>   }
>>>>> }
>>>>>
>>>>> So it has no impact on users who don't choose to close() since they
>>>>> iterate with for ( : ) as usual. And a runner that has the 10x funding
>>>>> to try out a ReferenceQueue can be resilient to users that forget.
>>>>> On the other hand, I haven't seen this pattern much in the wild, so I
>>>>> think it is valuable to discuss other methods.
>>>>>
>>>>> Luke's proposal, meanwhile, is something like this if I understand his
>>>>> sketch (replacing WeakReference with PhantomReference seems to be what
>>>>> you really want):
>>>>>
>>>>> ... in RocksDb state implementation ...
>>>>> class RocksDbBagState {
>>>>>   static ReferenceQueue rocksDbIteratorQueue = new ReferenceQueue();
>>>>>
>>>>>   class Iterator {
>>>>>     PhantomReference<RocksDbJniIterator> cIter;
>>>>>     .next() {
>>>>>       return cIter.next();
>>>>>     }
>>>>>   }
>>>>>
>>>>>   class Iterable {
>>>>>     .iterator() {
>>>>>       return new Iterator(new PhantomReference<>(rocksDbJniIterator,
>>>>>           rocksDbIteratorQueue));
>>>>>     }
>>>>>   }
>>>>> }
>>>>>
>>>>> ... on another thread ...
>>>>> while (true) {
>>>>>   RocksDbIterator deadRef =
>>>>>       (RocksDbIterator) rocksDbIteratorQueue.remove();
>>>>>   deadRef.close();
>>>>> }
>>>>>
>>>>> When the iterator is GC'd, the phantom reference will pop onto the
>>>>> queue to be closed. This might not be too bad. You'll have delayed
>>>>> resource release, and potentially masked errors that are hard to
>>>>> debug. It is less error-prone than WeakReference, which is asking for
>>>>> trouble when objects are collected en masse. Anecdotally I have heard
>>>>> that performance of this kind of approach is poor, but I haven't
>>>>> experienced it myself and I can't find good data.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Thu, May 10, 2018 at 7:41 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> If I understand correctly, using weak references will help clean up
>>>>>> the Java objects once GC kicks in.
>>>>>> In the case of a kv-store like rocksDb, the
>>>>>> Java iterator is just a JNI interface to the underlying C iterator,
>>>>>> so we need to explicitly invoke close to release the in-memory
>>>>>> snapshot data, which can be large and can accumulate quickly if it's
>>>>>> not released when not in use. Maybe I am missing something in what
>>>>>> you suggested here, but it looks to me like using weak references
>>>>>> might not help in this case.
>>>>>>
>>>>>> I understand your concern, and I think you might have misunderstood
>>>>>> what I meant. I am totally for working hard for the best user
>>>>>> experience, and I think the current API provides a good example of
>>>>>> that. That's also the reason I am implementing a runner here. I am
>>>>>> just proposing an extra API to expose an iterator that can be closed
>>>>>> when not needed, so that users can use this feature to iterate
>>>>>> through large state that doesn't fit into memory. I believe this is
>>>>>> also a pretty general use case and it's better to have support for
>>>>>> it. I am actually arguing that adding this extra API will give a
>>>>>> better user experience, since more users can benefit from it.
>>>>>>
>>>>>> Thanks,
>>>>>> Xinyu
>>>>>>
>>>>>> On Thu, May 10, 2018 at 5:25 PM, Lukasz Cwik <lc...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I don't agree. I believe you can track the iterators/iterables that
>>>>>>> are created and freed by using weak references and reference queues
>>>>>>> (or other methods). Having a few people work 10x as hard to provide
>>>>>>> a good implementation is much better than having 100s or 1000s of
>>>>>>> users suffering through a more complicated API.
>>>>>>>
>>>>>>> On Thu, May 10, 2018 at 3:44 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Load/evict blocks will help reduce the cache memory footprint, but
>>>>>>>> we still won't be able to release the underlying resources.
>>>>>>>> We can definitely add
>>>>>>>> heuristics to help release the resources as you mentioned, but
>>>>>>>> there is no accurate way to track all the iterators/iterables
>>>>>>>> created and free them up once not needed. I think that while the
>>>>>>>> API is aimed at a nice user experience, we should have the option
>>>>>>>> to let users optimize their performance if they choose to. Do you
>>>>>>>> agree?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Xinyu
>>>>>>>>
>>>>>>>> On Thu, May 10, 2018 at 3:25 PM, Lukasz Cwik <lc...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Users won't reliably close/release the resources and forcing them
>>>>>>>>> to will make the user experience worse.
>>>>>>>>> It will make a lot more sense to use a file format which allows
>>>>>>>>> random access and use a cache to load/evict blocks of the state
>>>>>>>>> from memory.
>>>>>>>>> If that is not possible, use an iterable which frees the resource
>>>>>>>>> after a certain amount of inactivity or uses weak references.
>>>>>>>>>
>>>>>>>>> On Thu, May 10, 2018 at 3:07 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi, folks,
>>>>>>>>>>
>>>>>>>>>> I'm in the middle of implementing the MapState and SetState in
>>>>>>>>>> our Samza runner. We noticed that the state returns the Java
>>>>>>>>>> Iterable for reading entries, keys, etc. For state backed by a
>>>>>>>>>> file-based kv store like rocksDb, we need to be able to let
>>>>>>>>>> users explicitly close the iterator/iterable to release the
>>>>>>>>>> resources. Otherwise we have to load the iterable into memory so
>>>>>>>>>> we can safely close the underlying rocksDb iterator, similar to
>>>>>>>>>> Flink's implementation. But this won't work for states that
>>>>>>>>>> don't fit into memory. I chatted with Kenn and he also agrees we
>>>>>>>>>> need this capability to avoid bulk read/write.
>>>>>>>>>> This seems to be a general use case,
>>>>>>>>>> and I'm wondering if we can add support for it. I am happy to
>>>>>>>>>> contribute to this if needed. Any feedback is highly appreciated.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Xinyu
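
For reference, the pattern discussed in this thread (release the underlying
resource automatically once the iterator is fully consumed, and allow an
explicit close() when iteration stops early) could be sketched roughly as
below. This is only an illustration under stated assumptions, not Beam's API:
CloseableIterator is written here as a hypothetical interface, and
AutoReleasingIterator, releasesAfterReading and the Runnable release hook are
names invented for the example. The Runnable merely stands in for closing a
native iterator or snapshot.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

final class CloseableIteratorSketch {

  /** Hypothetical interface (not Beam API): an Iterator that owns a releasable resource. */
  interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
    @Override
    void close(); // narrowed to throw no checked exception
  }

  /** Releases the resource exactly once: on exhaustion or on close(), whichever comes first. */
  static final class AutoReleasingIterator<T> implements CloseableIterator<T> {
    private final Iterator<T> delegate;
    private final Runnable release; // assumption: stands in for freeing a native handle
    private boolean closed = false;

    AutoReleasingIterator(Iterator<T> delegate, Runnable release) {
      this.delegate = delegate;
      this.release = release;
    }

    @Override
    public boolean hasNext() {
      if (closed) {
        return false;
      }
      if (delegate.hasNext()) {
        return true;
      }
      close(); // fully consumed: the user has no extra work to do
      return false;
    }

    @Override
    public T next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      return delegate.next();
    }

    @Override
    public void close() {
      if (!closed) {
        closed = true;
        release.run();
      }
    }
  }

  /** Reads at most `limit` elements, then returns how many times the resource was released. */
  static int releasesAfterReading(int limit) {
    int[] releases = {0};
    Iterator<String> source = Arrays.asList("a", "b", "c").iterator();
    // try-with-resources covers the early-exit case; it is a no-op after exhaustion.
    try (CloseableIterator<String> it =
        new AutoReleasingIterator<>(source, () -> releases[0]++)) {
      for (int i = 0; i < limit && it.hasNext(); i++) {
        it.next();
      }
    }
    return releases[0];
  }

  public static void main(String[] args) {
    System.out.println("early exit releases: " + releasesAfterReading(1));
    System.out.println("full read releases: " + releasesAfterReading(10));
  }
}
```

Users who consume the whole iterator never need to call close(), while a
caller that breaks out early can rely on try-with-resources. The sketch does
not address the snapshot-lifetime question for iterables raised above.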