On Mon, May 14, 2018 at 2:30 PM Lukasz Cwik <lc...@google.com> wrote:
> Before you go on and update the user facing API, we should discuss the last point I made, since the change you're making will have limited usability: the portability effort won't realistically allow you to see such low-level things as when processElement finished, and supporting user state will be modeled using the following three operations: read (with continuation tokens), append (blind write), clear. It may be moot to discuss how to integrate Samza into the existing framework, and Samza should instead work towards being a portability-based runner. There are more details here about supporting user state during pipeline execution: https://s.apache.org/beam-fn-state-api-and-bundle-processing, top level docs about portability are https://s.apache.org/beam-runner-api and https://s.apache.org/beam-fn-api.
>
> Do we really want to make a change to the user facing API for user state in the Java SDK when that code path will be phased out in exchange for using the portability APIs?
>

To be clear, the user API will remain the same, but the runner's implementation path will be phased out. So should we expand this discussion to how the portability APIs enable the SDK and runner to collaborate to achieve this use case? It seems like the interaction you need is that the runner can tell that the SDK has closed the connection on the read(), and the SDK needs to do so promptly, right?

Kenn

> On Mon, May 14, 2018 at 2:20 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>
>> We discussed the proposed approaches internally. It seems that if the State API can also expose another method to return a ReadableState<Iterator>, it will cover our cases of iterating over a bigger-than-memory state, closing the underlying rocksDb snapshot immediately after the iterator is fully consumed (most cases), and also safely closing the rest of the iterators after processElement (minor cases).
>> If this sounds good to you guys, I am going to create a JIRA ticket for it and open a PR. Thanks a lot for the discussions here.
>>
>> The discussion about async processing is very interesting, and I think it's definitely worth its own thread. I believe we do need support from the Beam API/model so users can take advantage of it (besides Samza, Flink also has an async operator that helps a lot in Alibaba's use cases). Yes, it will add complexity to the underlying framework, but it's going to be great for users doing remote IO. In practice we found it actually avoids thread+locks issues, as Kenn mentioned above. I am not sure whether this feature can be a runner-specific one. I will probably create another email thread for this discussion in the future, and hopefully I can move this forward.
>>
>> Thanks,
>> Xinyu
>>
>> On Mon, May 14, 2018 at 10:30 AM, Kenneth Knowles <k...@google.com> wrote:
>>
>>>
>>> On Mon, May 14, 2018 at 9:44 AM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> Users typically want to do that async operation and then produce output with it. Adding asynchronous execution is difficult within the framework because a lot of code currently does not need to be thread safe, and writing code that is fast and handles asynchronous execution is quite difficult. Adding async operations typically leads to code with too many locks/synchronization blocks.
>>>>
>>>
>>> Just a small point here, as it is very unusual to use locks with futures. The essential innovation of futures is that they are a new way to program that avoids the threads+locks style. In part, you get this for free from the functional programming style, and on the other hand you reify asynchronous side effects as data dependencies.
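Kenn's point that futures turn asynchronous side effects into data dependencies, rather than something guarded by locks, can be illustrated with plain `java.util.concurrent.CompletableFuture`. This is a minimal sketch, not a Beam or Samza API: `fetchProfile` and `fetchScore` are hypothetical stand-ins for remote calls, and the combining step simply runs once both results exist, with no user-written synchronization.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Composing async calls as data dependencies; no locks in user code.
// fetchProfile/fetchScore are hypothetical stand-ins for remote IO.
class FuturesSketch {
  static CompletionStage<String> fetchProfile(String key, ExecutorService pool) {
    return CompletableFuture.supplyAsync(() -> "profile-" + key, pool);
  }

  static CompletionStage<Integer> fetchScore(String key, ExecutorService pool) {
    return CompletableFuture.supplyAsync(() -> key.length(), pool);
  }

  // The combine step depends on both results and runs only after both complete;
  // ordering comes from the data dependency, not from synchronization blocks.
  static CompletionStage<String> process(String element, ExecutorService pool) {
    return fetchProfile(element, pool)
        .thenCombine(fetchScore(element, pool), (profile, score) -> profile + ":" + score)
        .thenApply(String::toUpperCase);
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      // join() blocks here only so the demo can print the result.
      System.out.println(process("abc", pool).toCompletableFuture().join()); // prints PROFILE-ABC:3
    } finally {
      pool.shutdown();
    }
  }
}
```

A framework supporting this style would hold on to the returned stage and mark the element done when it completes, instead of blocking a thread in `join()`.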
>>>
>>> Kenn
>>>
>>>> Note that with the portability effort, the Runner won't have visibility into such low-level things as when an object is garbage collected, and supporting user state will be modeled using the following three operations: read (with continuation tokens), append (blind write), clear. It may be moot to discuss how to integrate Samza into the existing framework, and Samza should instead work towards being a portability-based runner. There are more details here about supporting user state during pipeline execution: https://s.apache.org/beam-fn-state-api-and-bundle-processing, top level docs about portability are https://s.apache.org/beam-runner-api and https://s.apache.org/beam-fn-api.
>>>>
>>>> On Sun, May 13, 2018 at 6:51 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> At least one API that has been discussed in the past is to use Java 8 CompletionStage, e.g.:
>>>>>
>>>>> new DoFn<InputT, OutputT>() {
>>>>>   @ProcessElement
>>>>>   public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>     element.thenApply(...);
>>>>>   }
>>>>> }
>>>>>
>>>>> The framework will automatically create the CompletionStage, and the process method can specify a pipeline of asynchronous operations to perform on the element. When all of them are done, the element will be marked as successfully processed.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Sun, May 13, 2018 at 11:36 AM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for all the pointers. I looked through the discussion on BEAM-2975 and BEAM-2980 about having snapshot or live views of iterables, and the current semantics make a lot of sense to me. To answer your question: it does not require an explicit snapshot when we create a RocksDb iterator directly.
>>>>>> The iterator will read from an implicit snapshot as of the time the iterator is created [1], and the snapshot will be released after the iterator is closed. If we can have another method to return ReadableState<Iterator>, we might be able to apply the auto-closing approaches we discussed and solve the problem here :).
>>>>>>
>>>>>> It's very interesting that you bring up the discussion about an async API! Async IO has been widely adopted here among our users: they use netty for async calls, with a library named ParSeq [2] to help manage the calls. Samza provides a primitive callback-style API [3], in which the user invokes the callback after the remote calls are complete. Currently, in Samza jobs our users use this API together with the ParSeq lib for remote IO. It seems we might have to make blocking calls (hence the poor resource utilization you mentioned) when using the Beam API for now. It'd be great if you could send a few more details about the async API discussion. I would like to add our use case and help move this forward.
>>>>>>
>>>>>> Thanks,
>>>>>> Xinyu
>>>>>>
>>>>>> [1]: https://github.com/facebook/rocksdb/wiki/Iterator
>>>>>> [2]: https://github.com/linkedin/parseq
>>>>>> [3]: https://samza.apache.org/learn/documentation/0.14/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>>>>>
>>>>>> On Sat, May 12, 2018 at 8:17 PM, Kenneth Knowles <k...@google.com> wrote:
>>>>>>
>>>>>>> I don't have any further suggestions, but want to call out how this hits a lot of interesting points.
>>>>>>>
>>>>>>> The point about snapshotting is great. We have BEAM-2975 [1] and BEAM-2980 [2] where we debated things a bit. I think the strongest case is for what you describe - it should be a snapshot. Perhaps they should both be closed as fixed...
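The snapshot semantics described above (an iterator sees the store as of its creation and pins that snapshot until it is closed) can be modeled with a toy copy-on-write store. This is a hypothetical, single-threaded, in-memory sketch, not RocksDB's API: each write publishes a new immutable version, an iterator captures whichever version is current, and `openSnapshots` counts how many versions are still pinned by unclosed iterators.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;

// Toy model of snapshot iterators (hypothetical; not the RocksDB API).
// Single-threaded sketch: each write publishes a new immutable version of the map.
class SnapshotStore {
  private Map<String, String> current = Collections.emptyMap();
  int openSnapshots; // versions still pinned by unclosed iterators

  void put(String key, String value) {
    TreeMap<String, String> next = new TreeMap<>(current);
    next.put(key, value);
    current = Collections.unmodifiableMap(next); // older versions stay intact
  }

  // The iterator captures the version that is current right now.
  SnapshotIterator iterator() {
    return new SnapshotIterator(current);
  }

  final class SnapshotIterator implements Iterator<Map.Entry<String, String>>, AutoCloseable {
    private final Iterator<Map.Entry<String, String>> delegate;
    private boolean closed;

    private SnapshotIterator(Map<String, String> snapshot) {
      delegate = snapshot.entrySet().iterator();
      openSnapshots++; // snapshot pinned
    }

    @Override
    public boolean hasNext() {
      return !closed && delegate.hasNext();
    }

    @Override
    public Map.Entry<String, String> next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      return delegate.next();
    }

    // Releases the snapshot; safe to call more than once.
    @Override
    public void close() {
      if (!closed) {
        closed = true;
        openSnapshots--;
      }
    }
  }
}
```

Writes made after `iterator()` is called are invisible to that iterator, which is the "well-defined contents" property Kenn asks about; the cost is that the old version stays alive until `close()`.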
>>>>>>>
>>>>>>> And you also bring up long blocking calls - we have also deliberately decided that long synchronous blocking calls in @ProcessElement can be embraced for simple programming and compensated with autoscaling smarts (e.g. expand the thread pool by noticing poor utilization). The alternative is a more future-istic API where the calls can be explicitly asynchronous. We've had some interesting dev@ list discussions about that, too.
>>>>>>>
>>>>>>> Is another possibility to perhaps have read() return a ReadableState<Iterator> instead? We could, of course, have two methods with different names, one for an iterator and one for a snapshot iterable. But wouldn't the Iterator also require a snapshot? Doesn't a native RocksDb iterator require a snapshot to have well-defined contents? As you can tell, I don't know enough about RocksDb details to be sure of my suggestions.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> [1] https://issues.apache.org/jira/browse/BEAM-2975
>>>>>>> [2] https://issues.apache.org/jira/browse/BEAM-2980
>>>>>>>
>>>>>>> On Sat, May 12, 2018 at 2:58 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks for the ideas, Kenn, Luke and Eugene. Before I posted the question here, we discussed internally releasing the underlying resources after consuming the whole iterator. This probably covers quite a lot of use cases. For special cases where the user only consumes part of the iterator, Luke and Kenn's suggestion about releasing after processElement() might work (I need to confirm this with our use cases). So based on what we discussed so far, we might have a good way to automatically close an iterator for the store.
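The two release heuristics mentioned here (free the native iterator as soon as it is fully consumed, and free any partially consumed iterators when processElement returns) could be combined roughly as follows. All names (`NativeIterator`, `ResourceTracker`, `AutoClosingIterator`) are hypothetical; the sketch assumes a runner would create one tracker per call and invoke `closeAll()` after the user's method returns.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical names throughout; a sketch of the two release heuristics.
class AutoCloseSketch {
  // Stand-in for a native (e.g. JNI-backed) iterator that must be closed.
  static final class NativeIterator implements Iterator<Integer>, AutoCloseable {
    private final Iterator<Integer> data;
    boolean closed;

    NativeIterator(List<Integer> values) { data = values.iterator(); }
    @Override public boolean hasNext() { return data.hasNext(); }
    @Override public Integer next() { return data.next(); }
    @Override public void close() { closed = true; }
  }

  // Per-call registry a runner could own; closeAll() would run after processElement.
  static final class ResourceTracker {
    private final List<AutoCloseable> open = new ArrayList<>();

    void register(AutoCloseable c) { open.add(c); }
    void unregister(AutoCloseable c) { open.remove(c); }
    int openCount() { return open.size(); }

    // Closes everything still open; reports the first failure after trying all.
    void closeAll() {
      IllegalStateException failure = null;
      for (AutoCloseable c : open) {
        try {
          c.close();
        } catch (Exception e) {
          if (failure == null) failure = new IllegalStateException("close failed", e);
        }
      }
      open.clear();
      if (failure != null) throw failure;
    }
  }

  // What user code sees: a plain Iterator with no close() to remember.
  static final class AutoClosingIterator<T> implements Iterator<T> {
    private final Iterator<T> delegate;
    private final AutoCloseable resource;
    private final ResourceTracker tracker;
    private boolean released;

    AutoClosingIterator(Iterator<T> delegate, AutoCloseable resource, ResourceTracker tracker) {
      this.delegate = delegate;
      this.resource = resource;
      this.tracker = tracker;
      tracker.register(resource);
    }

    @Override public boolean hasNext() {
      if (released) return false;
      boolean more = delegate.hasNext();
      if (!more) release(); // fully consumed: free the native resource right away
      return more;
    }

    @Override public T next() {
      if (!hasNext()) throw new NoSuchElementException();
      return delegate.next();
    }

    private void release() {
      released = true;
      tracker.unregister(resource);
      try {
        resource.close();
      } catch (Exception e) {
        throw new IllegalStateException("failed to release state iterator", e);
      }
    }
  }
}
```

The common case (a `for` loop that runs to the end) releases immediately; only iterators abandoned early wait for the end of the call, which is the trade-off Xinyu describes.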
>>>>>>>>
>>>>>>>> There is another issue though: right now the state API returns an iterable for entries(), keys() and values(), and we can create iterators from it. From my understanding, the iterable holds a snapshot of the underlying store. In the case of rocksDb, it's going to be a db.snapshot(). Then when can we release the snapshot? Unlike an iterator, we can't use some heuristic to release it automatically. The user can hold on to the iterable and create iterators throughout the whole processElement(). But if we only close the iterable after processElement(), I am quite concerned about the limitations this will bring. If the user makes a remote call during processing, the snapshot might be held for a long time before being released, which might cause performance problems. And if the user happens to create multiple iterables, there will be multiple snapshots loaded during processing. Luke suggested being aggressive about closing the resources and recreating them when needed. But that might not work here, since we won't be able to recreate the same snapshot given that the store might have been updated (and creating a rocksDb snapshot is not cheap either). I am running out of ideas other than exposing the iterator itself somehow (and adding close() if needed?). Any further suggestions?
>>>>>>>>
>>>>>>>> @Kenn: btw, I have the same impl you posted earlier (CloseableIterator) in an internal interface. I wrapped it in some sort of StateResource for fear that people might reject the proposal immediately after seeing the close() on the iterator. I guess since our users are familiar with rocksDb state, it's pretty normal for them to close the iterator/snapshot after using it.
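For the explicit-release alternative, a minimal sketch of a `CloseableIterator` scoped with try-with-resources might look like this. The interface is hypothetical (neither Beam nor Samza is assumed to ship it in this form); `close()` is narrowed to throw no checked exception so callers are not forced to catch `Exception`, and the early `return` shows that the resource is still released when iteration stops partway.

```java
import java.util.Iterator;

// Hypothetical interface for the explicit-release approach (not an existing Beam type).
interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
  // Narrowed: no checked exception, so try-with-resources needs no catch block.
  @Override
  void close();

  // Adapts a plain iterator; onClose stands in for releasing a native handle.
  static <T> CloseableIterator<T> of(Iterator<T> it, Runnable onClose) {
    return new CloseableIterator<T>() {
      private boolean closed;

      @Override public boolean hasNext() { return !closed && it.hasNext(); }

      @Override public T next() {
        if (closed) throw new IllegalStateException("iterator already closed");
        return it.next();
      }

      @Override public void close() {
        if (!closed) {
          closed = true;
          onClose.run();
        }
      }
    };
  }
}

class EarlyExitExample {
  // Returns the first value above the threshold, or null if there is none.
  static Integer firstAbove(CloseableIterator<Integer> source, int threshold) {
    try (CloseableIterator<Integer> it = source) {
      while (it.hasNext()) {
        int v = it.next();
        if (v > threshold) {
          return v; // early exit: close() still runs before the method returns
        }
      }
      return null;
    }
  }
}
```

Because the iterator is declared in the resource header, this avoids the uninitialized declarations outside the block that Kenn mentions as boilerplate.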
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Xinyu
>>>>>>>>
>>>>>>>> On Fri, May 11, 2018 at 11:08 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>
>>>>>>>>> The iterator going out of scope is the idiomatic way that resources are freed for Java developers (hence the weak/phantom reference suggestion).
>>>>>>>>> Explicitly requiring users to deal with 'handles' (like file streams) leads to leaked resources.
>>>>>>>>>
>>>>>>>>> On Fri, May 11, 2018 at 10:55 AM Kenneth Knowles <k...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Xinyu,
>>>>>>>>>>
>>>>>>>>>> I actually had first sketched out just what you wrote. But then I realized a few things:
>>>>>>>>>>
>>>>>>>>>> - usually an Iterable does not allocate resources, only its Iterators do
>>>>>>>>>> - if you consume the whole iterator, I hope the user would not have to do any extra work
>>>>>>>>>> - you can also automatically free it up at the end of the call to @ProcessElement, so that is easy too (but you might not want to)
>>>>>>>>>> - so the main use is when the iterator terminates early, is not fully consumed, and you can't wait for the method to finish
>>>>>>>>>> - the scoping in Java will force a bunch of uninitialized declarations outside the try-with-resources block, kind of a lot of boilerplate LOC
>>>>>>>>>>
>>>>>>>>>> One thing that is good about your proposal is that the iterable could have some transparent caches that all free up together.
>>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>> On Fri, May 11, 2018 at 9:51 AM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for drafting the details about the two approaches, Kenn. Now I understand Luke's proposal better. The approach looks neat, but the uncertainty of *when* GC is going to kick in will make users' lives hard.
>>>>>>>>>>> Since rocksDb uses off-heap memory, if the user happens to configure a large JVM heap, GC might start very late and run less frequently than we want. If we don't have a *definitive* way to let users close the underlying resources, there is no good way to handle such failures of a critical application in production.
>>>>>>>>>>>
>>>>>>>>>>> Having a close method on the iterator might be a little unorthodox :). To some degree, this is actually a resource we are holding underneath, and I think it's pretty common to have close() for a resource in Java, e.g. BufferedReader and BufferedWriter. So I would imagine that we also define a resource for the state iterator and make the interface implement *AutoCloseable*. Here is my sketch:
>>>>>>>>>>>
>>>>>>>>>>> // StateResource MUST be closed after use.
>>>>>>>>>>> try (StateResource<Iterator<SomeType>> st = bagState.iteratorResource()) {
>>>>>>>>>>>   Iterator<SomeType> iter = st.iterator();
>>>>>>>>>>>   while (iter.hasNext()) {
>>>>>>>>>>>     ... do stuff ...
>>>>>>>>>>>   }
>>>>>>>>>>> } catch (Exception e) {
>>>>>>>>>>>   ... user code
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> The type/method names are just for illustration here, so please don't laugh at them. Please feel free to comment and let me know if you have thoughts about the programming patterns here.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Xinyu
>>>>>>>>>>>
>>>>>>>>>>> On Thu, May 10, 2018 at 8:59 PM, Kenneth Knowles <k...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> It is too soon to argue whether an API is complex or not. There has been no specific API proposed.
>>>>>>>>>>>>
>>>>>>>>>>>> I think the problem statement is real - you need to be able to read and write bigger-than-memory state.
>>>>>>>>>>>> It seems we have multiple runners that don't support it, perhaps because of our API. You might be able to build something good enough with phantom references, but you might not.
>>>>>>>>>>>>
>>>>>>>>>>>> If I understand the idea, it might look something like this:
>>>>>>>>>>>>
>>>>>>>>>>>> new DoFn<>() {
>>>>>>>>>>>>   @StateId("foo")
>>>>>>>>>>>>   private final StateSpec<BagState<Whatever>> myBagSpec = ...
>>>>>>>>>>>>
>>>>>>>>>>>>   @ProcessElement
>>>>>>>>>>>>   public void proc(@StateId("foo") BagState<Whatever> myBag, ...) {
>>>>>>>>>>>>     CloseableIterator<Whatever> iterator = myBag.get().iterator();
>>>>>>>>>>>>     while (iterator.hasNext() && ... special condition ...) {
>>>>>>>>>>>>       ... do stuff ...
>>>>>>>>>>>>     }
>>>>>>>>>>>>     iterator.close();
>>>>>>>>>>>>   }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> So it has no impact on users who don't choose to close(), since they iterate with for ( : ) as usual. And a runner that has the 10x funding to try out a ReferenceQueue can be resilient to users that forget. On the other hand, I haven't seen this pattern much in the wild, so I think it is valuable to discuss other methods.
>>>>>>>>>>>>
>>>>>>>>>>>> Luke's proposal, if I understand his sketch, is something like this (replacing WeakReference with PhantomReference seems to be what you really want):
>>>>>>>>>>>>
>>>>>>>>>>>> ... in RocksDb state implementation ...
>>>>>>>>>>>> class RocksDbBagState {
>>>>>>>>>>>>   static ReferenceQueue rocksDbIteratorQueue = new ReferenceQueue();
>>>>>>>>>>>>
>>>>>>>>>>>>   class Iterator {
>>>>>>>>>>>>     PhantomReference<RocksDbJniIterator> cIter;
>>>>>>>>>>>>     .next() {
>>>>>>>>>>>>       return cIter.next();
>>>>>>>>>>>>     }
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>>   class Iterable {
>>>>>>>>>>>>     .iterator() {
>>>>>>>>>>>>       return new Iterator(new PhantomReference<>(rocksDbJniIterator, rocksDbIteratorQueue));
>>>>>>>>>>>>     }
>>>>>>>>>>>>   }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> ... on another thread ...
>>>>>>>>>>>> while (true) {
>>>>>>>>>>>>   RocksDbIterator deadRef = (RocksDbIterator) rocksDbIteratorQueue.remove();
>>>>>>>>>>>>   deadRef.close();
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> When the iterator is GC'd, the phantom reference will pop onto the queue to be closed. This might not be too bad. You'll have delayed resource release, and potentially masked errors that are hard to debug. It is less error-prone than WeakReference, which is asking for trouble when objects are collected en masse. Anecdotally I have heard that the performance of this kind of approach is poor, but I haven't experienced it myself and I can't find good data.
>>>>>>>>>>>>
>>>>>>>>>>>> Kenn
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, May 10, 2018 at 7:41 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> If I understand correctly, using weak references will help clean up the Java objects once GC kicks in.
>>>>>>>>>>>>> In the case of a kv-store like rocksDb, the Java iterator is just a JNI interface to the underlying C++ iterator, so we need to explicitly invoke close to release the in-memory snapshot data, which can be large and accumulate quickly if it's not released when not in use. Maybe I am missing something in what you suggested here, but it looks to me like using weak references might not help in this case.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I understand your concern, and I think you might have misunderstood what I meant. I am totally for working hard for the best user experience, and I think the current API provides a good example of that. That's also the reason I am implementing a runner here. I am just proposing an extra API to expose an iterator that can be closed when not needed, so that users can use this feature to iterate through large state that doesn't fit into memory. I believe this is also a pretty general use case and it's better to have support for it. I am actually arguing that adding this extra API will be a better user experience, since more users can benefit from it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, May 10, 2018 at 5:25 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I don't agree. I believe you can track the iterators/iterables that are created and freed by using weak references and reference queues (or other methods).
>>>>>>>>>>>>>> Having a few people work 10x as hard to provide a good implementation is much better than having 100s or 1000s of users suffering through a more complicated API.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:44 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Loading/evicting blocks will help reduce the cache memory footprint, but we still won't be able to release the underlying resources. We can definitely add heuristics to help release the resources as you mentioned, but there is no accurate way to track all the iterators/iterables created and free them once they are no longer needed. I think that while the API is aimed at a nice user experience, we should have the option to let users optimize their performance if they choose to. Do you agree?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:25 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Users won't reliably close/release the resources, and forcing them to will make the user experience worse.
>>>>>>>>>>>>>>>> It will make a lot more sense to use a file format which allows random access and use a cache to load/evict blocks of the state from memory.
>>>>>>>>>>>>>>>> If that is not possible, use an iterable which frees the resource after a certain amount of inactivity or uses weak references.
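A GC-based fallback along the lines Luke describes needs two details that the pseudocode earlier in the thread glosses over: `PhantomReference.get()` always returns `null`, so the native handle has to be stored in a reference subclass rather than read through the reference, and the reference objects must themselves stay strongly reachable until they are processed. The sketch below assumes a hypothetical `NativeHandle` standing in for a JNI iterator or snapshot; a runner might call `drain()` periodically, or loop on the blocking `ReferenceQueue.remove()` on a dedicated thread. As Xinyu points out, release still happens only after a GC cycle notices the wrapper is unreachable, so its timing is not guaranteed.

```java
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// GC-fallback release via PhantomReference + ReferenceQueue (sketch).
class PhantomCleanup {
  // Hypothetical stand-in for a JNI iterator/snapshot that needs explicit release.
  static final class NativeHandle {
    volatile boolean released;
    void release() { released = true; } // a real handle would call its native close() here
  }

  // User-visible iterator; a real one would read through the handle.
  static final class TrackedIterator<T> implements Iterator<T> {
    private final Iterator<T> data;
    private final NativeHandle handle;

    TrackedIterator(Iterator<T> data, NativeHandle handle) {
      this.data = data;
      this.handle = handle;
    }

    @Override public boolean hasNext() { return !handle.released && data.hasNext(); }
    @Override public T next() { return data.next(); }
  }

  // get() on a PhantomReference always returns null, so the handle is kept here.
  // The handle must not point back to the wrapper, or the wrapper never becomes unreachable.
  static final class HandleRef extends PhantomReference<Object> {
    final NativeHandle handle;

    HandleRef(Object wrapper, NativeHandle handle, ReferenceQueue<Object> queue) {
      super(wrapper, queue);
      this.handle = handle;
    }
  }

  private final ReferenceQueue<Object> queue = new ReferenceQueue<>();
  // Strong references to the HandleRefs themselves; otherwise they could be collected unenqueued.
  private final Set<HandleRef> pending = ConcurrentHashMap.newKeySet();

  <T> Iterator<T> track(Iterator<T> data, NativeHandle handle) {
    TrackedIterator<T> wrapper = new TrackedIterator<>(data, handle);
    pending.add(new HandleRef(wrapper, handle, queue));
    return wrapper;
  }

  // Releases handles whose wrappers the GC found unreachable; returns how many.
  int drain() {
    int released = 0;
    Reference<?> ref;
    while ((ref = queue.poll()) != null) {
      HandleRef handleRef = (HandleRef) ref;
      handleRef.handle.release();
      pending.remove(handleRef);
      released++;
    }
    return released;
  }
}
```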
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:07 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi, folks,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm in the middle of implementing MapState and SetState in our Samza runner. We noticed that the state returns a Java Iterable for reading entries, keys, etc. For state backed by a file-based kv store like rocksDb, we need to be able to let users explicitly close the iterator/iterable to release the resources. Otherwise we have to load the iterable into memory so we can safely close the underlying rocksDb iterator, similar to Flink's implementation. But this won't work for state that doesn't fit into memory. I chatted with Kenn and he also agrees we need this capability to avoid bulk reads/writes. This seems to be a general use case, and I'm wondering if we can add support for it. I am happy to contribute to this if needed. Any feedback is highly appreciated.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Xinyu
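The portability State API mentioned earlier in the thread models user state as read (with continuation tokens), append (blind write) and clear. The toy model below is only an assumption about how such a protocol can serve bigger-than-memory state; the names and the token format are hypothetical and do not reflect the actual Beam Fn API messages. Each read returns one page plus a token, and the lazy iterator fetches the next page on demand, so in this model only the current page is held in memory and no backend iterator has to stay open between pages.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Toy paged state protocol: read with continuation tokens, blind append, clear.
// Names and the token format are hypothetical, not the Beam Fn API messages.
class PagedState {
  static final class Page {
    final List<String> values;
    final Integer nextToken; // null when there are no more pages

    Page(List<String> values, Integer nextToken) {
      this.values = values;
      this.nextToken = nextToken;
    }
  }

  private final Map<String, List<String>> store = new HashMap<>();
  private final int pageSize;
  int reads; // page requests served, for illustration

  PagedState(int pageSize) {
    this.pageSize = pageSize;
  }

  // Blind write: no read of the existing value is needed.
  void append(String key, String value) {
    store.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  void clear(String key) {
    store.remove(key);
  }

  // Serves one page; here the token is simply an offset into the bag.
  Page read(String key, Integer token) {
    reads++;
    List<String> all = store.getOrDefault(key, Collections.emptyList());
    int start = Math.min(token == null ? 0 : token, all.size());
    int end = Math.min(start + pageSize, all.size());
    Integer next = end < all.size() ? Integer.valueOf(end) : null;
    return new Page(new ArrayList<>(all.subList(start, end)), next);
  }

  // Lazily walks a key page by page; only the current page is held in memory.
  Iterable<String> lazyRead(String key) {
    return () -> new Iterator<String>() {
      private Page page = read(key, null); // first page fetched when iteration starts
      private int pos;

      @Override
      public boolean hasNext() {
        while (pos >= page.values.size() && page.nextToken != null) {
          page = read(key, page.nextToken); // fetch the next page on demand
          pos = 0;
        }
        return pos < page.values.size();
      }

      @Override
      public String next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return page.values.get(pos++);
      }
    };
  }
}
```

With a page size of 2 and five appended values, a full iteration issues three page reads; abandoning the loop early simply stops issuing reads, with nothing left to close.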