I am happy to chat about it over Hangouts or Slack too. Let's talk offline to set it up if needed.

Thanks,
Xinyu

On Tue, May 15, 2018 at 10:51 AM, Xinyu Liu <xinyuliu...@gmail.com> wrote:

> The Samza runner always processes key+window pairs serially. To answer Luke's question:
>
> - Why does Samza need to use a snapshot in the first place, when it should be able to read data from RocksDb directly?
>
> I believe that the point of adding readIterator() is to allow us to read from RocksDb directly without bulk loading into memory. In this case Samza will create the RocksDb iterator and expose it through the interface to the user. There is no need to create an explicit snapshot for this. For read() we need to load the content into memory so we can release the RocksDb resources immediately and the user can iterate many times later. The reason I mentioned the snapshot is that if we don't load into memory for read(), we need to maintain a RocksDb snapshot under the hood so the user can iterate over the same content. This would cause memory and performance issues for our users.
>
> For the commit question, do you mean whether we commit in the middle of processing a bundle? That's not the case in Samza. Samza will wait for the bundle processing to complete, flush the output and state modifications, and then checkpoint.
>
> Thanks,
> Xinyu
>
> On Tue, May 15, 2018 at 10:34 AM, Kenneth Knowles <k...@google.com> wrote:
>
>> On Tue, May 15, 2018 at 9:36 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> If it always processes key+window pairs serially, then I'm not sure why Samza needs to use a snapshot in the first place; it should be able to read data from RocksDb directly.
>>>
>>> The other point of confusion is how a readIterator() is any different than returning a wrapper that just invokes read().iterator() on a BagState. If that is the case then this is a trivial change which doesn't need to change the portability design.
>>
>> I think this is a key point.
>> Just calling out the obvious to talk about it:
>>
>> - read() returns an iterable - a value with fixed contents that can be iterated many times
>> - readIterator() would return an iterator - a value with fixed contents that can be iterated only once
>>
>> In both cases, later writes should not affect the observed contents. So they both need some kind of snapshot for their lifetime. With read().iterator() the intermediate iterable is immediately garbage, but this is the reference queue based solution again, with some ref tracking for iterables/iterators.
>>
>> I now see what you mean about committing within a bundle; we definitely need to atomically commit the completion of upstream elements, output elements, and state modifications. What are your thoughts on this, Xinyu?
>>
>> Happy to join a quick hangout.
>>
>> Kenn
>>
>>> The nuance is likely that the interface between what a Runner implements to provide support for user state may give itself more visibility into what is going on than what the portability framework can provide as is.
>>>
>>> On Tue, May 15, 2018 at 8:57 AM Kenneth Knowles <k...@google.com> wrote:
>>>
>>>> OK, got it. But what consistency are you referring to? I was trying to point out that there's nothing but straight-line program order consistency. There's only one actor doing all the reads and all the writes.
>>>>
>>>> Kenn
>>>>
>>>> On Tue, May 15, 2018 at 8:39 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> I misspoke when I said portability semantics and should have said portability design/implementation. This is why I had a follow-up e-mail and clarified that I'm confused on:
>>>>> * I don't understand how you would want close to change the semantics of a user state specification and how it affects the lifetime of user state?
>>>>> ** Does it represent committing information within a bundle?
>>>>> ** Does it mean that user state can ignore the replayable and consistent semantics for the lifetime of a bundle?
>>>>>
>>>>> I'm trying to tie back what a `ReadableState<Iterator> readIterator()` means for Runner authors and how it solves the memory/close() problem for Samza. Based upon https://s.apache.org/beam-state, reading and writing of state must be consistent. Does this mean that Samza must use snapshots for the lifetime of a bundle? If so, I don't see how adding a `ReadableState<Iterator> readIterator()` allows Samza to ignore the consistency requirement and be allowed to free snapshots.
>>>>>
>>>>> It might be worthwhile to set up a three-way Hangouts call to help me, as I don't have the same level of context; the outcome can be shared back to this thread. Xinyu / Kenn, how about we set up a time using Slack?
>>>>>
>>>>> On Mon, May 14, 2018 at 8:36 PM Kenneth Knowles <k...@google.com> wrote:
>>>>>
>>>>>> I feel like this discussion is kind of far from the primary intention. The point of ParDo(stateful DoFn) is to enable naive single-threaded code in a style intuitive to a beginning imperative programmer. So:
>>>>>>
>>>>>> - the return value of read() should act like an immutable value
>>>>>> - if there is a read after a write, that read should reflect the changes written, because there's a trivial happens-before relationship induced by program order
>>>>>> - there are no non-trivial happens-before relationships
>>>>>> - a write after a read should not affect the value read before
>>>>>>
>>>>>> From that starting point, the limitations on expressiveness (single threaded per-key-and-window-and-step) give rise to embarrassingly parallel computation and GC.
>>>>>>
>>>>>> So when you say "portability semantics" it makes me concerned.
>>>>>> The word "semantics" refers to the mapping between what a user writes and what it means. Primarily, that means the conceptual translation between a primitive PTransform and the corresponding PCollection-to-PCollection operation. It is true that portability complicates things because a user's code can only be given meaning relative to an SDK harness. But the end-user intention is unchanged. If we had time and resources to give the Fn API a solid spec, it should be such that the SDK harness has little choice but to implement the primitives as intended.
>>>>>>
>>>>>> In other words, it is the job of https://s.apache.org/beam-fn-state-api-and-bundle-processing* to implement https://s.apache.org/beam-state. The discussion of adding `ReadableState<Iterator> readIterator()` to BagState seems consistent with the latter.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> *IIUC you've chosen to use the same underlying proto service for side inputs and by-reference values. That's an implementation detail that I have no particular opinion about except that if it complicates implementing the primary motivator for state, it may not be a great fit.
>>>>>>
>>>>>> On Mon, May 14, 2018 at 7:35 PM Kenneth Knowles <k...@google.com> wrote:
>>>>>>
>>>>>>> I don't follow why allowing freeing resources would be counter to the spec. I don't really know what you mean by consistent for a bundle. State, in the sense of the user-facing per-key-and-window state API, is single threaded and scoped to a single DoFn. There's no one else who can write the state. If a BagState is read and written and read again, the user-facing logic should be unaware of the resources and not have any logic to deal with consistency.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Mon, May 14, 2018 at 6:09 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>>> Hmm, some of the problem I'm dealing with is:
>>>>>>>> * I don't understand how you would want close to change the semantics of a user state specification and how it affects the lifetime of user state?
>>>>>>>> ** Does it represent committing information within a bundle?
>>>>>>>> ** Does it mean that user state can ignore the replayable and consistent semantics for the lifetime of a bundle?
>>>>>>>> * I do understand that close semantics may make it easier for the Samza runner to support state that is greater than memory, but what does it provide to users and how would they benefit (like what new scenarios would it support)?
>>>>>>>> * I don't understand why you need a snapshot in RocksDb in the first place, and whether RocksDb can support seeking inside or outside a snapshot relatively efficiently?
>>>>>>>>
>>>>>>>> There seems to be some context lost initially with a discussion that you and Kenn had; is there a copy of that which could be shared? It may help me get up to speed on the problem that is being solved.
>>>>>>>>
>>>>>>>> On Mon, May 14, 2018 at 5:11 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I will take a look at the docs to understand the problem better. A minor comment to 2) is that I don't intend to change the existing iterable API. I plan to implement it similar to Flink, loading the data into memory and closing the underlying snapshot after that. So the changes should be backward compatible.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Xinyu
>>>>>>>>>
>>>>>>>>> On Mon, May 14, 2018 at 4:54 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> I believe adding support for a state spec to be 'closed' or 'freed' is counter to the requirement of a state spec being consistent for the lifetime of a bundle. Are we willing to change this requirement for the lifetime of a bundle, or say that runners can arbitrarily say that a StateSpec can't be accessed anymore?
>>>>>>>>>>
>>>>>>>>>> If not, I'm having trouble thinking of a good way to integrate the 'close/free' API with portability semantics because:
>>>>>>>>>> 1) Runners do control bundle size but other than the trivial cases where a runner specifically chooses:
>>>>>>>>>>   a) to only process a single element at a time (negating the need for a free/close() method).
>>>>>>>>>>   b) optimistically process elements and if you run out of memory kill the bundle and try again with a smaller bundle.
>>>>>>>>>> 2) Adding an API to provide Iterators doesn't mean that users can't still use iterable, which can't be force closed. (Dropping Iterable in exchange for Iterator in the API is backwards incompatible so likely to be deferred till the next major version of Apache Beam).
>>>>>>>>>> 3) SDKs are able to process as many elements as they want in parallel; nothing requires them to execute elements serially throughout the pipeline graph.
>>>>>>>>>> 4) The runner has limited information into what an SDK is doing. The SDK provides a lower bound on how many elements it has processed, but SDKs aren't required to implement this, meaning that they could technically be processing everything in random order after they have seen all input for a bundle.
>>>>>>>>>>
>>>>>>>>>> We'll need to work through the scenarios. Xinyu, it would be useful for you to take a look at these two docs for context into the problem space:
>>>>>>>>>> https://s.apache.org/beam-fn-api-processing-a-bundle (How to process a bundle)
>>>>>>>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing (How to access side inputs, access remote references, and support user state)
>>>>>>>>>>
>>>>>>>>>> On Mon, May 14, 2018 at 3:16 PM Kenneth Knowles <k...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Mon, May 14, 2018 at 2:30 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Before you go on and update the user facing API, we should discuss the last point I made, since the change you're making will have limited usability: the portability effort won't realistically allow you to see such low level things like when processElement finished, and supporting user state will be modeled using the following three operations: read (with continuation tokens), append (blind write), clear. It may be moot to discuss how to integrate Samza into the existing framework, and Samza should work towards being a portability based runner.
>>>>>>>>>>>> There are more details here about supporting user state during pipeline execution: https://s.apache.org/beam-fn-state-api-and-bundle-processing, top level docs about portability are https://s.apache.org/beam-runner-api and https://s.apache.org/beam-fn-api.
>>>>>>>>>>>>
>>>>>>>>>>>> Do we really want to make a change to the user facing API for user state in the Java SDK when that code path will be phased out in exchange for using the portability APIs?
>>>>>>>>>>>
>>>>>>>>>>> To be clear, the user API will remain the same, but the runner's implementation path will be phased out. So should we expand this discussion to how the portability APIs enable the SDK and runner to collaborate to achieve this use case? It seems like the interaction you need is that the runner can tell that the SDK has closed the connection on the read(), and the SDK needs to do so promptly, right?
>>>>>>>>>>>
>>>>>>>>>>> Kenn
>>>>>>>>>>>
>>>>>>>>>>>> On Mon, May 14, 2018 at 2:20 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> We discussed the proposed approaches internally. It seems that if the State API can also expose another method to return a ReadableState<Iterator>, it will cover our cases of iterating over a bigger-than-memory state, closing the underlying RocksDb snapshot immediately after the iterator is fully consumed (most cases), and also safely closing the rest of the iterators after processElement (minor cases). If this sounds good to you guys, I am going to create a JIRA ticket for it and open a PR for it.
>>>>>>>>>>>>> Thanks a lot for the discussions here.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The discussion about async processing is very interesting, and I think it's definitely worth its own thread. I believe we do need the support from the Beam API/model so the users can take advantage of it (besides Samza, Flink also has an async operator that helps a lot in Alibaba's use cases). Yes, it will add complexity to the underlying framework, but it's going to be great for the users to do remote IO. In practice we found it actually avoids thread+locks issues, as Kenn mentioned above. I am not sure whether this feature can be a runner-specific thing. I will probably create another email thread for this discussion in the future and hopefully I can move this forward.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, May 14, 2018 at 10:30 AM, Kenneth Knowles <k...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, May 14, 2018 at 9:44 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Users typically want to do that async operation and then produce output with it. Adding asynchronous execution is difficult within the framework because a lot of code currently does not need to be thread safe, and writing code to be fast and handle asynchronous execution is quite difficult. Adding async operations typically leads to code with too many locks/synchronization blocks.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Just a small point here, as it is very unusual to use locks with futures. The essential innovation of futures is that it is a new way to program that avoids threads+locks style. In part, you get this for free from functional programming style, and on the other hand you reify asynchronous side effects as data dependencies.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Note that with the portability effort, the Runner won't have visibility into such low level things like when an object is garbage collected, and supporting user state will be modeled using the following three operations: read (with continuation tokens), append (blind write), clear. It may be moot to discuss how to integrate Samza into the existing framework, and Samza should work towards being a portability based runner. There are more details here about supporting user state during pipeline execution: https://s.apache.org/beam-fn-state-api-and-bundle-processing, top level docs about portability are https://s.apache.org/beam-runner-api and https://s.apache.org/beam-fn-api.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, May 13, 2018 at 6:51 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> At least one API that has been discussed in the past is to use Java 8 CompletionStage, e.g.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> new DoFn<InputT, OutputT>() {
>>>>>>>>>>>>>>>>   @ProcessElement
>>>>>>>>>>>>>>>>   public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>>>>>>>>>>>>     element.thenApply(...)
>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The framework will automatically create the CompletionStage, and the process method can specify a pipeline of asynchronous operations to perform on the element. When all of them are done, the element will be marked as successfully processed.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Reuven
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, May 13, 2018 at 11:36 AM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for all the pointers. I looked through the discussion over BEAM-2975 and BEAM-2980 about having snapshot or live views of iterable, and the current semantics make a lot of sense to me. For your question: it does not require an explicit snapshot when we create a RocksDb iterator directly. The iterator will read from an implicit snapshot as of the time the iterator is created [1], and the snapshot will be released after the iterator is closed. If we can have another method to return ReadableState<Iterator>, we might be able to apply the auto-closing approaches as we discussed and solve the problem here :).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It's very interesting that you bring up the discussion about the async API! Async IO has been widely adopted here among our users: they use netty for async calls with a library named ParSeq [2] to help manage the calls.
>>>>>>>>>>>>>>>>> Samza provides a primitive callback style API [3], in which the user will invoke the callback after the remote calls are complete. Currently in a Samza job our users use this API with the ParSeq lib for remote IO. It seems we might have to do blocking calls (thus the poor resource utilization you mentioned) when using the Beam API for now. It'll be great if you can send a few more details about the discussion about the async API. I would like to add our use case and help move this forward.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> [1]: https://github.com/facebook/rocksdb/wiki/Iterator
>>>>>>>>>>>>>>>>> [2]: https://github.com/linkedin/parseq
>>>>>>>>>>>>>>>>> [3]: https://samza.apache.org/learn/documentation/0.14/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sat, May 12, 2018 at 8:17 PM, Kenneth Knowles <k...@google.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I don't have any further suggestions, but want to call out how this hits a lot of interesting points.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The point about snapshotting is great. We have BEAM-2975 [1] and BEAM-2980 [2] where we debated things a bit. I think the strongest case is for what you describe - it should be a snapshot. Perhaps they should both be closed as fixed...
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> And you also bring up long blocking calls - we have also deliberately decided that long synchronous blocking calls in @ProcessElement can be embraced for simple programming and compensated with autoscaling smarts (e.g. expand the thread pool by noticing poor utilization). The alternative is a more future-istic API where the calls can be explicitly asynchronous. We've had some interesting dev@ list discussions about that, too.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Is another possibility to perhaps have read() return a ReadableState<Iterator> instead? We could, of course, have two methods with different names, one for iterator, one for snapshot iterable. But wouldn't the Iterator also require a snapshot? Doesn't a native RocksDb iterator require a snapshot to have well-defined contents? As you can tell, I don't know enough about RocksDb details to be sure of my suggestions.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/BEAM-2975
>>>>>>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/BEAM-2980
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sat, May 12, 2018 at 2:58 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for the ideas, Kenn, Luke and Eugene. Before I posted the question here, we discussed internally about releasing the underlying resources after consuming the whole iterator. This probably covers quite a lot of use cases. For some special cases where the user only consumes part of the iterator, Luke and Kenn's suggestion about releasing after processElement() might work (I need to confirm this with our use cases). So based on what we discussed so far, we might have a good way to automatically close an iterator for the store.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> There is another issue though: right now the state API returns an iterable for entries(), keys() and values(), and we can create an iterator from it. From my understanding, the iterable holds a snapshot of the underlying store. In the case of RocksDb, it's going to be a db.snapshot(). Then when can we release the snapshot? It's not like an iterator, where we can use some heuristics to automatically release it. The user can hold on to the iterable and create iterators throughout the whole processElement(). But if we only close the iterable after processElement(), I am quite concerned about the limitations this will bring. If the user is doing some remote call during the process, then the snapshot might be held for a long time before releasing, and might cause performance problems. And if the user happens to create multiple iterables, then there will be multiple snapshots loaded during process.
>>>>>>>>>>>>>>>>>>> Luke suggested being aggressive at closing the resources and recreating them when needed again. But in this case it might not work, since we won't be able to recreate the same snapshot given the store might have been updated (and creating a RocksDb snapshot is not cheap either). I am running out of ideas other than exposing the iterator itself somehow (and adding close() if needed?). Any further suggestions?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> @Kenn: btw, I have the same impl you posted earlier (CloseableIterator) in an internal interface. I wrapped it in some sort of StateResource in the fear that people might reject the proposal immediately after seeing the close() on the iterator. I guess our users are familiar with RocksDb state, so it's pretty normal for them to close the iterator/snapshot after using it.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Fri, May 11, 2018 at 11:08 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The iterator going out of scope is the idiomatic way that resources are freed for Java developers (hence the weak/phantom reference suggestion).
>>>>>>>>>>>>>>>>>>>> Explicitly requiring users to deal with 'handles' (like file streams) leads to leaked resources.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Fri, May 11, 2018 at 10:55 AM Kenneth Knowles <k...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks Xinyu,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I actually had first sketched out just what you wrote. But then I realized a few things:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> - usually an Iterable does not allocate resources, only its Iterators
>>>>>>>>>>>>>>>>>>>>> - if you consume the whole iterator, I hope the user would not have to do any extra work
>>>>>>>>>>>>>>>>>>>>> - you can also automatically free it up at the end of the call to @ProcessElement so that is easy too (but you might not want to)
>>>>>>>>>>>>>>>>>>>>> - so the main use is when the iterator terminates early and is not fully consumed and you can't wait to finish the method
>>>>>>>>>>>>>>>>>>>>> - the scoping in Java will force a bunch of uninitialized declarations outside the try-with-resources block, kind of a lot of boilerplate LOC
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> One thing that is good about your proposal is that the iterable could have some transparent caches that all free up together.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Fri, May 11, 2018 at 9:51 AM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks for drafting the details about the two approaches, Kenn. Now I understand Luke's proposal better. The approach looks neat, but the uncertainty of *when* GC is going to kick in will make users' life hard.
>>>>>>>>>>>>>>>>>>>>>> If the user happens to configure a large JVM heap size, and since RocksDb uses off-heap memory, GC might start very late and run less frequently than we want. If we don't have a *definitive* way to let the user close the underlying resources, then there is no good way to handle such failures of a critical application in production.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Having a close method in the iterator might be a little unorthodox :). To some degree, this is actually a resource we are holding underneath, and I think it's pretty common to have close() for a resource in Java, e.g. BufferedReader and BufferedWriter. So I would imagine that we also define a resource for the state iterator and make the interface implement *AutoCloseable*. Here is my sketch:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> // StateResource MUST be closed after use.
>>>>>>>>>>>>>>>>>>>>>> try (StateResource<Iterator<SomeType>> st = bagState.iteratorResource()) {
>>>>>>>>>>>>>>>>>>>>>>   Iterator<SomeType> iter = st.iterator();
>>>>>>>>>>>>>>>>>>>>>>   while (iter.hasNext()) {
>>>>>>>>>>>>>>>>>>>>>>     .. do stuff ...
>>>>>>>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>>>>>>> } catch (Exception e) {
>>>>>>>>>>>>>>>>>>>>>>   ... user code
>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> The type/method names are just for illustration here, so please don't laugh at them.
>>>>>>>>>>>>>>>>>>>>>> Please feel free to comment and let me know if you have
>>>>>>>>>>>>>>>>>>>>>> thoughts about the programming patterns here.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 8:59 PM, Kenneth Knowles <k...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> It is too soon to argue whether an API is complex or not.
>>>>>>>>>>>>>>>>>>>>>>> There has been no specific API proposed.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I think the problem statement is real - you need to be able to
>>>>>>>>>>>>>>>>>>>>>>> read and write bigger-than-memory state. It seems we have
>>>>>>>>>>>>>>>>>>>>>>> multiple runners that don't support it, perhaps because of our
>>>>>>>>>>>>>>>>>>>>>>> API. You might be able to build something good enough with
>>>>>>>>>>>>>>>>>>>>>>> phantom references, but you might not.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> If I understand the idea, it might look something like this:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> new DoFn<>() {
>>>>>>>>>>>>>>>>>>>>>>>   @StateId("foo")
>>>>>>>>>>>>>>>>>>>>>>>   private final StateSpec<BagState<Whatever>> myBagSpec = ...
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>   @ProcessElement
>>>>>>>>>>>>>>>>>>>>>>>   public void proc(@StateId("foo") BagState<Whatever> myBag, ...) {
>>>>>>>>>>>>>>>>>>>>>>>     CloseableIterator<Whatever> iterator = myBag.get().iterator();
>>>>>>>>>>>>>>>>>>>>>>>     while(iterator.hasNext() && ... special condition ...) {
>>>>>>>>>>>>>>>>>>>>>>>       ... do stuff ...
>>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>>     iterator.close();
>>>>>>>>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> So it has no impact on users who don't choose to close() since
>>>>>>>>>>>>>>>>>>>>>>> they iterate with for ( : ) as usual. And a runner that has
>>>>>>>>>>>>>>>>>>>>>>> the 10x funding to try out a ReferenceQueue can be resilient
>>>>>>>>>>>>>>>>>>>>>>> to users that forget. On the other hand, I haven't seen this
>>>>>>>>>>>>>>>>>>>>>>> pattern much in the wild, so I think it is valuable to discuss
>>>>>>>>>>>>>>>>>>>>>>> other methods.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Luke's proposal, if I understand his sketch, is something like
>>>>>>>>>>>>>>>>>>>>>>> this (replacing WeakReference with PhantomReference seems to
>>>>>>>>>>>>>>>>>>>>>>> be what you really want):
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> ... in RocksDb state implementation ...
>>>>>>>>>>>>>>>>>>>>>>> class RocksDbBagState {
>>>>>>>>>>>>>>>>>>>>>>>   static ReferenceQueue rocksDbIteratorQueue = new ReferenceQueue();
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>   class Iterator {
>>>>>>>>>>>>>>>>>>>>>>>     PhantomReference<RocksDbJniIterator> cIter;
>>>>>>>>>>>>>>>>>>>>>>>     .next() {
>>>>>>>>>>>>>>>>>>>>>>>       return cIter.next();
>>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>   class Iterable {
>>>>>>>>>>>>>>>>>>>>>>>     .iterator() {
>>>>>>>>>>>>>>>>>>>>>>>       return new Iterator(new PhantomReference<>(rocksDbJniIterator,
>>>>>>>>>>>>>>>>>>>>>>>           rocksDbIteratorQueue));
>>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> ... on another thread ...
>>>>>>>>>>>>>>>>>>>>>>> while(true) {
>>>>>>>>>>>>>>>>>>>>>>>   RocksDbIterator deadRef = (RocksDbIterator) rocksDbIteratorQueue.remove();
>>>>>>>>>>>>>>>>>>>>>>>   deadRef.close();
>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> When the iterator is GC'd, the phantom reference will pop onto
>>>>>>>>>>>>>>>>>>>>>>> the queue to be closed. This might not be too bad. You'll have
>>>>>>>>>>>>>>>>>>>>>>> delayed resource release, and potentially masked errors that
>>>>>>>>>>>>>>>>>>>>>>> are hard to debug. It is less error-prone than WeakReference,
>>>>>>>>>>>>>>>>>>>>>>> which is asking for trouble when objects are collected en
>>>>>>>>>>>>>>>>>>>>>>> masse. Anecdotally I have heard that performance of this kind
>>>>>>>>>>>>>>>>>>>>>>> of approach is poor, but I haven't experienced it myself and I
>>>>>>>>>>>>>>>>>>>>>>> can't find good data.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 7:41 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> If I understand correctly, using weak references will help
>>>>>>>>>>>>>>>>>>>>>>>> clean up the Java objects once GC kicks in. In the case of a
>>>>>>>>>>>>>>>>>>>>>>>> kv-store like rocksDb, the Java iterator is just a JNI
>>>>>>>>>>>>>>>>>>>>>>>> interface to the underlying C iterator, so we need to
>>>>>>>>>>>>>>>>>>>>>>>> explicitly invoke close to release the in-memory snapshot
>>>>>>>>>>>>>>>>>>>>>>>> data, which can be large and accumulate quickly if it's not
>>>>>>>>>>>>>>>>>>>>>>>> released when not in use.
>>>>>>>>>>>>>>>>>>>>>>>> Maybe I am missing something in your suggestion here, but it
>>>>>>>>>>>>>>>>>>>>>>>> looks to me like weak references might not help in this case.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I understand your concern, and I think you might have
>>>>>>>>>>>>>>>>>>>>>>>> misunderstood what I meant. I am totally for working hard for
>>>>>>>>>>>>>>>>>>>>>>>> the best user experience, and I think the current API
>>>>>>>>>>>>>>>>>>>>>>>> provides a good example of that. That's also the reason I am
>>>>>>>>>>>>>>>>>>>>>>>> implementing a runner here. I am just proposing an extra API
>>>>>>>>>>>>>>>>>>>>>>>> to expose an iterator that can be closed when no longer
>>>>>>>>>>>>>>>>>>>>>>>> needed, so that users can iterate through large state that
>>>>>>>>>>>>>>>>>>>>>>>> doesn't fit into memory. I believe this is also a pretty
>>>>>>>>>>>>>>>>>>>>>>>> general use case and it's better to have support for it. I am
>>>>>>>>>>>>>>>>>>>>>>>> actually arguing that adding this extra API will be a better
>>>>>>>>>>>>>>>>>>>>>>>> user experience, since more users can benefit from it.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 5:25 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> I don't agree. I believe you can track the
>>>>>>>>>>>>>>>>>>>>>>>>> iterators/iterables that are created and freed by using weak
>>>>>>>>>>>>>>>>>>>>>>>>> references and reference queues (or other methods).
>>>>>>>>>>>>>>>>>>>>>>>>> Having a few people work 10x as hard to provide a good
>>>>>>>>>>>>>>>>>>>>>>>>> implementation is much better than having 100s or 1000s of
>>>>>>>>>>>>>>>>>>>>>>>>> users suffering through a more complicated API.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:44 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Loading/evicting blocks will help reduce the cache memory
>>>>>>>>>>>>>>>>>>>>>>>>>> footprint, but we still won't be able to release the
>>>>>>>>>>>>>>>>>>>>>>>>>> underlying resources. We can definitely add heuristics to
>>>>>>>>>>>>>>>>>>>>>>>>>> help release the resources as you mentioned, but there is
>>>>>>>>>>>>>>>>>>>>>>>>>> no accurate way to track all the iterators/iterables
>>>>>>>>>>>>>>>>>>>>>>>>>> created and free them up once not needed. I think while the
>>>>>>>>>>>>>>>>>>>>>>>>>> API is aimed at a nice user experience, we should have the
>>>>>>>>>>>>>>>>>>>>>>>>>> option to let users optimize their performance if they
>>>>>>>>>>>>>>>>>>>>>>>>>> choose to. Do you agree?
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:25 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Users won't reliably close/release the resources, and
>>>>>>>>>>>>>>>>>>>>>>>>>>> forcing them to will make the user experience worse.
>>>>>>>>>>>>>>>>>>>>>>>>>>> It will make a lot more sense to use a file format which
>>>>>>>>>>>>>>>>>>>>>>>>>>> allows random access and use a cache to load/evict blocks
>>>>>>>>>>>>>>>>>>>>>>>>>>> of the state from memory.
>>>>>>>>>>>>>>>>>>>>>>>>>>> If that is not possible, use an iterable which frees the
>>>>>>>>>>>>>>>>>>>>>>>>>>> resource after a certain amount of inactivity or uses weak
>>>>>>>>>>>>>>>>>>>>>>>>>>> references.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:07 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi, folks,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm in the middle of implementing the MapState and
>>>>>>>>>>>>>>>>>>>>>>>>>>>> SetState in our Samza runner. We noticed that the state
>>>>>>>>>>>>>>>>>>>>>>>>>>>> returns the Java Iterable for reading entries, keys, etc.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> For state backed by a file-based kv store like rocksDb,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> we need to be able to let users explicitly close the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> iterator/iterable to release the resources. Otherwise we
>>>>>>>>>>>>>>>>>>>>>>>>>>>> have to load the iterable into memory so we can safely
>>>>>>>>>>>>>>>>>>>>>>>>>>>> close the underlying rocksDb iterator, similar to Flink's
>>>>>>>>>>>>>>>>>>>>>>>>>>>> implementation. But this won't work for states that don't
>>>>>>>>>>>>>>>>>>>>>>>>>>>> fit into memory. I chatted with Kenn and he also agrees
>>>>>>>>>>>>>>>>>>>>>>>>>>>> we need this capability to avoid bulk read/write. This
>>>>>>>>>>>>>>>>>>>>>>>>>>>> seems to be a general use case and I'm wondering if we
>>>>>>>>>>>>>>>>>>>>>>>>>>>> can add support for it? I am happy to contribute to this
>>>>>>>>>>>>>>>>>>>>>>>>>>>> if needed. Any feedback is highly appreciated.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>
>
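For reference, the explicit-close pattern discussed in the quoted thread can be sketched in plain Java roughly as follows. This is only an illustration under stated assumptions: CloseableIterator, FakeStoreIterator, OPEN_HANDLES and firstAtLeast are hypothetical names, an in-memory list stands in for a RocksDb-backed iterator, and none of this is actual Beam or Samza API. It shows the case raised above where iteration stops early and the resource still has to be released.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class CloseableIteratorSketch {

  /** Hypothetical interface: an Iterator that also owns a releasable resource. */
  interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
    @Override
    void close(); // narrowed to throw no checked exception
  }

  /** Counts open handles so we can observe whether close() really ran. */
  static final AtomicInteger OPEN_HANDLES = new AtomicInteger();

  /** Stand-in for a store-backed iterator; a real one would wrap a native handle. */
  static final class FakeStoreIterator<T> implements CloseableIterator<T> {
    private final Iterator<T> delegate;
    private boolean closed;

    FakeStoreIterator(List<T> data) {
      this.delegate = data.iterator();
      OPEN_HANDLES.incrementAndGet(); // "acquire" the resource
    }

    @Override
    public boolean hasNext() {
      return !closed && delegate.hasNext();
    }

    @Override
    public T next() {
      if (closed) {
        throw new IllegalStateException("iterator already closed");
      }
      return delegate.next();
    }

    @Override
    public void close() {
      if (!closed) { // idempotent: a second close() is a no-op
        closed = true;
        OPEN_HANDLES.decrementAndGet(); // "release" the resource
      }
    }
  }

  /** Returns the first element >= threshold, or null if there is none. */
  static Integer firstAtLeast(List<Integer> data, int threshold) {
    // try-with-resources calls close() on every exit path, including early return.
    try (CloseableIterator<Integer> it = new FakeStoreIterator<>(data)) {
      while (it.hasNext()) {
        Integer v = it.next();
        if (v >= threshold) {
          return v; // early termination: the iterator is not fully consumed
        }
      }
      return null;
    }
  }

  public static void main(String[] args) {
    Integer hit = firstAtLeast(Arrays.asList(1, 5, 9, 12), 6);
    System.out.println("first match: " + hit);                 // first match: 9
    System.out.println("open handles: " + OPEN_HANDLES.get()); // open handles: 0
  }
}
```

Because close() is narrowed to throw no checked exception, the caller needs no catch block, which keeps the boilerplate concern from the thread small. Users who iterate fully with a plain for loop would still depend on the runner to release the resource some other way.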