I feel like this discussion is kind of far from the primary intention. The point of ParDo(stateful DoFn) is to enable naive single-threaded code in a style intuitive to a beginning imperative programmer. So:
- the return value of read() should act like an immutable value
- if there is a read after a write, that read should reflect the changes written, because there's a trivial happens-before relationship induced by program order
- there are no non-trivial happens-before relationships
- a write after a read should not affect the value read before

From that starting point, the limitations on expressiveness (single threaded per-key-and-window-and-step) give rise to embarrassingly parallel computation and GC.

So when you say "portability semantics" it makes me concerned. The word "semantics" refers to the mapping between what a user writes and what it means. Primarily, that means the conceptual translation between a primitive PTransform and the corresponding PCollection-to-PCollection operation. It is true that portability complicates things because a user's code can only be given meaning relative to an SDK harness. But the end-user intention is unchanged. If we had time and resources to give the Fn API a solid spec, it should be such that the SDK harness has little choice but to implement the primitives as intended. In other words, it is the job of https://s.apache.org/beam-fn-state-api-and-bundle-processing [*] to implement https://s.apache.org/beam-state. The discussion of adding `ReadableState<Iterator> readIterator()` to BagState seems consistent with the latter.

Kenn

[*] IIUC you've chosen to use the same underlying proto service for side inputs and by-reference values. That's an implementation detail that I have no particular opinion about, except that if it complicates implementing the primary motivator for state, it may not be a great fit.

On Mon, May 14, 2018 at 7:35 PM Kenneth Knowles <k...@google.com> wrote:

> I don't follow why allowing freeing resources would be counter to the spec. I don't really know what you mean by consistent for a bundle. State, in the sense of the user-facing per-key-and-window state API, is single threaded and scoped to a single DoFn.
> There's no one else who can write the state. If a BagState is read and written and read again, the user-facing logic should be unaware of the resources and not have any logic to deal with consistency.
>
> Kenn
>
> On Mon, May 14, 2018 at 6:09 PM Lukasz Cwik <lc...@google.com> wrote:
>
>> Hmm, some of the problems I'm dealing with are:
>> * I don't understand how you would want close to change the semantics of a user state specification and how it affects the lifetime of user state?
>> ** Does it represent committing information within a bundle?
>> ** Does it mean that user state can ignore the replayable and consistent semantics for the lifetime of a bundle?
>> * I do understand that close semantics may make it easier for the Samza runner to support state that is greater than memory, but what does it provide to users and how would they benefit (e.g. what new scenarios would it support)?
>> * I don't understand why you need a snapshot in RocksDb in the first place, and whether RocksDb can support seeking inside or outside a snapshot relatively efficiently?
>>
>> There seems to be some context lost from an initial discussion that you and Kenn had; is there a copy of that which could be shared? It may help me get up to speed on the problem that is being solved.
>>
>>
>> On Mon, May 14, 2018 at 5:11 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>
>>> I will take a look at the docs to understand the problem better. A minor comment on 2) is that I don't intend to change the existing iterable API. I plan to implement it similarly to Flink, loading the data into memory and closing the underlying snapshot after that. So the changes should be backward compatible.
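A minimal Java sketch of the "load into memory, then close" approach Xinyu describes (and attributes to Flink) is shown below. `MaterializingReader` and `readAllAndClose` are hypothetical names, not Beam, Samza or Flink APIs, and the `AutoCloseable` parameter stands in for whatever native handle (for example a RocksDB iterator) backs the state. Copying lets the handle be released immediately, at the price of holding the whole bag on the heap, which is why this only works for state that fits in memory.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper: drains a store-backed iterator into an on-heap list,
// then releases the underlying resource (e.g. a RocksDB iterator/snapshot).
final class MaterializingReader {
  private MaterializingReader() {}

  /** Copies every remaining element into memory, then closes the resource. */
  static <T> List<T> readAllAndClose(Iterator<T> source, AutoCloseable resource)
      throws Exception {
    // try-with-resources closes the handle even if iteration throws.
    try (AutoCloseable ignored = resource) {
      List<T> copy = new ArrayList<>();
      while (source.hasNext()) {
        copy.add(source.next());
      }
      // The copy stays valid after the native resource is gone.
      return Collections.unmodifiableList(copy);
    }
  }
}
```

The trade-off is explicit: no handle outlives the call, but memory use grows with the size of the state.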
>>>
>>> Thanks,
>>> Xinyu
>>>
>>>
>>>
>>> On Mon, May 14, 2018 at 4:54 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> I believe adding support for a state spec to be 'closed' or 'freed' is counter to the requirement of a state spec being consistent for the lifetime of a bundle. Are we willing to change this requirement for the lifetime of a bundle, or say that runners can arbitrarily decide that a StateSpec can't be accessed anymore?
>>>>
>>>> If not, I'm having trouble thinking of a good way to integrate the 'close/free' API with portability semantics because:
>>>> 1) Runners do control bundle size, but other than the trivial cases where a runner specifically chooses:
>>>> a) to only process a single element at a time (negating the need for a free/close() method).
>>>> b) to optimistically process elements and, if it runs out of memory, kill the bundle and try again with a smaller bundle.
>>>> 2) Adding an API to provide Iterators doesn't mean that users can't still use Iterable, which can't be force closed. (Dropping Iterable in exchange for Iterator in the API is backwards incompatible, so it is likely to be deferred till the next major version of Apache Beam.)
>>>> 3) SDKs are able to process as many elements as they want in parallel; nothing requires them to execute elements serially throughout the pipeline graph.
>>>> 4) The runner has limited insight into what an SDK is doing. The SDK provides a lower bound on how many elements it has processed, but SDKs aren't required to implement this, meaning that they could technically be processing everything in random order after they have seen all input for a bundle.
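The ordering rules Kenn lists at the top of this thread (read() acts like an immutable value, a read after a write sees that write, and a write after a read does not change what was already read) can be illustrated with a toy, single-threaded bag. `ToyBag` is hypothetical and is not Beam's `BagState`; its copy-on-write `add` costs O(n) and is used only because it makes the snapshot behaviour easy to see.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy, single-threaded, in-memory bag that illustrates the read/write
// ordering rules. Hypothetical; not Beam's BagState implementation.
final class ToyBag<T> {
  private List<T> contents = new ArrayList<>();

  /** Copy-on-write: lists already returned by read() are never mutated. */
  void add(T value) {
    List<T> next = new ArrayList<>(contents);
    next.add(value);
    contents = next;
  }

  void clear() {
    contents = new ArrayList<>();
  }

  /** Returns an unmodifiable view of the bag as of this call. */
  Iterable<T> read() {
    return Collections.unmodifiableList(contents);
  }
}
```

Because each `add` swaps in a fresh list, an `Iterable` obtained earlier keeps its original contents while a later `read()` reflects every write made before it.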
>>>>
>>>> We'll need to work through the scenarios. Xinyu, it would be useful for you to take a look at these two docs for context on the problem space:
>>>> https://s.apache.org/beam-fn-api-processing-a-bundle (How to process a bundle)
>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing (How to access side inputs, access remote references, and support user state)
>>>>
>>>>
>>>> On Mon, May 14, 2018 at 3:16 PM Kenneth Knowles <k...@google.com> wrote:
>>>>
>>>>>
>>>>> On Mon, May 14, 2018 at 2:30 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Before you go on and update the user-facing API, we should discuss the last point I made, since the change you're making will have limited usability: the portability effort won't realistically allow you to see such low-level things as when processElement finished, and supporting user state will be modeled using the following three operations: read (with continuation tokens), append (blind write), and clear. It may be moot to discuss how to integrate Samza into the existing framework; it should work towards being a portability-based runner. There are more details here about supporting user state during pipeline execution: https://s.apache.org/beam-fn-state-api-and-bundle-processing, and top-level docs about portability are https://s.apache.org/beam-runner-api and https://s.apache.org/beam-fn-api.
>>>>>>
>>>>>> Do we really want to make a change to the user-facing API for user state in the Java SDK when that code path will be phased out in exchange for using the portability APIs?
>>>>>>
>>>>>
>>>>> To be clear, the user API will remain the same, but the runner's implementation path will be phased out. So should we expand this discussion to how the portability APIs enable the SDK and runner to collaborate to achieve this use case?
>>>>> It seems like the interaction you need is that the runner can tell the SDK that it can close the connection on the read(), and the SDK needs to do so promptly, right?
>>>>>
>>>>> Kenn
>>>>>
>>>>>
>>>>>
>>>>>> On Mon, May 14, 2018 at 2:20 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>
>>>>>>> We discussed the proposed approaches internally. It seems that if the State API can also expose another method to return a ReadableState<Iterator>, it will cover our cases of iterating over bigger-than-memory state: the underlying RocksDb snapshot can be closed immediately after the iterator is fully consumed (most cases), and the rest of the iterators can be closed safely after processElement (minor cases). If this sounds good to you guys, I am going to create a JIRA ticket for it and open a PR. Thanks a lot for the discussions here.
>>>>>>>
>>>>>>> The discussion about async processing is very interesting, and I think it's definitely worth its own thread. I believe we do need support from the Beam API/model so users can take advantage of it (besides Samza, Flink also has an async operator that helps a lot in Alibaba's use cases). Yes, it will add complexity to the underlying framework, but it's going to be great for users doing remote IO. In practice we found it actually avoids thread+locks issues, as Kenn mentioned above. I am not sure whether this feature can be runner-specific. I will probably create another email thread for this discussion in the future, and hopefully I can move this forward.
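One way a runner might implement the plan above (close the underlying snapshot as soon as an iterator is exhausted, and close any partially consumed iterators after processElement) is sketched below. `AutoClosingIterator` and `IteratorRegistry` are hypothetical runner-side helpers, not Beam or Samza classes, and each `AutoCloseable` stands in for a RocksDB iterator.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical runner-side wrapper: releases its resource the moment the
// delegate reports no more elements.
final class AutoClosingIterator<T> implements Iterator<T> {
  private final Iterator<T> delegate;
  private final AutoCloseable resource;
  private boolean closed;

  AutoClosingIterator(Iterator<T> delegate, AutoCloseable resource) {
    this.delegate = delegate;
    this.resource = resource;
  }

  @Override
  public boolean hasNext() {
    if (closed) {
      return false;
    }
    boolean more = delegate.hasNext();
    if (!more) {
      close(); // common case: fully consumed, release immediately
    }
    return more;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return delegate.next();
  }

  /** Idempotent: closing twice releases the resource only once. */
  void close() {
    if (closed) {
      return;
    }
    closed = true;
    try {
      resource.close();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

// Hypothetical per-call registry of iterators handed out to user code.
final class IteratorRegistry {
  private final List<AutoClosingIterator<?>> handedOut = new ArrayList<>();

  <T> Iterator<T> register(Iterator<T> delegate, AutoCloseable resource) {
    AutoClosingIterator<T> it = new AutoClosingIterator<>(delegate, resource);
    handedOut.add(it);
    return it;
  }

  /** Called by the runner after processElement returns (the minor case). */
  void closeAll() {
    for (AutoClosingIterator<?> it : handedOut) {
      it.close();
    }
    handedOut.clear();
  }
}
```

Users keep writing ordinary loops over a plain `Iterator`; the cost of this design is that an iterator which escapes processElement silently stops yielding elements once `closeAll()` runs.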
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Xinyu
>>>>>>>
>>>>>>> On Mon, May 14, 2018 at 10:30 AM, Kenneth Knowles <k...@google.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, May 14, 2018 at 9:44 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Users typically want to do that async operation and then produce output with it. Adding asynchronous execution is difficult within the framework because a lot of code currently does not need to be thread safe, and writing code that is fast and handles asynchronous execution is quite difficult. Adding async operations typically leads to code with too many locks/synchronization blocks.
>>>>>>>>>
>>>>>>>>
>>>>>>>> Just a small point here, as it is very unusual to use locks with futures. The essential innovation of futures is that they are a new way to program that avoids the threads+locks style. In part, you get this for free from the functional programming style, and on the other hand you reify asynchronous side effects as data dependencies.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>>
>>>>>>>>> Note that with the portability effort, the Runner won't have visibility into such low-level things as when an object is garbage collected, and supporting user state will be modeled using the following three operations: read (with continuation tokens), append (blind write), and clear. It may be moot to discuss how to integrate Samza into the existing framework; it should work towards being a portability-based runner. There are more details here about supporting user state during pipeline execution: https://s.apache.org/beam-fn-state-api-and-bundle-processing, and top-level docs about portability are https://s.apache.org/beam-runner-api and https://s.apache.org/beam-fn-api.
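To make the three portability state operations Lukasz mentions concrete (read with continuation tokens, append as a blind write, and clear), here is a toy in-memory model. `ToyStateService`, its page size and its integer-offset token are invented for illustration and do not reflect the actual Fn API protos. What it shows is that the SDK pulls one page at a time and can simply stop asking, so in this model there is no server-side iterator left open that anyone must close.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of read (with continuation tokens), append and clear.
// Names, page size and token format are hypothetical, NOT the Fn API protos.
final class ToyStateService {
  /** One page of a read: values plus an opaque token, or null when done. */
  static final class Page {
    final List<String> values;
    final String continuationToken;

    Page(List<String> values, String continuationToken) {
      this.values = values;
      this.continuationToken = continuationToken;
    }
  }

  private final Map<String, List<String>> store = new HashMap<>();
  private final int pageSize;

  ToyStateService(int pageSize) {
    this.pageSize = pageSize;
  }

  /** Returns at most pageSize values, starting where the token points. */
  Page read(String key, String token) {
    List<String> all = store.getOrDefault(key, Collections.emptyList());
    // In this toy the token is just an offset (clamped in case of a clear).
    int start = token == null ? 0 : Math.min(Integer.parseInt(token), all.size());
    int end = Math.min(start + pageSize, all.size());
    String next = end < all.size() ? Integer.toString(end) : null;
    return new Page(new ArrayList<>(all.subList(start, end)), next);
  }

  /** Blind write: the caller never needs to read before appending. */
  void append(String key, String value) {
    store.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  void clear(String key) {
    store.remove(key);
  }

  /** SDK-side loop: fetch pages until no token is returned. */
  static List<String> readAll(ToyStateService service, String key) {
    List<String> out = new ArrayList<>();
    String token = null;
    do {
      Page page = service.read(key, token);
      out.addAll(page.values);
      token = page.continuationToken;
    } while (token != null);
    return out;
  }
}
```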
>>>>>>>>>
>>>>>>>>> On Sun, May 13, 2018 at 6:51 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> At least one API that has been discussed in the past is to use Java 8 CompletionStage, e.g.
>>>>>>>>>>
>>>>>>>>>> new DoFn<InputT, OutputT>() {
>>>>>>>>>>   @ProcessElement
>>>>>>>>>>   public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>>>>>>     element.thenApply(...);
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> The framework will automatically create the CompletionStage, and the process method can specify a pipeline of asynchronous operations to perform on the element. When all of them are done, the element will be marked as successfully processed.
>>>>>>>>>>
>>>>>>>>>> Reuven
>>>>>>>>>>
>>>>>>>>>> On Sun, May 13, 2018 at 11:36 AM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for all the pointers. I looked through the discussion over BEAM-2975 and BEAM-2980 about having snapshot or live views of iterable, and the current semantics make a lot of sense to me. For your question: it does not require an explicit snapshot when we create a RocksDb iterator directly. The iterator will read from an implicit snapshot as of the time the iterator is created [1], and the snapshot will be released after the iterator is closed. If we can have another method to return ReadableState<Iterator>, we might be able to apply the auto-closing approaches we discussed and solve the problem here :).
>>>>>>>>>>>
>>>>>>>>>>> It's very interesting that you bring up the discussion about an async API!
>>>>>>>>>>> Async IO has been widely adopted here among our users: they use netty for async calls, with a library named ParSeq [2] to help manage the calls. Samza provides a primitive callback-style API [3], in which the user invokes the callback after the remote calls are complete. Currently in a Samza job our users use this API with the ParSeq lib for remote IO. It seems we might have to do blocking calls (thus the poor resource utilization you mentioned) when using the Beam API for now. It would be great if you could send a few more details about the discussion of an async API. I would like to add our use case and help move this forward.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Xinyu
>>>>>>>>>>>
>>>>>>>>>>> [1]: https://github.com/facebook/rocksdb/wiki/Iterator
>>>>>>>>>>> [2]: https://github.com/linkedin/parseq
>>>>>>>>>>> [3]: https://samza.apache.org/learn/documentation/0.14/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sat, May 12, 2018 at 8:17 PM, Kenneth Knowles <k...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I don't have any further suggestions, but want to call out how this hits a lot of interesting points.
>>>>>>>>>>>>
>>>>>>>>>>>> The point about snapshotting is great. We have BEAM-2975 [1] and BEAM-2980 [2] where we debated things a bit. I think the strongest case is for what you describe - it should be a snapshot. Perhaps they should both be closed as fixed...
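Kenn's remark that futures avoid the threads+locks style, together with Reuven's CompletionStage sketch, can be illustrated with plain `java.util.concurrent`. `AsyncElementPipeline` and `remoteLookup` are hypothetical (the "remote" call is simulated on a thread pool) and this is not a Beam API. Each stage depends only on the results of earlier stages, so no shared mutable state or lock is needed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;

// Illustrative only: per-element async work expressed as data dependencies
// between stages, with no shared mutable state and no locks.
final class AsyncElementPipeline {
  private final ExecutorService io;

  AsyncElementPipeline(ExecutorService io) {
    this.io = io;
  }

  // Hypothetical remote call, simulated by computing a length on the pool.
  CompletionStage<Integer> remoteLookup(String key) {
    return CompletableFuture.supplyAsync(key::length, io);
  }

  // Two lookups run concurrently; the final stage combines their results.
  CompletionStage<String> process(String element) {
    return remoteLookup(element)
        .thenApply(len -> len * 2)
        .thenCombine(
            remoteLookup(element + "!"),
            (doubled, other) -> element + ":" + doubled + ":" + other);
  }
}
```

In the design Reuven describes, a framework would mark the element as processed when the returned stage completes; in a standalone program the caller can simply join on it.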
>>>>>>>>>>>>
>>>>>>>>>>>> And you also bring up long blocking calls - we have also deliberately decided that long synchronous blocking calls in @ProcessElement can be embraced for simple programming and compensated with autoscaling smarts (e.g. expanding the thread pool upon noticing poor utilization). The alternative is a more future-istic API where the calls can be explicitly asynchronous. We've had some interesting dev@ list discussions about that, too.
>>>>>>>>>>>>
>>>>>>>>>>>> Is another possibility perhaps to have read() return a ReadableState<Iterator> instead? We could, of course, have two methods with different names, one for an iterator and one for a snapshot iterable. But wouldn't the Iterator also require a snapshot? Doesn't a native RocksDb iterator require a snapshot to have well-defined contents? As you can tell, I don't know enough about RocksDb details to be sure of my suggestions.
>>>>>>>>>>>>
>>>>>>>>>>>> Kenn
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/BEAM-2975
>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/BEAM-2980
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, May 12, 2018 at 2:58 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the ideas, Kenn, Luke and Eugene. Before I posted the question here, we discussed internally releasing the underlying resources after consuming the whole iterator. This probably covers quite a lot of use cases.
>>>>>>>>>>>>> For some special cases where the user only consumes part of the iterator, Luke and Kenn's suggestion about releasing after processElement() might work (I need to confirm this with our use cases). So based on what we have discussed so far, we might have a good way to automatically close an iterator for the store.
>>>>>>>>>>>>>
>>>>>>>>>>>>> There is another issue though: right now the state API returns an iterable for entries(), keys() and values(), and we can create an iterator from it. From my understanding, the iterable holds a snapshot of the underlying store. In the case of RocksDb, it's going to be a db.snapshot(). Then when can we release the snapshot? It's not like an iterator, where we can use some heuristics to release it automatically. The user can hold on to the iterable and create iterators throughout the whole processElement(). But if we only close the iterable after processElement(), I am quite concerned about the limitations this will bring. If the user is doing some remote call during processing, then the snapshot might be held for a long time before being released, and might cause performance problems. And if the user happens to create multiple iterables, then there will be multiple snapshots loaded during processing. Luke suggested being aggressive at closing the resources and recreating them when needed again.
>>>>>>>>>>>>> But in this case it might not work, since we won't be able to recreate the same snapshot given that the store might have been updated (and creating a RocksDb snapshot is not cheap either). I am running out of ideas other than exposing the iterator itself somehow (and adding close() if needed?). Any further suggestions?
>>>>>>>>>>>>>
>>>>>>>>>>>>> @Kenn: btw, I have the same impl you posted earlier (CloseableIterator) in an internal interface. I wrapped it in some sort of StateResource for fear that people might reject the proposal immediately after seeing the close() on the iterator. I guess for our users, who are familiar with RocksDb state, it's pretty normal to close the iterator/snapshot after using it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, May 11, 2018 at 11:08 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> The iterator going out of scope is the idiomatic way that resources are freed for Java developers (hence the weak/phantom reference suggestion).
>>>>>>>>>>>>>> Explicitly requiring users to deal with 'handles' (like file streams) leads to leaked resources.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, May 11, 2018 at 10:55 AM Kenneth Knowles <k...@google.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks Xinyu,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I actually had first sketched out just what you wrote.
>>>>>>>>>>>>>>> But then I realized a few things:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - usually an Iterable does not allocate resources, only its Iterators
>>>>>>>>>>>>>>> - if you consume the whole iterator, I hope the user would not have to do any extra work
>>>>>>>>>>>>>>> - you can also automatically free it up at the end of the call to @ProcessElement, so that is easy too (but you might not want to)
>>>>>>>>>>>>>>> - so the main use is when the iterator terminates early, is not fully consumed, and you can't wait for the method to finish
>>>>>>>>>>>>>>> - the scoping in Java will force a bunch of uninitialized declarations outside the try-with-resources block, kind of a lot of boilerplate LOC
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> One thing that is good about your proposal is that the iterable could have some transparent caches that all free up together.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, May 11, 2018 at 9:51 AM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for drafting the details about the two approaches, Kenn. Now I understand Luke's proposal better. The approach looks neat, but the uncertainty of *when* GC is going to kick in will make users' lives hard. If the user happens to configure a large JVM heap size, then, since RocksDb uses off-heap memory, GC might start very late and run less frequently than we want. If we don't have a *definitive* way to let users close the underlying resources, then there is no good way to handle such failures of a critical application in production.
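The early-termination case in Kenn's list is where an explicit close() pays off. Below is a minimal sketch, assuming a hypothetical `CloseableIterator` interface like the one in Kenn's and Xinyu's sketches (it is not part of Beam). Narrowing `close()` so it throws no checked exception lets try-with-resources release the resource on every exit path, including an early `return`, and returning from inside the block avoids declaring result variables outside the try.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical interface (not part of Beam): an Iterator that owns a
// releasable resource such as a RocksDB iterator.
interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
  @Override
  void close(); // narrowed: no checked exception, so callers need no catch
}

// In-memory stand-in that only records whether it was closed.
final class ListBackedCloseableIterator<T> implements CloseableIterator<T> {
  private final Iterator<T> delegate;
  private boolean closed;

  ListBackedCloseableIterator(List<T> values) {
    this.delegate = values.iterator();
  }

  @Override
  public boolean hasNext() {
    return !closed && delegate.hasNext();
  }

  @Override
  public T next() {
    if (closed) {
      throw new IllegalStateException("iterator already closed");
    }
    return delegate.next();
  }

  @Override
  public void close() {
    closed = true;
  }

  boolean isClosed() {
    return closed;
  }
}

final class EarlyExit {
  /** Returns the first value above the threshold (or null); always closes the source. */
  static Integer firstAbove(CloseableIterator<Integer> source, int threshold) {
    try (CloseableIterator<Integer> it = source) {
      while (it.hasNext()) {
        Integer v = it.next();
        if (v > threshold) {
          return v; // early exit: close() still runs
        }
      }
      return null;
    }
  }
}
```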
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Having a close method in the iterator might be a little unorthodox :). To some degree, this is actually a resource we are holding underneath, and I think it's pretty common to have close() for a resource in Java, e.g. BufferedReader and BufferedWriter. So I would imagine that we also define a resource for the state iterator and make the interface implement *AutoCloseable*. Here is my sketch:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> // StateResource MUST be closed after use.
>>>>>>>>>>>>>>>> try (StateResource<Iterator<SomeType>> st = bagState.iteratorResource()) {
>>>>>>>>>>>>>>>>   Iterator<SomeType> iter = st.iterator();
>>>>>>>>>>>>>>>>   while (iter.hasNext()) {
>>>>>>>>>>>>>>>>     ... do stuff ...
>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>> } catch (Exception e) {
>>>>>>>>>>>>>>>>   ... user code
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The type/method names are just for illustration here, so please don't laugh at them. Please feel free to comment and let me know if you have thoughts about the programming patterns here.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 8:59 PM, Kenneth Knowles <k...@google.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It is too soon to argue whether an API is complex or not. There has been no specific API proposed.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I think the problem statement is real - you need to be able to read and write bigger-than-memory state. It seems we have multiple runners that don't support it, perhaps because of our API.
>>>>>>>>>>>>>>>>> You might be able to build something good enough with phantom references, but you might not.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If I understand the idea, it might look something like this:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> new DoFn<>() {
>>>>>>>>>>>>>>>>>   @StateId("foo")
>>>>>>>>>>>>>>>>>   private final StateSpec<BagState<Whatever>> myBagSpec = ...
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>   @ProcessElement
>>>>>>>>>>>>>>>>>   public void proc(@StateId("foo") BagState<Whatever> myBag, ...) {
>>>>>>>>>>>>>>>>>     CloseableIterator<Whatever> iterator = myBag.read().iterator();
>>>>>>>>>>>>>>>>>     while (iterator.hasNext() && ... special condition ...) {
>>>>>>>>>>>>>>>>>       ... do stuff ...
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>     iterator.close();
>>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> So it has no impact on users who don't choose to close(), since they iterate with for ( : ) as usual. And a runner that has the 10x funding to try out a ReferenceQueue can be resilient to users that forget. On the other hand, I haven't seen this pattern much in the wild, so I think it is valuable to discuss other methods.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> While Luke's proposal is something like this, if I understand his sketch (replacing WeakReference with PhantomReference seems to be what you really want):
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> ... in RocksDb state implementation ...
>>>>>>>>>>>>>>>>> class RocksDbBagState {
>>>>>>>>>>>>>>>>>   static ReferenceQueue rocksDbIteratorQueue = new ReferenceQueue();
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>   class Iterator {
>>>>>>>>>>>>>>>>>     PhantomReference<RocksDbJniIterator> cIter;
>>>>>>>>>>>>>>>>>     .next() {
>>>>>>>>>>>>>>>>>       return cIter.next();
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>   class Iterable {
>>>>>>>>>>>>>>>>>     .iterator() {
>>>>>>>>>>>>>>>>>       return new Iterator(new PhantomReference<>(rocksDbJniIterator, rocksDbIteratorQueue));
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> ... on another thread ...
>>>>>>>>>>>>>>>>> while (true) {
>>>>>>>>>>>>>>>>>   RocksDbIterator deadRef = (RocksDbIterator) rocksDbIteratorQueue.remove();
>>>>>>>>>>>>>>>>>   deadRef.close();
>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> When the iterator is GC'd, the phantom reference will pop onto the queue for being closed. This might not be too bad. You'll have delayed resource release, and potentially masked errors that are hard to debug. It is less error-prone than WeakReference, which is asking for trouble when objects are collected en masse. Anecdotally I have heard that performance of this kind of approach is poor, but I haven't experienced it myself and I can't find good data.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 7:41 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> If I understand correctly, using weak references will help clean up the Java objects once GC kicks in.
>>>>>>>>>>>>>>>>>> In the case of a kv-store like RocksDb, the Java iterator is just a JNI interface to the underlying C iterator, so we need to explicitly invoke close to release the in-memory snapshot data, which can be large and can accumulate quickly if it's not released when not in use. Maybe I am missing something in what you suggested here, but it looks to me like using weak references might not help in this case.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I understand your concern, and I think you might have misunderstood what I meant. I am totally for working hard for the best user experience, and I think the current API provides a good example of that. That's also the reason I am implementing a runner here. I am just proposing an extra API to expose an iterator that can be closed when not needed; that way users can use this feature to iterate through large state that doesn't fit into memory. I believe this is also a pretty general use case and it's better to have support for it. I am actually arguing that adding this extra API will make for a better user experience, since more users can benefit from it.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 5:25 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I don't agree.
>>>>>>>>>>>>>>>>>>> I believe you can track the iterators/iterables that are created and freed by using weak references and reference queues (or other methods). Having a few people work 10x as hard to provide a good implementation is much better than having 100s or 1000s of users suffering through a more complicated API.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:44 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Loading/evicting blocks will help reduce the cache memory footprint, but we still won't be able to release the underlying resources. We can definitely add heuristics to help release the resources as you mentioned, but there is no accurate way to track all the iterators/iterables created and free them once they are no longer needed. I think that while the API is aimed at a nice user experience, we should have the option to let users optimize their performance if they choose to. Do you agree?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:25 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Users won't reliably close/release the resources, and forcing them to will make the user experience worse.
>>>>>>>>>>>>>>>>>>>>> It will make a lot more sense to use a file format which allows random access and use a cache to load/evict blocks of the state from memory.
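Lukasz's alternative, a random-access file format plus a cache that loads and evicts blocks of state, can be sketched with a bounded LRU map built on `LinkedHashMap` in access order. `BlockCache` and its `IntFunction` loader are hypothetical stand-ins for random-access reads from a file-backed store. Because nothing stays open between calls, there is no user-visible handle to close. As Xinyu's reply notes, however, the store's own resources still have to be managed somewhere.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.IntFunction;

// Illustrative LRU cache of state blocks. The loader is a hypothetical
// stand-in for a random-access read from a file-backed store.
final class BlockCache<B> {
  private final IntFunction<B> loader;
  private final LinkedHashMap<Integer, B> blocks;
  private int loads;

  BlockCache(int maxBlocks, IntFunction<B> loader) {
    this.loader = loader;
    // accessOrder = true: iteration order is least-recently-used first.
    this.blocks = new LinkedHashMap<Integer, B>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<Integer, B> eldest) {
        return size() > maxBlocks; // evict the LRU block when over capacity
      }
    };
  }

  /** Returns the block, loading it on a miss (which may evict another block). */
  B block(int index) {
    B b = blocks.get(index);
    if (b == null) {
      b = loader.apply(index);
      loads++;
      blocks.put(index, b);
    }
    return b;
  }

  int loadCount() {
    return loads;
  }

  int residentBlocks() {
    return blocks.size();
  }
}
```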
>>>>>>>>>>>>>>>>>>>>> If that is not possible, use an iterable which frees the resource after a certain amount of inactivity or uses weak references.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:07 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi, folks,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I'm in the middle of implementing MapState and SetState in our Samza runner. We noticed that the state returns a Java Iterable for reading entries, keys, etc. For state backed by a file-based kv store like RocksDb, we need to be able to let users explicitly close the iterator/iterable to release the resources. Otherwise we have to load the iterable into memory so we can safely close the underlying RocksDb iterator, similar to Flink's implementation. But this won't work for state that doesn't fit into memory. I chatted with Kenn and he also agrees we need this capability to avoid bulk read/write. This seems to be a general use case, and I'm wondering whether we can add support for it. I am happy to contribute to this if needed. Any feedback is highly appreciated.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>> Xinyu
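For reference, here is a version of the ReferenceQueue approach from Kenn's `RocksDbBagState` sketch that compiles against the real `java.lang.ref` API. Two details differ from that sketch. First, `ReferenceQueue.poll()` and `remove()` return the `Reference` object, not the referent. Second, `PhantomReference.get()` always returns null, so the native handle has to be stored in the reference itself (here, in a `PhantomReference` subclass). `NativeHandle`, `HandleRef` and `PhantomCleaner` are hypothetical names, and `drain()` polls instead of blocking on a dedicated thread so that it is easy to test. Java 9+ packages the same pattern as `java.lang.ref.Cleaner`. Xinyu's caveat still applies: collection is driven by heap pressure, not by the off-heap memory a RocksDB snapshot holds.

```java
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a native (JNI) handle such as a RocksDB iterator.
final class NativeHandle implements AutoCloseable {
  volatile boolean closed;

  @Override
  public void close() {
    closed = true;
  }
}

// The user-facing wrapper is the phantom referent; the native handle is held
// strongly by the reference itself, because get() on a phantom reference
// always returns null.
final class HandleRef extends PhantomReference<Object> {
  final NativeHandle handle;

  HandleRef(Object wrapper, NativeHandle handle, ReferenceQueue<Object> queue) {
    super(wrapper, queue);
    this.handle = handle;
  }
}

final class PhantomCleaner {
  private final ReferenceQueue<Object> queue = new ReferenceQueue<>();
  // References must stay strongly reachable until processed, or they are lost.
  private final Set<HandleRef> live = ConcurrentHashMap.newKeySet();

  /** Starts tracking; returned mainly so tests can simulate collection. */
  HandleRef track(Object wrapper, NativeHandle handle) {
    HandleRef ref = new HandleRef(wrapper, handle, queue);
    live.add(ref);
    return ref;
  }

  /** Closes handles whose wrappers were collected; returns how many. */
  int drain() {
    int closedCount = 0;
    Reference<?> ref;
    while ((ref = queue.poll()) != null) {
      HandleRef h = (HandleRef) ref;
      live.remove(h);
      h.handle.close();
      closedCount++;
    }
    return closedCount;
  }
}
```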