As alternatives to using weak/phantom references:
* Can you configure RocksDb's memory usage/limits?
* Inside the iterator, periodically close and re-open the RocksDb connection, seeking back to where the user was?
* Use the ParDo/DoFn lifecycle and clean up after each processElement/finishBundle call.
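The second bullet could look roughly like the sketch below. The iterator closes its underlying cursor every `batchSize` elements. On the next call it reopens a fresh cursor positioned just after the last key it returned. This is a minimal sketch, not Beam or RocksDb code. `Cursor`, `CursorSource`, `InMemorySource`, and `ReopeningIterator` are hypothetical names, and the store is an in-memory `TreeMap` so the example runs on its own. With RocksDb, the reopen step would be a new iterator plus a seek.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.TreeMap;

public class ReopeningIteratorDemo {

  /** Hypothetical native-style cursor; must be closed to release what it pins. */
  interface Cursor extends AutoCloseable {
    boolean hasNext();
    Map.Entry<String, String> next();
    @Override void close();
  }

  /** Hypothetical store that opens a cursor just after lastKey (null = from the start). */
  interface CursorSource {
    Cursor openAfter(String lastKey);
  }

  /** In-memory stand-in for a RocksDb-backed store that counts open cursors. */
  static final class InMemorySource implements CursorSource {
    final NavigableMap<String, String> data = new TreeMap<>();
    int openCursors = 0;
    int totalOpens = 0;

    @Override
    public Cursor openAfter(String lastKey) {
      NavigableMap<String, String> view = lastKey == null ? data : data.tailMap(lastKey, false);
      Iterator<Map.Entry<String, String>> it = view.entrySet().iterator();
      openCursors++;
      totalOpens++;
      return new Cursor() {
        private boolean closed = false;
        @Override public boolean hasNext() { return it.hasNext(); }
        @Override public Map.Entry<String, String> next() { return it.next(); }
        @Override public void close() {
          if (!closed) { closed = true; openCursors--; }
        }
      };
    }
  }

  /** Releases its cursor every batchSize elements and seeks back on the next call. */
  static final class ReopeningIterator implements Iterator<Map.Entry<String, String>> {
    private final CursorSource source;
    private final int batchSize;
    private Cursor cursor;   // null while released
    private String lastKey;  // last key handed to the caller
    private int sinceOpen = 0;
    private boolean exhausted = false;

    ReopeningIterator(CursorSource source, int batchSize) {
      this.source = source;
      this.batchSize = batchSize;
    }

    @Override
    public boolean hasNext() {
      if (exhausted) return false;
      if (cursor == null) {
        cursor = source.openAfter(lastKey); // seek back to where the caller was
        sinceOpen = 0;
      }
      if (!cursor.hasNext()) {
        cursor.close();
        cursor = null;
        exhausted = true;
        return false;
      }
      return true;
    }

    @Override
    public Map.Entry<String, String> next() {
      if (!hasNext()) throw new NoSuchElementException();
      Map.Entry<String, String> entry = cursor.next();
      lastKey = entry.getKey();
      if (++sinceOpen >= batchSize) {
        cursor.close(); // periodically release the underlying resource
        cursor = null;
      }
      return entry;
    }
  }
}
```

Because a fresh cursor is opened per batch, later batches may observe writes made between batches. The scheme also only bounds how long a single cursor stays open while the caller keeps iterating. An iterator abandoned mid-batch still holds one cursor, so this complements rather than replaces a cleanup hook such as finishBundle.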
On Fri, May 11, 2018 at 9:51 AM Xinyu Liu <xinyuliu...@gmail.com> wrote:
> Thanks for drafting the details about the two approaches, Kenn. Now I understand Luke's proposal better. The approach looks neat, but the uncertainty of *when* GC is going to kick in will make users' lives hard. If the user happens to configure a large JVM heap, then, since rocksDb uses off-heap memory, GC might start very late and run less frequently than we want. If we don't have a *definitive* way to let users close the underlying resources, then there is no good way to handle such failures in a critical production application.
>
> Having a close method on the iterator might be a little unorthodox :). To some degree, though, this is a resource we are holding underneath, and I think it's pretty common to have close() for a resource in Java, e.g. BufferedReader and BufferedWriter. So I would imagine that we also define a resource for the state iterator and make the interface implement *AutoCloseable*. Here is my sketch:
>
> // StateResource MUST be closed after use.
> try (StateResource<Iterator<SomeType>> st = bagState.iteratorResource()) {
>   Iterator<SomeType> iter = st.iterator();
>   while (iter.hasNext()) {
>     ... do stuff ...
>   }
> } catch (Exception e) {
>   ... user code
> }
>
> The type/method names are just for illustration here, so please don't laugh at them. Please feel free to comment and let me know if you have thoughts about the programming patterns here.
>
> Thanks,
> Xinyu
>
> On Thu, May 10, 2018 at 8:59 PM, Kenneth Knowles <k...@google.com> wrote:
>
>> It is too soon to argue whether an API is complex or not. There has been no specific API proposed.
>>
>> I think the problem statement is real: you need to be able to read and write bigger-than-memory state. It seems we have multiple runners that don't support it, perhaps because of our API. You might be able to build something good enough with phantom references, but you might not.
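To make Xinyu's `AutoCloseable` sketch concrete, here is a minimal runnable version. `StateResource` and `iteratorResource()` are the illustrative names from that sketch, not an existing Beam API. `ListBagState` is a hypothetical in-memory stand-in that counts open resources, so the release is observable. Declaring `close()` without `throws Exception` lets callers use try-with-resources without the catch block.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class StateResourceDemo {

  /** Hypothetical handle around a state iterator; MUST be closed after use. */
  interface StateResource<T> extends AutoCloseable {
    Iterator<T> iterator();
    @Override void close(); // no checked exception, so no catch block is required
  }

  /** Hypothetical bag state whose iterators pin an underlying resource. */
  static final class ListBagState<T> {
    private final List<T> values = new ArrayList<>();
    int openResources = 0;

    void add(T value) { values.add(value); }

    StateResource<T> iteratorResource() {
      openResources++; // a real runner would acquire e.g. a native iterator here
      return new StateResource<T>() {
        private boolean closed = false;
        @Override public Iterator<T> iterator() {
          if (closed) throw new IllegalStateException("resource already closed");
          return values.iterator();
        }
        @Override public void close() {
          if (!closed) { closed = true; openResources--; }
        }
      };
    }
  }

  /** Sums values until the limit would be exceeded; the resource is released either way. */
  static int sumUntil(ListBagState<Integer> bag, int limit) {
    int sum = 0;
    try (StateResource<Integer> st = bag.iteratorResource()) {
      Iterator<Integer> iter = st.iterator();
      while (iter.hasNext()) {
        int v = iter.next();
        if (sum + v > limit) break; // early exit still triggers close()
        sum += v;
      }
    }
    return sum;
  }
}
```

Wrapping the iterator in a resource leaves `Iterator` itself unchanged. Try-with-resources then releases the resource on early `break`s and on exceptions alike. Nothing forces callers to use it, though, which is the concern Luke raises further down the thread.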
>>
>> If I understand the idea, it might look something like this:
>>
>> new DoFn<>() {
>>   @StateId("foo")
>>   private final StateSpec<BagState<Whatever>> myBagSpec = ...
>>
>>   @ProcessElement
>>   public void proc(@StateId("foo") BagState<Whatever> myBag, ...) {
>>     CloseableIterator<Whatever> iterator = myBag.get().iterator();
>>     while (iterator.hasNext() && ... special condition ...) {
>>       ... do stuff ...
>>     }
>>     iterator.close();
>>   }
>> }
>>
>> So it has no impact on users who don't choose to close(), since they iterate with for ( : ) as usual. And a runner that has the 10x funding to try out a ReferenceQueue can be resilient to users who forget. On the other hand, I haven't seen this pattern much in the wild, so I think it is valuable to discuss other methods.
>>
>> Luke's proposal, if I understand his sketch, is something like this (replacing WeakReference with PhantomReference seems to be what you really want):
>>
>> ... in RocksDb state implementation ...
>> class RocksDbBagState {
>>   static ReferenceQueue rocksDbIteratorQueue = new ReferenceQueue();
>>
>>   class Iterator {
>>     PhantomReference<RocksDbJniIterator> cIter;
>>     .next() {
>>       return cIter.next();
>>     }
>>   }
>>
>>   class Iterable {
>>     .iterator() {
>>       return new Iterator(new PhantomReference<>(rocksDbJniIterator, rocksDbIteratorQueue));
>>     }
>>   }
>> }
>>
>> ... on another thread ...
>> while (true) {
>>   RocksDbIterator deadRef = (RocksDbIterator) rocksDbIteratorQueue.remove();
>>   deadRef.close();
>> }
>>
>> When the iterator is GC'd, the phantom reference will be enqueued so it can be closed. This might not be too bad. You'll have delayed resource release, and potentially masked errors that are hard to debug. It is less error-prone than WeakReference, which is asking for trouble when objects are collected en masse. Anecdotally, I have heard that performance of this kind of approach is poor, but I haven't experienced it myself and I can't find good data.
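The phantom-reference sketch above glosses over one detail. `PhantomReference.get()` always returns `null`, so the reference cannot be used to call `next()`, and the queue thread cannot recover the iterator in order to close it. Whatever cleanup needs must therefore live in the reference object itself. The minimal sketch below works under that constraint. `NativeHandle` is a hypothetical stand-in for native iterator memory. `HandleRef`, `TrackedIterator`, and `IteratorTracker` are illustrative names. `drain()` polls the queue instead of blocking on `remove()` in a dedicated thread, which keeps the example deterministic.

```java
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

public class PhantomCleanupDemo {

  /** Hypothetical stand-in for a native iterator handle (memory held behind JNI). */
  static final class NativeHandle {
    static final AtomicInteger OPEN = new AtomicInteger();
    private boolean released = false;

    NativeHandle() { OPEN.incrementAndGet(); }

    /** Idempotent, so explicit close() and GC-driven cleanup can both call it. */
    synchronized void release() {
      if (!released) {
        released = true;
        OPEN.decrementAndGet();
      }
    }
  }

  /** get() on a PhantomReference always returns null, so the handle is stored here. */
  static final class HandleRef extends PhantomReference<TrackedIterator<?>> {
    final NativeHandle handle;

    HandleRef(TrackedIterator<?> referent, NativeHandle handle, ReferenceQueue<Object> queue) {
      super(referent, queue);
      this.handle = handle;
    }
  }

  static final class TrackedIterator<T> implements Iterator<T>, AutoCloseable {
    private final Iterator<T> delegate;
    private final NativeHandle handle;

    TrackedIterator(Iterator<T> delegate, NativeHandle handle) {
      this.delegate = delegate;
      this.handle = handle;
    }

    @Override public boolean hasNext() { return delegate.hasNext(); }
    @Override public T next() { return delegate.next(); }
    /** Deterministic path for callers who do close. */
    @Override public void close() { handle.release(); }
  }

  static final class IteratorTracker {
    private final ReferenceQueue<Object> queue = new ReferenceQueue<>();
    // The Reference objects must stay strongly reachable until processed, or they may never be enqueued.
    private final Set<HandleRef> live =
        Collections.newSetFromMap(new IdentityHashMap<HandleRef, Boolean>());

    synchronized <T> TrackedIterator<T> track(Iterator<T> delegate) {
      NativeHandle handle = new NativeHandle();
      TrackedIterator<T> it = new TrackedIterator<>(delegate, handle);
      live.add(new HandleRef(it, handle, queue));
      return it;
    }

    /** Fallback path: releases handles whose iterators were found unreachable by the GC. */
    synchronized int drain() {
      int processed = 0;
      Reference<?> ref;
      while ((ref = queue.poll()) != null) {
        HandleRef handleRef = (HandleRef) ref;
        handleRef.handle.release(); // no-op if the caller already closed it
        live.remove(handleRef);
        processed++;
      }
      return processed;
    }
  }
}
```

In this design the GC path is only a fallback behind explicit `close()`. Its timing depends on when the collector notices the wrapper is unreachable and something drains the queue, which is the delay Kenn and Xinyu describe. On Java 9 and later, `java.lang.ref.Cleaner` packages this same phantom-reference pattern.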
>>
>> Kenn
>>
>> On Thu, May 10, 2018 at 7:41 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>
>>> If I understand correctly, using weak references will help clean up the Java objects once GC kicks in. In the case of a kv-store like rocksDb, the Java iterator is just a JNI interface to the underlying C iterator, so we need to explicitly invoke close to release the in-memory snapshot data, which can be large and can accumulate quickly if it's not released when no longer in use. Maybe I am missing something, as you suggested, but it looks to me like using weak references might not help in this case.
>>>
>>> I understand your concern, and I think you might have misunderstood what I meant. I am totally for working hard for the best user experience, and I think the current API provides a good example of that. That's also the reason I am implementing a runner here. I am just proposing an extra API to expose an iterator that can be closed when not needed, so that users can use this feature to iterate through large state that doesn't fit into memory. I believe this is also a pretty general use case and it's better to have support for it. I am actually arguing that adding this extra API will be a better user experience, since more users can benefit from it.
>>>
>>> Thanks,
>>> Xinyu
>>>
>>> On Thu, May 10, 2018 at 5:25 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> I don't agree. I believe you can track the iterators/iterables that are created and freed by using weak references and reference queues (or other methods). Having a few people work 10x as hard to provide a good implementation is much better than having 100s or 1000s of users suffering through a more complicated API.
>>>>
>>>> On Thu, May 10, 2018 at 3:44 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>
>>>>> Load/evict blocks will help reduce the cache memory footprint, but we still won't be able to release the underlying resources.
>>>>> We can definitely add heuristics to help release the resources, as you mentioned, but there is no accurate way to track all the iterators/iterables created and free them up once they are not needed. I think that while the API is aimed at a nice user experience, we should have the option to let users optimize their performance if they choose to. Do you agree?
>>>>>
>>>>> Thanks,
>>>>> Xinyu
>>>>>
>>>>> On Thu, May 10, 2018 at 3:25 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Users won't reliably close/release the resources, and forcing them to will make the user experience worse.
>>>>>> It would make a lot more sense to use a file format which allows random access and use a cache to load/evict blocks of the state from memory.
>>>>>> If that is not possible, use an iterable which frees the resource after a certain amount of inactivity or uses weak references.
>>>>>>
>>>>>> On Thu, May 10, 2018 at 3:07 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi, folks,
>>>>>>>
>>>>>>> I'm in the middle of implementing the MapState and SetState in our Samza runner. We noticed that the state returns a Java Iterable for reading entries, keys, etc. For state backed by a file-based kv store like rocksDb, we need to be able to let users explicitly close the iterator/iterable to release the resources. Otherwise we have to load the iterable into memory so we can safely close the underlying rocksDb iterator, similar to Flink's implementation. But this won't work for state that doesn't fit into memory. I chatted with Kenn, and he also agrees we need this capability to avoid bulk read/write. This seems to be a general use case, and I'm wondering if we can add support for it. I am happy to contribute to this if needed. Any feedback is highly appreciated.
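Lukasz's first suggestion pairs random access into the stored state with a cache that loads and evicts fixed-size blocks. That caps resident state at about `maxBlocks * blockSize` entries. The minimal sketch below assumes a hypothetical block reader, passed in as an `IntFunction`, that reads block `i` from a random-access file. In the usage check it is backed by a list so the example runs. The LRU policy comes from `LinkedHashMap` in access order with `removeEldestEntry`. `BlockCachedState` is an illustrative name, not an existing API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntFunction;

public class BlockCacheDemo {

  /** Reads entries by index through a bounded LRU cache of fixed-size blocks. */
  static final class BlockCachedState<T> {
    private final int size;
    private final int blockSize;
    private final IntFunction<List<T>> loadBlock; // hypothetical random-access block reader
    private final Map<Integer, List<T>> cache;
    int loads = 0; // counts block reads so cache behavior is observable

    BlockCachedState(int size, int blockSize, int maxBlocks, IntFunction<List<T>> loadBlock) {
      this.size = size;
      this.blockSize = blockSize;
      this.loadBlock = loadBlock;
      // accessOrder = true keeps entries ordered from least to most recently used.
      this.cache = new LinkedHashMap<Integer, List<T>>(16, 0.75f, true) {
        @Override
        protected boolean removeEldestEntry(Map.Entry<Integer, List<T>> eldest) {
          return size() > maxBlocks; // evict the least recently used block
        }
      };
    }

    T get(int index) {
      if (index < 0 || index >= size) {
        throw new IndexOutOfBoundsException("index " + index);
      }
      int block = index / blockSize;
      List<T> values = cache.get(block);
      if (values == null) {
        values = loadBlock.apply(block); // only this block is brought into memory
        loads++;
        cache.put(block, values);
      }
      return values.get(index % blockSize);
    }

    int cachedBlocks() {
      return cache.size();
    }
  }
}
```

As Xinyu points out in his reply, this bounds the cache footprint but does not by itself release a native iterator or snapshot. It addresses memory use rather than resource lifetime.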
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Xinyu