The iterator going out of scope is the idiomatic way that resources are freed for Java developers (hence the weak/phantom reference suggestion). Explicitly requiring users to deal with 'handles' (like file streams) leads to leaked resources.
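
To make the reference-queue idea concrete, here is a minimal sketch, not a proposal for the actual Beam or RocksDB API. NativeHandle, TrackedIterator and reapOnce are hypothetical names I made up for illustration; NativeHandle stands in for a JNI-backed iterator holding off-heap memory. One detail differs from the sketch quoted below: PhantomReference.get() always returns null, so the reference object itself has to carry the handle that needs releasing. The sketch also keeps an optional close() for callers who want deterministic release.

```java
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a JNI-backed iterator that holds off-heap memory. */
final class NativeHandle {
  private final AtomicBoolean closed = new AtomicBoolean(false);
  private final Iterator<String> data;

  NativeHandle(Iterator<String> data) { this.data = data; }

  boolean hasNext() { return !closed.get() && data.hasNext(); }
  String next() { return data.next(); }

  /** Idempotent, so it is safe to call from both close() and the reaper. */
  void release() { closed.set(true); }
  boolean isClosed() { return closed.get(); }
}

/** Iterator handed to users: close() is optional, GC is the fallback. */
final class TrackedIterator implements Iterator<String>, AutoCloseable {
  static final ReferenceQueue<TrackedIterator> QUEUE = new ReferenceQueue<>();
  // Keeps the reference objects strongly reachable until they are reaped;
  // a reference that is itself garbage is never enqueued.
  static final Set<Ref> LIVE = ConcurrentHashMap.newKeySet();

  /** Carries the handle, because PhantomReference.get() always returns null. */
  static final class Ref extends PhantomReference<TrackedIterator> {
    final NativeHandle handle;

    Ref(TrackedIterator referent, NativeHandle handle) {
      super(referent, QUEUE);
      this.handle = handle;
    }
  }

  private final NativeHandle handle;
  private final Ref ref;

  TrackedIterator(NativeHandle handle) {
    this.handle = handle;
    this.ref = new Ref(this, handle);
    LIVE.add(ref);
  }

  @Override public boolean hasNext() { return handle.hasNext(); }
  @Override public String next() { return handle.next(); }

  /** Deterministic release for callers who opt in. */
  @Override public void close() {
    handle.release();
    LIVE.remove(ref);
  }

  /**
   * Releases the handles of iterators the GC has already found unreachable.
   * Returns how many were released. A runner would call this periodically or
   * block on QUEUE.remove() from a dedicated thread.
   */
  static int reapOnce() {
    int reaped = 0;
    Ref dead;
    while ((dead = (Ref) QUEUE.poll()) != null) {
      dead.handle.release();
      LIVE.remove(dead);
      reaped++;
    }
    return reaped;
  }
}
```

Users who iterate with for ( : ) and never call close() still get cleanup eventually, but only after a GC cycle notices the iterator is unreachable and something calls reapOnce(), so the timing concern raised below still applies.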

On Fri, May 11, 2018 at 10:55 AM Kenneth Knowles <k...@google.com> wrote:
> Thanks Xinyu,
>
> I actually had first sketched out just what you wrote. But then I realized
> a few things:
>
>  - usually an Iterable does not allocate resources, only its Iterators
>  - if you consume the whole iterator, I hope the user would not have to do
>    any extra work
>  - you can also automatically free it up at the end of the call to
>    @ProcessElement so that is easy too (but you might not want to)
>  - so the main use is when the iterator terminates early and is not fully
>    consumed and you can't wait to finish the method
>  - the scoping in Java will force a bunch of uninitialized declarations
>    outside the try-with-resources block, kind of a lot of boilerplate LOC
>
> One thing that is good about your proposal is that the iterable could have
> some transparent caches that all free up together.
>
> Kenn
>
> On Fri, May 11, 2018 at 9:51 AM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>
>> Thanks for drafting the details about the two approaches, Kenn. Now I
>> understand Luke's proposal better. The approach looks neat, but the
>> uncertainty of *when* GC is going to kick in will make users' life hard.
>> If the user happens to configure a large JVM heap size, and since rocksDb
>> uses off-heap memory, GC might start very late and less frequently than what
>> we want. If we don't have a *definitive* way to let users close the
>> underlying resources, then there is no good way to handle such failures of
>> a critical application in production.
>>
>> Having a close method in the iterator might be a little unorthodox :). To
>> some degree, this is actually a resource we are holding underneath, and I
>> think it's pretty common to have close() for a resource in Java, e.g.
>> BufferedReader and BufferedWriter. So I would imagine that we also define a
>> resource for the state iterator and make the interface implement
>> *AutoCloseable*.
>> Here is my sketch:
>>
>> // StateResource MUST be closed after use.
>> try (StateResource<Iterator<SomeType>> st = bagState.iteratorResource()) {
>>   Iterator<SomeType> iter = st.iterator();
>>   while (iter.hasNext()) {
>>     .. do stuff ...
>>   }
>> } catch (Exception e) {
>>   ... user code
>> }
>>
>> The type/method names are just for illustrating here, so please don't
>> laugh at them. Please feel free to comment and let me know if you have
>> thoughts about the programming patterns here.
>>
>> Thanks,
>> Xinyu
>>
>> On Thu, May 10, 2018 at 8:59 PM, Kenneth Knowles <k...@google.com> wrote:
>>
>>> It is too soon to argue whether an API is complex or not. There has been
>>> no specific API proposed.
>>>
>>> I think the problem statement is real - you need to be able to read and
>>> write bigger-than-memory state. It seems we have multiple runners that
>>> don't support it, perhaps because of our API. You might be able to build
>>> something good enough with phantom references, but you might not.
>>>
>>> If I understand the idea, it might look something like this:
>>>
>>> new DoFn<>() {
>>>   @StateId("foo")
>>>   private final StateSpec<BagState<Whatever>> myBagSpec = ...
>>>
>>>   @ProcessElement
>>>   public void proc(@StateId("foo") BagState<Whatever> myBag, ...) {
>>>     CloseableIterator<Whatever> iterator = myBag.get().iterator();
>>>     while(iterator.hasNext() && ... special condition ...) {
>>>       ... do stuff ...
>>>     }
>>>     iterator.close();
>>>   }
>>> }
>>>
>>> So it has no impact on users who don't choose to close() since they
>>> iterate with for ( : ) as usual. And a runner that has the 10x funding to
>>> try out a ReferenceQueue can be resilient to users that forget. On the
>>> other hand, I haven't seen this pattern much in the wild, so I think it is
>>> valuable to discuss other methods.
>>>
>>> While Luke's proposal is something like this if I understand his sketch
>>> (replacing WeakReference with PhantomReference seems to be what you really
>>> want):
>>>
>>> ... in RocksDb state implementation ...
>>> class RocksDbBagState {
>>>   static ReferenceQueue rocksDbIteratorQueue = new ReferenceQueue();
>>>
>>>   class Iterator {
>>>     PhantomReference<RocksDbJniIterator> cIter;
>>>     .next() {
>>>       return cIter.next();
>>>     }
>>>   }
>>>
>>>   class Iterable {
>>>     .iterator() {
>>>       return new Iterator(new PhantomReference<>(rocksDbJniIterator, rocksDbIteratorQueue));
>>>     }
>>>   }
>>> }
>>>
>>> ... on another thread ...
>>> while(true) {
>>>   RocksDbIterator deadRef = (RocksDbIterator) rocksDbIteratorQueue.remove();
>>>   deadRef.close();
>>> }
>>>
>>> When the iterator is GC'd, the phantom reference will pop onto the queue
>>> for being closed. This might not be too bad. You'll have delayed resource
>>> release, and potentially masked errors that are hard to debug. It is less
>>> error-prone than WeakReference, which is asking for trouble when objects
>>> are collected en masse. Anecdotally I have heard that performance of this
>>> kind of approach is poor, but I haven't experienced it myself and I can't
>>> find good data.
>>>
>>> Kenn
>>>
>>> On Thu, May 10, 2018 at 7:41 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>
>>>> If I understand correctly, using weak references will help clean up the
>>>> Java objects once GC kicks in. In case of a kv-store like rocksDb, the Java
>>>> iterator is just a JNI interface to the underlying C iterator, so we need
>>>> to explicitly invoke close to release the in-memory snapshot data, which
>>>> can be large and accumulate quickly if it's not released when not in use.
>>>> Maybe I am missing something as you suggested here, but it looks to me like
>>>> using weak references might not help in this case.
>>>>
>>>> I understand your concern, and I think you might have misunderstood what I
>>>> meant. I am totally for working hard for best user experience, and I think
>>>> the current API provides a good example of that. That's also the reason I
>>>> am implementing a runner here.
>>>> I am just proposing an extra API to expose
>>>> an iterator that can be closed when not needed, that way the users can use
>>>> this feature to iterate through large state that doesn't fit into memory. I
>>>> believe this is also a pretty general use case and it's better to have
>>>> support for it. I am actually arguing this will be a better user experience
>>>> to add this extra API since more users can benefit from it.
>>>>
>>>> Thanks,
>>>> Xinyu
>>>>
>>>> On Thu, May 10, 2018 at 5:25 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> I don't agree. I believe you can track the iterators/iterables that
>>>>> are created and freed by using weak references and reference queues (or
>>>>> other methods). Having a few people work 10x as hard to provide a good
>>>>> implementation is much better than having 100s or 1000s of users suffering
>>>>> through a more complicated API.
>>>>>
>>>>> On Thu, May 10, 2018 at 3:44 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>
>>>>>> Load/evict blocks will help reduce the cache memory footprint, but we
>>>>>> still won't be able to release the underlying resources. We can
>>>>>> definitely add heuristics to help release the resources as you mentioned, but
>>>>>> there is no accurate way to track all the iterators/iterables created and
>>>>>> free them up once not needed. I think while the API is aimed at nice user
>>>>>> experience, we should have the option to let users optimize their
>>>>>> performance if they choose to. Do you agree?
>>>>>>
>>>>>> Thanks,
>>>>>> Xinyu
>>>>>>
>>>>>> On Thu, May 10, 2018 at 3:25 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> Users won't reliably close/release the resources and forcing them to
>>>>>>> will make the user experience worse.
>>>>>>> It will make a lot more sense to use a file format which allows
>>>>>>> random access and use a cache to load/evict blocks of the state from
>>>>>>> memory.
>>>>>>> If that is not possible, use an iterable which frees the resource
>>>>>>> after a certain amount of inactivity or uses weak references.
>>>>>>>
>>>>>>> On Thu, May 10, 2018 at 3:07 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi, folks,
>>>>>>>>
>>>>>>>> I'm in the middle of implementing the MapState and SetState in our
>>>>>>>> Samza runner. We noticed that the state returns the Java Iterable for
>>>>>>>> reading entries, keys, etc. For state backed by a file-based kv store
>>>>>>>> like rocksDb, we need to be able to let users explicitly close the
>>>>>>>> iterator/iterable to release the resources. Otherwise we have to load
>>>>>>>> the iterable into memory so we can safely close the underlying rocksDb
>>>>>>>> iterator, similar to Flink's implementation. But this won't work for
>>>>>>>> states that don't fit into memory. I chatted with Kenn and he also
>>>>>>>> agrees we need this capability to avoid bulk read/write. This seems to
>>>>>>>> be a general use case and I'm wondering if we can add the support to it?
>>>>>>>> I am happy to contribute to this if needed. Any feedback is highly
>>>>>>>> appreciated.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Xinyu