OK, got it. But what consistency are you referring to? I was trying to point out that there's nothing but straight-line program order consistency. There's only one actor doing all the reads and all the writes.
Kenn On Tue, May 15, 2018 at 8:39 AM Lukasz Cwik <lc...@google.com> wrote: > I misspoke when I said portability semantics and should have said > portability design/implementation. This is why I had a follow-up e-mail and > clarified what I'm confused about: > * I don't understand how you would want close to change the semantics of a > user state specification and how it affects the lifetime of user state? > ** Does it represent committing information within a bundle? > ** Does it mean that user state can ignore the replayable and consistent > semantics for a lifetime of a bundle semantics? > > I'm trying to tie back what a `ReadableState<Iterator> > readIterator()` means for Runner authors and how it solves the > memory/close() problem for Samza. Based upon > https://s.apache.org/beam-state, reading and writing of state must be > consistent. Does this mean that Samza must use snapshots for the lifetime > of a bundle? If so, I don't see how adding a `ReadableState<Iterator> > readIterator()` allows Samza to ignore the consistency requirement and be > allowed to free snapshots. > > It might be worthwhile to set up a three-way Hangouts call to help me, as I > don't have the same level of context; we can share the outcome back to this > thread. Xinyu / Kenn, how about we set up a time using Slack? > > On Mon, May 14, 2018 at 8:36 PM Kenneth Knowles <k...@google.com> wrote: > >> I feel like this discussion is kind of far from the primary intention. >> The point of ParDo(stateful DoFn) is to enable naive single-threaded code >> in a style intuitive to a beginning imperative programmer.
So: >> >> - the return value of read() should act like an immutable value >> - if there is a read after a write, that read should reflect the changes >> written, because there's a trivial happens-before relationship induced by >> program order >> - there are no non-trivial happens-before relationships >> - a write after a read should not affect the value read before >> >> From that starting point, the limitations on expressiveness (single >> threaded per-key-and-window-and-step) give rise to embarrassingly parallel >> computation and GC. >> >> So when you say "portability semantics" it makes me concerned. The word >> "semantics" refers to the mapping between what a user writes and what it >> means. Primarily, that means the conceptual translation between a primitive >> PTransform and the corresponding PCollection-to-PCollection operation. It >> is true that portability complicates things because a user's code can only >> be given meaning relative to an SDK harness. But the end-user intention is >> unchanged. If we had time and resources to give the Fn API a solid spec, it >> should be such that the SDK harness has little choice but to implement the >> primitives as intended. >> >> In other words, it is the job of >> https://s.apache.org/beam-fn-state-api-and-bundle-processing* to >> implement https://s.apache.org/beam-state. The discussion of adding >> `ReadableState<Iterator> readIterator()` to BagState seems consistent with >> the latter. >> >> Kenn >> >> *IIUC you've chosen to use the same underlying proto service for side >> inputs and by-reference values. That's an implementation detail that I have >> no particular opinion about except that if it complicates implementing the >> primary motivator for state, it may not be a great fit. >> >> >> >> On Mon, May 14, 2018 at 7:35 PM Kenneth Knowles <k...@google.com> wrote: >> >>> I don't follow why allowing freeing resources would be counter to the >>> spec. 
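To make the single-threaded rules above concrete, here is a minimal sketch of a copy-on-read bag. SimpleBagState is a hypothetical name, not a Beam class, and copying on every read is only viable for small state, which is exactly the limitation the rest of this thread is about.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical in-memory bag state (not a Beam class) illustrating the rules:
// - read() returns an immutable value
// - a read after a write reflects that write (program order)
// - a write after a read does not change the value already read
final class SimpleBagState<T> {
  private final List<T> contents = new ArrayList<>();

  void add(T value) {
    contents.add(value);
  }

  void clear() {
    contents.clear();
  }

  // Copy-on-read: the caller gets an unmodifiable copy, so a later add()
  // or clear() cannot change what was returned here.
  List<T> read() {
    return Collections.unmodifiableList(new ArrayList<>(contents));
  }
}
```

Copying is the simplest way to satisfy "a write after a read should not affect the value read before"; a bigger-than-memory backend has to get the same guarantee from a snapshot instead, which is where the resource-lifetime question comes from.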
I don't really know what you mean by consistent for a bundle. State, >>> in the sense of the user-facing per-key-and-window state API, is single >>> threaded and scoped to a single DoFn. There's no one else who can write the >>> state. If a BagState is read and written and read again, the user-facing >>> logic should be unaware of the resources and not have any logic to deal >>> with consistency. >>> >>> Kenn >>> >>> On Mon, May 14, 2018 at 6:09 PM Lukasz Cwik <lc...@google.com> wrote: >>> >>>> Hmm, some of the problems I'm dealing with are: >>>> * I don't understand how you would want close to change the semantics >>>> of a user state specification and how it affects the lifetime of user >>>> state? >>>> ** Does it represent committing information within a bundle? >>>> ** Does it mean that user state can ignore the replayable and >>>> consistent semantics for a lifetime of a bundle semantics? >>>> * I do understand that close semantics may make it easier for the Samza >>>> runner to support state that is greater than memory, but what does it >>>> provide to users and how would they benefit (like what new scenarios would >>>> it support)? >>>> * I don't understand why you need a snapshot in RocksDb in the first >>>> place, and whether RocksDb can support seeking inside or outside a snapshot >>>> relatively efficiently? >>>> >>>> There seems to be some context lost from an earlier discussion that >>>> you and Kenn had; is there a copy of that which could be shared? It may help >>>> me get up to speed on the problem that is being solved. >>>> >>>> >>>> On Mon, May 14, 2018 at 5:11 PM Xinyu Liu <xinyuliu...@gmail.com> >>>> wrote: >>>> >>>>> I will take a look at the docs to understand the problem better. A >>>>> minor comment on 2) is that I don't intend to change the existing iterable >>>>> API. I plan to implement it similarly to Flink, loading the data into memory >>>>> and closing the underlying snapshot after that. So the changes should be >>>>> backward compatible.
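A minimal sketch of the Flink-style approach Xinyu describes: drain the native iterator into memory, then close it right away. NativeIterator and EagerIterables are hypothetical names standing in for a JNI-backed store iterator; this keeps the existing Iterable API unchanged but only works when the data fits in memory.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a native (e.g. JNI-backed) store iterator that
// pins resources until close() is called. close() is narrowed so callers
// do not have to handle a checked exception.
interface NativeIterator<T> extends Iterator<T>, AutoCloseable {
  @Override
  void close();
}

final class EagerIterables {
  private EagerIterables() {}

  // Copies every element into memory, then releases the native iterator
  // immediately. The returned Iterable holds no native resource at all.
  static <T> Iterable<T> materialize(NativeIterator<T> source) {
    List<T> buffer = new ArrayList<>();
    try (NativeIterator<T> it = source) {
      while (it.hasNext()) {
        buffer.add(it.next());
      }
    }
    return Collections.unmodifiableList(buffer);
  }
}
```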
>>>>> >>>>> Thanks, >>>>> Xinyu >>>>> >>>>> >>>>> >>>>> On Mon, May 14, 2018 at 4:54 PM, Lukasz Cwik <lc...@google.com> wrote: >>>>> >>>>>> I believe adding support for a state spec to be 'closed' or 'freed' >>>>>> is counter to the requirement of a state spec being consistent for the >>>>>> lifetime of a bundle. Are we willing to change this requirement for the >>>>>> lifetime of a bundle, or say that runners can arbitrarily decide that a >>>>>> StateSpec >>>>>> can't be accessed anymore? >>>>>> >>>>>> If not, I'm having trouble thinking of a good way to >>>>>> integrate the 'close/free' API with portability semantics because: >>>>>> 1) Runners do control bundle size, but other than the trivial cases >>>>>> where a runner specifically chooses: >>>>>> a) to only process a single element at a time (negating the need >>>>>> for a free/close() method), or >>>>>> b) to optimistically process elements and, if you run out of memory, >>>>>> kill the bundle and try again with a smaller bundle. >>>>>> 2) Adding an API to provide Iterators doesn't mean that users can't >>>>>> still use an Iterable, which can't be force-closed. (Dropping Iterable in >>>>>> exchange for Iterator in the API is backwards incompatible, so it is likely to >>>>>> be >>>>>> deferred till the next major version of Apache Beam.) >>>>>> 3) SDKs are able to process as many elements as they want in >>>>>> parallel; nothing requires them to execute elements serially throughout >>>>>> the >>>>>> pipeline graph. >>>>>> 4) The runner has limited insight into what an SDK is doing. The >>>>>> SDK provides a lower bound on how many elements it has processed, but SDKs >>>>>> aren't required to implement this, meaning that they could technically be >>>>>> processing everything in random order after they have seen all input for >>>>>> a >>>>>> bundle.
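Option b) in Luke's list can be sketched as a runner-side loop that halves the bundle size whenever a bundle fails. AdaptiveBundler is a hypothetical name, and a boolean-returning predicate stands in for a bundle that either commits or is killed (for example by running out of memory); a real runner would also have to discard the failed bundle's outputs and state writes before retrying.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical runner loop: optimistically try a large bundle; on failure,
// retry the same elements with half the bundle size.
final class AdaptiveBundler {
  private AdaptiveBundler() {}

  /**
   * @param tryBundle returns true if the bundle committed, false if it was killed
   * @return the sizes of the bundles that eventually committed, in order
   */
  static <T> List<Integer> process(List<T> elements, int initialSize, Predicate<List<T>> tryBundle) {
    List<Integer> committed = new ArrayList<>();
    int start = 0;
    int size = initialSize;
    while (start < elements.size()) {
      int end = Math.min(start + size, elements.size());
      List<T> bundle = elements.subList(start, end);
      if (tryBundle.test(bundle)) {
        committed.add(bundle.size());
        start = end; // advance only past committed elements
      } else if (bundle.size() == 1) {
        throw new IllegalStateException("single-element bundle failed at index " + start);
      } else {
        size = Math.max(1, bundle.size() / 2); // shrink and retry the same elements
      }
    }
    return committed;
  }
}
```

The size never grows back in this sketch; it also shows why the approach is a blunt instrument: the runner only learns "too big" after paying for a failed attempt.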
>>>>>> >>>>>> We'll need to work through the scenarios. Xinyu, it would be useful >>>>>> for you to take a look at these two docs for context on the problem >>>>>> space: >>>>>> https://s.apache.org/beam-fn-api-processing-a-bundle (How to process >>>>>> a bundle) >>>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing (How to >>>>>> access side inputs, access remote references, and support user state) >>>>>> >>>>>> >>>>>> On Mon, May 14, 2018 at 3:16 PM Kenneth Knowles <k...@google.com> >>>>>> wrote: >>>>>> >>>>>>> >>>>>>> On Mon, May 14, 2018 at 2:30 PM Lukasz Cwik <lc...@google.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Before you go on and update the user-facing API, we should discuss >>>>>>>> the last point I made, since the change you're making will have limited >>>>>>>> usability: the portability effort won't realistically allow you >>>>>>>> to see >>>>>>>> such low-level things as when processElement finished, and supporting >>>>>>>> user >>>>>>>> state will be modeled using the following three operations: read (with >>>>>>>> continuation tokens), append (blind write), clear. It may be moot to >>>>>>>> discuss how to integrate Samza into the existing framework; Samza should instead >>>>>>>> work >>>>>>>> towards being a portability-based runner. There are more details here >>>>>>>> about >>>>>>>> supporting user state during pipeline execution: >>>>>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing; the >>>>>>>> top-level docs about portability are >>>>>>>> https://s.apache.org/beam-runner-api and >>>>>>>> https://s.apache.org/beam-fn-api. >>>>>>>> >>>>>>>> Do we really want to make a change to the user-facing API for user >>>>>>>> state in the Java SDK when that code path will be phased out in >>>>>>>> exchange >>>>>>>> for using the portability APIs? >>>>>>>> >>>>>>> >>>>>>> To be clear, the user API will remain the same, but the runner's >>>>>>> implementation path will be phased out.
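Luke's three state operations (read with continuation tokens, blind append, clear) can be modeled with a toy in-memory server where a read returns one page plus a continuation token, and an SDK-side iterator pulls pages lazily so that only one page is in memory at a time. PagedStateServer and its integer-offset tokens are assumptions for illustration, not the Fn API's actual messages.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Toy model of read (paged, with continuation tokens), append and clear.
final class PagedStateServer {
  static final class Page {
    final List<String> values;
    final Integer continuationToken; // null when there are no more pages

    Page(List<String> values, Integer continuationToken) {
      this.values = values;
      this.continuationToken = continuationToken;
    }
  }

  private final Map<String, List<String>> store = new HashMap<>();
  private final int pageSize;

  PagedStateServer(int pageSize) {
    if (pageSize <= 0) throw new IllegalArgumentException("pageSize must be positive");
    this.pageSize = pageSize;
  }

  void append(String key, String value) { // blind write: no read needed
    store.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  void clear(String key) {
    store.remove(key);
  }

  // In this toy model a token is just the offset of the next page.
  Page read(String key, Integer token) {
    List<String> all = store.getOrDefault(key, new ArrayList<>());
    int from = Math.min(token == null ? 0 : token, all.size());
    int to = Math.min(from + pageSize, all.size());
    return new Page(new ArrayList<>(all.subList(from, to)), to < all.size() ? to : null);
  }

  // SDK-side view: fetches the next page only when the current one is used up.
  Iterator<String> iterate(String key) {
    return new Iterator<String>() {
      private Page page = read(key, null);
      private int index = 0;

      @Override
      public boolean hasNext() {
        while (index >= page.values.size() && page.continuationToken != null) {
          page = read(key, page.continuationToken);
          index = 0;
        }
        return index < page.values.size();
      }

      @Override
      public String next() {
        if (!hasNext()) throw new NoSuchElementException();
        return page.values.get(index++);
      }
    };
  }
}
```

Because the token here is only an offset, an append that happens mid-iteration is visible (a live view, not a snapshot); snapshot semantics would require the server to pin a version, which is the RocksDb cost discussed elsewhere in this thread.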
So should we expand this >>>>>>> discussion >>>>>>> to how the portability APIs enable the SDK and runner to collaborate to >>>>>>> achieve this use case? It seems like the interaction you need is that >>>>>>> the >>>>>>> runner can tell the SDK that it can close the connection on the read(), >>>>>>> and >>>>>>> the SDK needs to do so promptly, right? >>>>>>> >>>>>>> Kenn >>>>>>> >>>>>>> >>>>>>> >>>>>>>> On Mon, May 14, 2018 at 2:20 PM Xinyu Liu <xinyuliu...@gmail.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> We discussed the proposed approaches internally. It seems that if >>>>>>>>> the State API can also expose another method to return a >>>>>>>>> ReadableState<Iterator>, >>>>>>>>> it will cover our cases: iterating over a bigger-than-memory state, >>>>>>>>> closing the underlying rocksDb snapshot immediately after the >>>>>>>>> iterator is >>>>>>>>> fully consumed (most cases), and safely closing the rest of the >>>>>>>>> iterators after processElement (minor cases). If this sounds good to >>>>>>>>> you >>>>>>>>> guys, I am going to create a JIRA ticket and open a PR for it. >>>>>>>>> Thanks a lot for the discussions here. >>>>>>>>> >>>>>>>>> The discussion about async processing is very interesting, and I >>>>>>>>> think it's definitely worth its own thread. I believe we do need >>>>>>>>> support from the Beam API/model so that users can take advantage of it >>>>>>>>> (besides Samza, Flink also has an async operator that helps a lot in >>>>>>>>> Alibaba's use cases). Yes, it will add complexity to the underlying >>>>>>>>> framework, but it's going to be great for users doing remote IO. >>>>>>>>> In >>>>>>>>> practice we found it actually avoids thread+locks issues, as Kenn >>>>>>>>> mentioned >>>>>>>>> above. I am not sure whether this feature can be a runner-specific >>>>>>>>> thing.
I will probably create another email thread for this >>>>>>>>> discussion in >>>>>>>>> the future, and hopefully I can move this forward. >>>>>>>>> >>>>>>>>> Thanks, >>>>>>>>> Xinyu >>>>>>>>> >>>>>>>>> On Mon, May 14, 2018 at 10:30 AM, Kenneth Knowles <k...@google.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Mon, May 14, 2018 at 9:44 AM Lukasz Cwik <lc...@google.com> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Users typically want to do that async operation and then produce >>>>>>>>>>> output with it. Adding asynchronous execution is difficult within >>>>>>>>>>> the >>>>>>>>>>> framework because a lot of code currently does not need to be >>>>>>>>>>> thread safe, >>>>>>>>>>> and writing code that is fast and handles asynchronous execution is >>>>>>>>>>> quite >>>>>>>>>>> difficult. Adding async operations typically leads to code with too >>>>>>>>>>> many >>>>>>>>>>> locks/synchronization blocks. >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Just a small point here, as it is very unusual to use locks with >>>>>>>>>> futures. The essential innovation of futures is that they are a new way >>>>>>>>>> to >>>>>>>>>> program that avoids the threads+locks style. In part, you get this for >>>>>>>>>> free >>>>>>>>>> from functional programming style, and on the other hand you reify >>>>>>>>>> asynchronous side effects as data dependencies. >>>>>>>>>> >>>>>>>>>> Kenn >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> Note that with the portability effort, the Runner won't have >>>>>>>>>>> visibility into such low-level things as when an object is garbage >>>>>>>>>>> collected, and supporting user state will be modeled using the >>>>>>>>>>> following >>>>>>>>>>> three operations: read (with continuation tokens), append (blind >>>>>>>>>>> write), >>>>>>>>>>> clear. It may be moot to discuss how to integrate Samza into the >>>>>>>>>>> existing >>>>>>>>>>> framework; Samza should instead work towards being a portability-based runner.
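Kenn's point that futures replace the threads+locks style with data dependencies can be illustrated with java.util.concurrent.CompletableFuture: two lookups run concurrently and are joined through thenCombine, with no shared mutable state and no synchronized blocks. fetchPrice and fetchQuantity are hypothetical stand-ins for remote calls.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

// Each stage runs only once its inputs are ready; ordering comes from the
// data dependencies, not from locks.
final class FutureComposition {
  private FutureComposition() {}

  // Hypothetical remote lookups, simulated on the given pool.
  static CompletableFuture<Integer> fetchPrice(String item, ExecutorService pool) {
    return CompletableFuture.supplyAsync(() -> item.length() * 10, pool);
  }

  static CompletableFuture<Integer> fetchQuantity(String item, ExecutorService pool) {
    return CompletableFuture.supplyAsync(() -> 3, pool);
  }

  // Both lookups start immediately and run concurrently; thenCombine waits
  // for both, then thenApply formats the result.
  static CompletableFuture<String> describe(String item, ExecutorService pool) {
    return fetchPrice(item, pool)
        .thenCombine(fetchQuantity(item, pool), (price, qty) -> price * qty)
        .thenApply(total -> item + "=" + total);
  }
}
```

For example, `FutureComposition.describe("apple", pool).join()` yields `"apple=150"` (price 5 * 10, quantity 3).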
>>>>>>>>>>> There >>>>>>>>>>> are more details here about supporting user state during pipeline >>>>>>>>>>> execution: >>>>>>>>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing; the >>>>>>>>>>> top-level docs about portability are >>>>>>>>>>> https://s.apache.org/beam-runner-api and >>>>>>>>>>> https://s.apache.org/beam-fn-api. >>>>>>>>>>> >>>>>>>>>>> On Sun, May 13, 2018 at 6:51 PM Reuven Lax <re...@google.com> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> At least one API that has been discussed in the past is to use >>>>>>>>>>>> Java 8 CompletionStage, e.g. >>>>>>>>>>>> >>>>>>>>>>>> new DoFn<InputT, OutputT>() { >>>>>>>>>>>> @ProcessElement >>>>>>>>>>>> public void process(@Element CompletionStage<InputT> >>>>>>>>>>>> element, ...) { >>>>>>>>>>>> element.thenApply(...); >>>>>>>>>>>> } >>>>>>>>>>>> } >>>>>>>>>>>> >>>>>>>>>>>> The framework will automatically create the CompletionStage, >>>>>>>>>>>> and the process method can specify a pipeline of asynchronous >>>>>>>>>>>> operations to >>>>>>>>>>>> perform on the element. When all of them are done, the element >>>>>>>>>>>> will be >>>>>>>>>>>> marked as successfully processed. >>>>>>>>>>>> >>>>>>>>>>>> Reuven >>>>>>>>>>>> >>>>>>>>>>>> On Sun, May 13, 2018 at 11:36 AM Xinyu Liu < >>>>>>>>>>>> xinyuliu...@gmail.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Thanks for all the pointers. I looked through the discussion >>>>>>>>>>>>> on BEAM-2975 and BEAM-2980 about having snapshot or live >>>>>>>>>>>>> views of an iterable, and the current semantics make a lot of sense >>>>>>>>>>>>> to me. >>>>>>>>>>>>> For your question: it does not require an explicit snapshot when >>>>>>>>>>>>> we create a >>>>>>>>>>>>> RocksDb iterator directly. The iterator will read from an >>>>>>>>>>>>> implicit snapshot >>>>>>>>>>>>> as of the time the iterator is created [1], and the snapshot will >>>>>>>>>>>>> be >>>>>>>>>>>>> released after the iterator is closed.
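Reuven's CompletionStage sketch uses a void process method, which leaves open how the framework learns that the asynchronous chain has finished. One way to close that gap, assumed here rather than taken from any Beam API, is for the user function to return the last stage of its chain. AsyncHarness is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

// Hypothetical harness: the framework creates the element's stage, hands it
// to user code, and treats the element as processed once the returned stage
// completes (here, by recording its output).
final class AsyncHarness<InputT, OutputT> {
  private final Function<CompletionStage<InputT>, CompletionStage<OutputT>> process;

  // Thread-safe because stages may complete on other threads.
  final List<OutputT> outputs = new CopyOnWriteArrayList<>();

  AsyncHarness(Function<CompletionStage<InputT>, CompletionStage<OutputT>> process) {
    this.process = process;
  }

  CompletableFuture<Void> submit(InputT element) {
    CompletableFuture<InputT> stage = CompletableFuture.completedFuture(element);
    return process.apply(stage).thenAccept(outputs::add).toCompletableFuture();
  }
}
```

User code then reads like `new AsyncHarness<String, Integer>(s -> s.thenApply(String::length))`, and the runner can wait on the futures returned by submit() before finishing a bundle.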
If we can have another >>>>>>>>>>>>> method to >>>>>>>>>>>>> return ReadableState<Iterator>, we might be able to apply the >>>>>>>>>>>>> auto-closing >>>>>>>>>>>>> approaches we discussed and solve the problem here :). >>>>>>>>>>>>> >>>>>>>>>>>>> It's very interesting that you bring up the >>>>>>>>>>>>> async API! Async IO has been widely adopted here among our users: >>>>>>>>>>>>> they use >>>>>>>>>>>>> netty for async calls, with a library named ParSeq [2] to help >>>>>>>>>>>>> manage the >>>>>>>>>>>>> calls. Samza provides a primitive callback-style API [3], in >>>>>>>>>>>>> which the user >>>>>>>>>>>>> invokes the callback after the remote calls are complete. >>>>>>>>>>>>> Currently in >>>>>>>>>>>>> a Samza job our users use this API with the ParSeq lib for remote >>>>>>>>>>>>> IO. It seems >>>>>>>>>>>>> we might have to do blocking calls (thus the poor resource >>>>>>>>>>>>> utilization you >>>>>>>>>>>>> mentioned) when using the Beam API for now. It'd be great if you could >>>>>>>>>>>>> send a >>>>>>>>>>>>> few more details about the async API discussion. I would >>>>>>>>>>>>> like to add >>>>>>>>>>>>> our use case and help move this forward. >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks, >>>>>>>>>>>>> Xinyu >>>>>>>>>>>>> >>>>>>>>>>>>> [1]: https://github.com/facebook/rocksdb/wiki/Iterator >>>>>>>>>>>>> [2]: https://github.com/linkedin/parseq >>>>>>>>>>>>> [3]: >>>>>>>>>>>>> https://samza.apache.org/learn/documentation/0.14/api/javadocs/org/apache/samza/task/AsyncStreamTask.html >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On Sat, May 12, 2018 at 8:17 PM, Kenneth Knowles < >>>>>>>>>>>>> k...@google.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> I don't have any further suggestions, but I want to call out >>>>>>>>>>>>>> how this hits a lot of interesting points. >>>>>>>>>>>>>> >>>>>>>>>>>>>> The point about snapshotting is great. We have BEAM-2975 [1] >>>>>>>>>>>>>> and BEAM-2980 [2] where we debated things a bit.
I think the >>>>>>>>>>>>>> strongest case >>>>>>>>>>>>>> is for what you describe: it should be a snapshot. Perhaps they >>>>>>>>>>>>>> should >>>>>>>>>>>>>> both be closed as fixed... >>>>>>>>>>>>>> >>>>>>>>>>>>>> And you also bring up long blocking calls; we have also >>>>>>>>>>>>>> deliberately decided that long synchronous blocking calls in >>>>>>>>>>>>>> @ProcessElement can be embraced for simple programming and >>>>>>>>>>>>>> compensated for with >>>>>>>>>>>>>> autoscaling smarts (e.g. expanding the thread pool upon noticing poor >>>>>>>>>>>>>> utilization). The alternative is a more future-istic API where >>>>>>>>>>>>>> the calls >>>>>>>>>>>>>> can be explicitly asynchronous. We've had some interesting dev@ >>>>>>>>>>>>>> list discussions about that, too. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Is another possibility to perhaps have read() return a >>>>>>>>>>>>>> ReadableState<Iterator> instead? We could, of course, have two >>>>>>>>>>>>>> methods with >>>>>>>>>>>>>> different names, one for the iterator and one for the snapshot iterable. But >>>>>>>>>>>>>> wouldn't >>>>>>>>>>>>>> the Iterator also require a snapshot? Doesn't a native RocksDb >>>>>>>>>>>>>> iterator >>>>>>>>>>>>>> require a snapshot to have well-defined contents? As you can >>>>>>>>>>>>>> tell, I don't >>>>>>>>>>>>>> know enough about RocksDb details to be sure of my suggestions. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Kenn >>>>>>>>>>>>>> >>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/BEAM-2975 >>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/BEAM-2980 >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Sat, May 12, 2018 at 2:58 PM Xinyu Liu < >>>>>>>>>>>>>> xinyuliu...@gmail.com> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks for the ideas, Kenn, Luke and Eugene. Before I posted >>>>>>>>>>>>>>> the question here, we discussed internally releasing the >>>>>>>>>>>>>>> underlying >>>>>>>>>>>>>>> resources after consuming the whole iterator. 
This probably >>>>>>>>>>>>>>> covers quite a >>>>>>>>>>>>>>> lot of use cases.
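Releasing the resource once the iterator is fully consumed can be done transparently in a wrapper, so users who iterate to the end never see a close() call. CloseOnExhaustIterator is a hypothetical name, and the Runnable stands in for whatever frees the native iterator (for RocksDb, closing the iterator and with it its implicit snapshot). close() is idempotent so a runner can still call it later for iterators that were only partially consumed.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical wrapper: frees the underlying resource as soon as hasNext()
// first reports exhaustion. After close(), the delegate is never touched again.
final class CloseOnExhaustIterator<T> implements Iterator<T>, AutoCloseable {
  private final Iterator<T> delegate;
  private final Runnable release;
  private boolean closed;

  CloseOnExhaustIterator(Iterator<T> delegate, Runnable release) {
    this.delegate = delegate;
    this.release = release;
  }

  @Override
  public boolean hasNext() {
    if (closed) {
      return false;
    }
    boolean more = delegate.hasNext();
    if (!more) {
      close(); // fully consumed: release immediately
    }
    return more;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return delegate.next();
  }

  // Idempotent, so an explicit or runner-driven close after exhaustion is harmless.
  @Override
  public void close() {
    if (!closed) {
      closed = true;
      release.run();
    }
  }
}
```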
For the special cases where the user only >>>>>>>>>>>>>>> consumes part of >>>>>>>>>>>>>>> the iterator, Luke and Kenn's suggestion about releasing after >>>>>>>>>>>>>>> processElement() might work (I need to confirm this with >>>>>>>>>>>>>>> our use >>>>>>>>>>>>>>> cases). So based on what we discussed so far, we might have a >>>>>>>>>>>>>>> good way to >>>>>>>>>>>>>>> automatically close an iterator for the store. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> There is another issue though: right now the state API >>>>>>>>>>>>>>> returns an iterable for entries(), keys() and values(), and we >>>>>>>>>>>>>>> can create an >>>>>>>>>>>>>>> iterator from it. From my understanding, the iterable holds a >>>>>>>>>>>>>>> snapshot of >>>>>>>>>>>>>>> the underlying store. In the case of rocksDb, it's going to be a >>>>>>>>>>>>>>> db.snapshot(). >>>>>>>>>>>>>>> Then when can we release the snapshot? It's not like an iterator, >>>>>>>>>>>>>>> where we can >>>>>>>>>>>>>>> use some heuristics to automatically release it. The user can >>>>>>>>>>>>>>> hold on to >>>>>>>>>>>>>>> the iterable and create iterators throughout the whole >>>>>>>>>>>>>>> processElement(). >>>>>>>>>>>>>>> But if we only close the iterable after processElement(), I am >>>>>>>>>>>>>>> quite >>>>>>>>>>>>>>> concerned about the limitations this will bring. If the user is >>>>>>>>>>>>>>> doing some >>>>>>>>>>>>>>> remote call during processing, then the snapshot might be held >>>>>>>>>>>>>>> for a long >>>>>>>>>>>>>>> time before being released, which might cause performance problems. >>>>>>>>>>>>>>> And if the >>>>>>>>>>>>>>> user happens to create multiple iterables, then there will be >>>>>>>>>>>>>>> multiple >>>>>>>>>>>>>>> snapshots loaded during processing. Luke suggested being >>>>>>>>>>>>>>> aggressive at closing >>>>>>>>>>>>>>> the resources and recreating them when needed again.
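The fallback of closing whatever the user left open when processElement() returns amounts to a per-call registry. ElementScope is a hypothetical name; it assumes each registered resource tolerates close() being called even if the user already closed it.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical runner-side registry for one processElement() call. Every
// state resource handed to user code is registered here; anything still
// open is released when the call returns or throws.
final class ElementScope implements AutoCloseable {
  private final Deque<AutoCloseable> open = new ArrayDeque<>();

  <R extends AutoCloseable> R register(R resource) {
    open.push(resource);
    return resource;
  }

  // Closes in reverse order of registration. A failing close() does not stop
  // the others; the first failure is rethrown with later ones suppressed.
  @Override
  public void close() throws Exception {
    Exception first = null;
    while (!open.isEmpty()) {
      try {
        open.pop().close();
      } catch (Exception e) {
        if (first == null) {
          first = e;
        } else {
          first.addSuppressed(e);
        }
      }
    }
    if (first != null) {
      throw first;
    }
  }
}
```

A runner would wrap each invocation in `try (ElementScope scope = new ElementScope()) { ... }`. The trade-off Xinyu raises still applies: anything left open stays pinned for the whole call, including during slow remote calls.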
But in this >>>>>>>>>>>>>>> case it might >>>>>>>>>>>>>>> not work, since we won't be able to recreate the same snapshot >>>>>>>>>>>>>>> given that the >>>>>>>>>>>>>>> store might have been updated (and creating a rocksDb snapshot is >>>>>>>>>>>>>>> not cheap >>>>>>>>>>>>>>> either). I am running out of ideas other than exposing the >>>>>>>>>>>>>>> iterator itself >>>>>>>>>>>>>>> somehow (and adding close() if needed?). Any further suggestions? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> @Kenn: btw, I have the same impl you posted earlier >>>>>>>>>>>>>>> (CloseableIterator) in an internal interface. I wrapped it in >>>>>>>>>>>>>>> some sort of >>>>>>>>>>>>>>> StateResource for fear that people might reject the proposal >>>>>>>>>>>>>>> immediately >>>>>>>>>>>>>>> after seeing the close() on the iterator. I guess for our users, who are >>>>>>>>>>>>>>> familiar >>>>>>>>>>>>>>> with rocksDb state, it's pretty normal to close the >>>>>>>>>>>>>>> iterator/snapshot after >>>>>>>>>>>>>>> using it. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>> Xinyu >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Fri, May 11, 2018 at 11:08 AM, Lukasz Cwik < >>>>>>>>>>>>>>> lc...@google.com> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> The iterator going out of scope is the idiomatic way that >>>>>>>>>>>>>>>> resources are freed for Java developers (hence the >>>>>>>>>>>>>>>> weak/phantom reference >>>>>>>>>>>>>>>> suggestion). >>>>>>>>>>>>>>>> Explicitly requiring users to deal with 'handles' (like >>>>>>>>>>>>>>>> file streams) leads to leaked resources. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Fri, May 11, 2018 at 10:55 AM Kenneth Knowles < >>>>>>>>>>>>>>>> k...@google.com> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thanks Xinyu, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I actually had first sketched out just what you wrote.
But >>>>>>>>>>>>>>>>> then I realized a few things: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> - usually an Iterable does not allocate resources, only >>>>>>>>>>>>>>>>> its Iterators >>>>>>>>>>>>>>>>> - if you consume the whole iterator, I hope the user >>>>>>>>>>>>>>>>> would not have to do any extra work >>>>>>>>>>>>>>>>> - you can also automatically free it up at the end of the >>>>>>>>>>>>>>>>> call to @ProcessElement so that is easy too (but you might >>>>>>>>>>>>>>>>> not want to) >>>>>>>>>>>>>>>>> - so the main use is when the iterator terminates early >>>>>>>>>>>>>>>>> and is not fully consumed and you can't wait to finish the >>>>>>>>>>>>>>>>> method >>>>>>>>>>>>>>>>> - the scoping in Java will force a bunch of uninitialized >>>>>>>>>>>>>>>>> declarations outside the try-with-resources block, kind of a >>>>>>>>>>>>>>>>> lot of >>>>>>>>>>>>>>>>> boilerplate LOC >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> One thing that is good about your proposal is that the >>>>>>>>>>>>>>>>> iterable could have some transparent caches that all free up >>>>>>>>>>>>>>>>> together. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Kenn >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Fri, May 11, 2018 at 9:51 AM Xinyu Liu < >>>>>>>>>>>>>>>>> xinyuliu...@gmail.com> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks for drafting the details about the two approaches, >>>>>>>>>>>>>>>>>> Kenn. Now I understand Luke's proposal better. The approach >>>>>>>>>>>>>>>>>> looks neat, but >>>>>>>>>>>>>>>>>> the uncertainty of *when* GC is going to kick in will >>>>>>>>>>>>>>>>>> make users' lives hard. If the user happens to configure a >>>>>>>>>>>>>>>>>> large JVM heap >>>>>>>>>>>>>>>>>> size, then, since rocksDb uses off-heap memory, GC might start >>>>>>>>>>>>>>>>>> very late and >>>>>>>>>>>>>>>>>> run less frequently than we want.
If we don't have a >>>>>>>>>>>>>>>>>> *definitive* way to let users close the underlying >>>>>>>>>>>>>>>>>> resources, then there is no good way to handle such failures >>>>>>>>>>>>>>>>>> of a critical >>>>>>>>>>>>>>>>>> application in production. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Having a close method in the iterator might be a little >>>>>>>>>>>>>>>>>> unorthodox :). To some degree, this is actually a resource >>>>>>>>>>>>>>>>>> we are holding >>>>>>>>>>>>>>>>>> underneath, and I think it's pretty common to have close() >>>>>>>>>>>>>>>>>> for a resource >>>>>>>>>>>>>>>>>> in Java, e.g. BufferedReader and BufferedWriter. So I would >>>>>>>>>>>>>>>>>> imagine that we >>>>>>>>>>>>>>>>>> also define a resource for the state iterator and make the >>>>>>>>>>>>>>>>>> interface >>>>>>>>>>>>>>>>>> implement *AutoCloseable*. Here is my sketch: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> // StateResource MUST be closed after use. >>>>>>>>>>>>>>>>>> try (StateResource<Iterator<SomeType>> st = >>>>>>>>>>>>>>>>>> bagState.iteratorResource()) { >>>>>>>>>>>>>>>>>> Iterator<SomeType> iter = st.iterator(); >>>>>>>>>>>>>>>>>> while (iter.hasNext()) { >>>>>>>>>>>>>>>>>> ... do stuff ... >>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>> } catch (Exception e) { >>>>>>>>>>>>>>>>>> ... user code >>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> The type/method names are just for illustration here, so >>>>>>>>>>>>>>>>>> please don't laugh at them. Please feel free to comment and >>>>>>>>>>>>>>>>>> let me know if >>>>>>>>>>>>>>>>>> you have thoughts about the programming patterns here. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>> Xinyu >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 8:59 PM, Kenneth Knowles < >>>>>>>>>>>>>>>>>> k...@google.com> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> It is too soon to argue whether an API is complex or >>>>>>>>>>>>>>>>>>> not. There has been no specific API proposed.
>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I think the problem statement is real - you need to be >>>>>>>>>>>>>>>>>>> able to read and write bigger-than-memory state. It seems >>>>>>>>>>>>>>>>>>> we have multiple >>>>>>>>>>>>>>>>>>> runners that don't support it, perhaps because of our API. >>>>>>>>>>>>>>>>>>> You might be >>>>>>>>>>>>>>>>>>> able to build something good enough with phantom >>>>>>>>>>>>>>>>>>> references, but you might >>>>>>>>>>>>>>>>>>> not. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> If I understand the idea, it might look something like >>>>>>>>>>>>>>>>>>> this: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> new DoFn<>() { >>>>>>>>>>>>>>>>>>> @StateId("foo") >>>>>>>>>>>>>>>>>>> private final StateSpec<BagState<Whatever>> >>>>>>>>>>>>>>>>>>> myBagSpec = ... >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> @ProcessElement >>>>>>>>>>>>>>>>>>> public void proc(@StateId("foo") >>>>>>>>>>>>>>>>>>> BagState<Whatever> myBag, ...) { >>>>>>>>>>>>>>>>>>> CloseableIterator<Whatever> iterator = >>>>>>>>>>>>>>>>>>> myBag.get().iterator(); >>>>>>>>>>>>>>>>>>> while(iterator.hasNext() && ... special >>>>>>>>>>>>>>>>>>> condition ...) { >>>>>>>>>>>>>>>>>>> ... do stuff ... >>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>> iterator.close(); >>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> So it has no impact on users who don't choose to close() >>>>>>>>>>>>>>>>>>> since they iterate with for ( : ) as usual. And a runner >>>>>>>>>>>>>>>>>>> that has the 10x >>>>>>>>>>>>>>>>>>> funding to try out a ReferenceQueue can be resilient to >>>>>>>>>>>>>>>>>>> users that forget. >>>>>>>>>>>>>>>>>>> On the other hand, I haven't seen this pattern much in the >>>>>>>>>>>>>>>>>>> wild, so I think >>>>>>>>>>>>>>>>>>> it is valuable to discuss other methods. 
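The CloseableIterator idea above can be made runnable as follows. CloseableIterator, ListBackedIterator and EarlyExit are hypothetical names, not Beam APIs. Declaring the iterator in a try-with-resources header, rather than calling close() after the loop, also releases it if the loop body throws, and keeps the declaration inside the block.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical interface; close() is narrowed so no checked exception leaks
// into user code.
interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
  @Override
  void close();
}

// Test double: a list-backed iterator that records whether it was closed.
final class ListBackedIterator<T> implements CloseableIterator<T> {
  private final Iterator<T> delegate;
  private boolean closed;

  ListBackedIterator(List<T> values) {
    this.delegate = values.iterator();
  }

  boolean isClosed() {
    return closed;
  }

  @Override
  public boolean hasNext() {
    return !closed && delegate.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return delegate.next();
  }

  @Override
  public void close() {
    closed = true;
  }
}

final class EarlyExit {
  private EarlyExit() {}

  // Stops as soon as the running sum reaches the limit; the iterator is
  // closed on every exit path, including exceptions.
  static int sumUntil(CloseableIterator<Integer> source, int limit) {
    int sum = 0;
    try (CloseableIterator<Integer> it = source) {
      while (it.hasNext() && sum < limit) {
        sum += it.next();
      }
    }
    return sum;
  }
}
```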
>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> While Luke's proposal is something like this if I >>>>>>>>>>>>>>>>>>> understand his sketch (replacing WeakReference with >>>>>>>>>>>>>>>>>>> PhantomReference seems >>>>>>>>>>>>>>>>>>> to be what you really want): >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> ... in RocksDb state implementation ... >>>>>>>>>>>>>>>>>>> class RocksDbBagState { >>>>>>>>>>>>>>>>>>> static ReferenceQueue rocksDbIteratorQueue = new >>>>>>>>>>>>>>>>>>> ReferenceQueue(); >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> class Iterator { >>>>>>>>>>>>>>>>>>> PhantomReference<RocksDbJniIterator> cIter; >>>>>>>>>>>>>>>>>>> .next() { >>>>>>>>>>>>>>>>>>> return cIter.next(); >>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> class Iterable { >>>>>>>>>>>>>>>>>>> .iterator() { >>>>>>>>>>>>>>>>>>> return new Iterator(new >>>>>>>>>>>>>>>>>>> PhantomReference<>(rocksDbJniIterator, >>>>>>>>>>>>>>>>>>> rocksDbIteratorQueue)); >>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> ... on another thread ... >>>>>>>>>>>>>>>>>>> while(true) { >>>>>>>>>>>>>>>>>>> RocksDbIterator deadRef = (RocksDbIterator) >>>>>>>>>>>>>>>>>>> rocksDbIteratorQueue.remove(); >>>>>>>>>>>>>>>>>>> deadRef.close(); >>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> When the iterator is GC'd, the phantom reference will >>>>>>>>>>>>>>>>>>> pop onto the queue for being closed. This might not be too >>>>>>>>>>>>>>>>>>> bad. You'll have >>>>>>>>>>>>>>>>>>> delayed resource release, and potentially masked errors >>>>>>>>>>>>>>>>>>> that are hard to >>>>>>>>>>>>>>>>>>> debug. It is less error-prone than WeakReference, which is >>>>>>>>>>>>>>>>>>> asking for >>>>>>>>>>>>>>>>>>> trouble when objects are collected en masse. 
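As written, the sketch above would not work: ReferenceQueue.remove() returns the Reference object rather than the referent, so the cast to RocksDbIterator cannot succeed, and a PhantomReference cannot be used to reach its referent at all (its get() always returns null, so the wrapper could not call next() through it either). The usual fix is to keep the native handle in a separate cleanup object that does not point back at the wrapper; java.lang.ref.Cleaner (Java 9+) packages that pattern. A minimal sketch with hypothetical class names:

```java
import java.lang.ref.Cleaner;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper around a JNI-backed iterator (iteration methods omitted).
final class NativeIteratorWrapper implements AutoCloseable {
  private static final Cleaner CLEANER = Cleaner.create();

  // Counts releases so the behavior is observable in this sketch.
  static final AtomicInteger RELEASED = new AtomicInteger();

  // The cleanup action must not reference the wrapper itself; otherwise the
  // wrapper stays reachable and the action never runs on GC.
  private static final class ReleaseAction implements Runnable {
    private final long nativeHandle; // stand-in for a JNI pointer

    ReleaseAction(long nativeHandle) {
      this.nativeHandle = nativeHandle;
    }

    @Override
    public void run() {
      // A real implementation would free nativeHandle here.
      RELEASED.incrementAndGet();
    }
  }

  private final Cleaner.Cleanable cleanable;

  NativeIteratorWrapper(long nativeHandle) {
    this.cleanable = CLEANER.register(this, new ReleaseAction(nativeHandle));
  }

  // Deterministic release for users who do call close(). Cleanable.clean()
  // runs the action at most once, so a later GC will not release it again.
  @Override
  public void close() {
    cleanable.clean();
  }
}
```

If the user never calls close(), the Cleaner thread runs ReleaseAction some time after the wrapper becomes phantom-reachable. As Xinyu notes, that timing is unpredictable when most of the memory is off-heap, so this serves as a backstop rather than the primary release mechanism.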
Anecdotally I >>>>>>>>>>>>>>>>>>> have heard that >>>>>>>>>>>>>>>>>>> performance of this kind of approach is poor, but I haven't >>>>>>>>>>>>>>>>>>> experienced it >>>>>>>>>>>>>>>>>>> myself and I can't find good data. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Kenn >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 7:41 PM Xinyu Liu < >>>>>>>>>>>>>>>>>>> xinyuliu...@gmail.com> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> If I understand correctly, using weak references will >>>>>>>>>>>>>>>>>>>> help clean up the Java objects once GC kicks in. In the case >>>>>>>>>>>>>>>>>>>> of a kv-store like >>>>>>>>>>>>>>>>>>>> rocksDb, the Java iterator is just a JNI interface to the >>>>>>>>>>>>>>>>>>>> underlying C >>>>>>>>>>>>>>>>>>>> iterator, so we need to explicitly invoke close to release >>>>>>>>>>>>>>>>>>>> the in-memory >>>>>>>>>>>>>>>>>>>> snapshot data, which can be large and accumulate quickly >>>>>>>>>>>>>>>>>>>> if it's not >>>>>>>>>>>>>>>>>>>> released when not in use. Maybe I am missing something that >>>>>>>>>>>>>>>>>>>> you suggested >>>>>>>>>>>>>>>>>>>> here, but it looks to me like using weak references might not help >>>>>>>>>>>>>>>>>>>> in this case. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I understand your concern, and I think you might have >>>>>>>>>>>>>>>>>>>> misunderstood what I meant. I am totally for working hard >>>>>>>>>>>>>>>>>>>> for the best user >>>>>>>>>>>>>>>>>>>> experience, and I think the current API provides a good >>>>>>>>>>>>>>>>>>>> example of that. >>>>>>>>>>>>>>>>>>>> That's also the reason I am implementing a runner here. I >>>>>>>>>>>>>>>>>>>> am just proposing >>>>>>>>>>>>>>>>>>>> an extra API to expose an iterator that can be closed when >>>>>>>>>>>>>>>>>>>> not needed; that >>>>>>>>>>>>>>>>>>>> way, users can use this feature to iterate through >>>>>>>>>>>>>>>>>>>> large state that >>>>>>>>>>>>>>>>>>>> doesn't fit into memory.
I believe this is also a pretty general use case, and it's better to have support for it. I am actually arguing that adding this extra API will make for a better user experience, since more users can benefit from it.

Thanks,
Xinyu

On Thu, May 10, 2018 at 5:25 PM, Lukasz Cwik <lc...@google.com> wrote:

I don't agree. I believe you can track the iterators/iterables that are created and freed by using weak references and reference queues (or other methods). Having a few people work 10x as hard to provide a good implementation is much better than having 100s or 1000s of users suffering through a more complicated API.

On Thu, May 10, 2018 at 3:44 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:

Loading/evicting blocks will help reduce the cache memory footprint, but we still won't be able to release the underlying resources. We can definitely add heuristics to help release the resources, as you mentioned, but there is no accurate way to track all the iterators/iterables created and free them once they are no longer needed. I think that while the API is aimed at a nice user experience, we should have the option to let users optimize their performance if they choose to. Do you agree?
Thanks,
Xinyu

On Thu, May 10, 2018 at 3:25 PM, Lukasz Cwik <lc...@google.com> wrote:

Users won't reliably close/release the resources, and forcing them to will make the user experience worse. It will make a lot more sense to use a file format which allows random access and use a cache to load/evict blocks of the state from memory. If that is not possible, use an iterable which frees the resource after a certain amount of inactivity or uses weak references.

On Thu, May 10, 2018 at 3:07 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:

Hi, folks,

I'm in the middle of implementing the MapState and SetState in our Samza runner. We noticed that the state returns a Java Iterable for reading entries, keys, etc. For state backed by a file-based kv store like rocksDb, we need to be able to let users explicitly close the iterator/iterable to release the resources. Otherwise we have to load the iterable into memory so we can safely close the underlying rocksDb iterator, similar to Flink's implementation. But this won't work for states that don't fit into memory.
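The fallback Xinyu describes reads the whole iterable into memory so that the native iterator can be closed immediately. It might look roughly like the minimal sketch below, where a hypothetical `NativeRowIterator` stands in for the JNI iterator. The sketch is simple and cannot leak, but its heap use grows with the size of the state, which is exactly the limitation raised here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a JNI-backed iterator over a store snapshot.
class NativeRowIterator implements Iterator<String>, AutoCloseable {
  private final Iterator<String> rows;
  boolean closed = false;

  NativeRowIterator(List<String> rows) { this.rows = rows.iterator(); }

  @Override public boolean hasNext() { return rows.hasNext(); }
  @Override public String next() { return rows.next(); }
  @Override public void close() { closed = true; }
}

class EagerRead {
  // Copies every row onto the heap, then closes the native iterator before
  // returning, so nothing native outlives the call. Heap use grows with state size.
  static Iterable<String> readAll(NativeRowIterator source) {
    List<String> copy = new ArrayList<>();
    try (NativeRowIterator it = source) {
      while (it.hasNext()) {
        copy.add(it.next());
      }
    }
    return copy;
  }
}
```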
I chatted with Kenn and he also agrees we need this capability to avoid bulk read/write. This seems to be a general use case, and I'm wondering if we can add support for it. I am happy to contribute to this if needed. Any feedback is highly appreciated.

Thanks,
Xinyu
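In his 3:25 PM reply, Lukasz also suggests an iterable that frees its resource after a period of inactivity. That idea could be sketched roughly as below. `SeekableHandle` and `IdleReleasingIterator` are hypothetical names. The sketch assumes the store can reopen an iterator at a saved position, and it takes an injectable clock so the timeout logic is deterministic.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.LongSupplier;

// Hypothetical native handle that can be reopened at a saved position (a real
// key-value store iterator would seek to the last key instead of using an index).
class SeekableHandle {
  static int open = 0; // count of handles currently held (not thread-safe; demo only)
  private final List<String> rows;
  private int pos;

  SeekableHandle(List<String> rows, int start) {
    this.rows = rows;
    this.pos = start;
    open++;
  }

  boolean hasNext() { return pos < rows.size(); }
  String next() { return rows.get(pos++); }
  void close() { open--; }
}

// Releases its handle after `idleMillis` without use; the next call reopens it.
class IdleReleasingIterator implements Iterator<String> {
  private final List<String> rows;
  private final long idleMillis;
  private final LongSupplier clock; // injectable so the timeout logic is testable
  private SeekableHandle handle; // null while released
  private int position = 0;
  private long lastUsed;

  IdleReleasingIterator(List<String> rows, long idleMillis, LongSupplier clock) {
    this.rows = rows;
    this.idleMillis = idleMillis;
    this.clock = clock;
    this.lastUsed = clock.getAsLong();
  }

  private SeekableHandle acquire() {
    if (handle == null) {
      handle = new SeekableHandle(rows, position); // reopen where we left off
    }
    lastUsed = clock.getAsLong();
    return handle;
  }

  @Override
  public synchronized boolean hasNext() { return acquire().hasNext(); }

  @Override
  public synchronized String next() {
    SeekableHandle h = acquire();
    if (!h.hasNext()) throw new NoSuchElementException();
    position++;
    return h.next();
  }

  // Intended to be called periodically by a sweeper thread.
  synchronized boolean releaseIfIdle() {
    if (handle != null && clock.getAsLong() - lastUsed >= idleMillis) {
      handle.close();
      handle = null;
      return true;
    }
    return false;
  }
}
```

A reopened iterator may see a different snapshot than the first read unless the store can pin one. That bears on the read consistency requirements discussed elsewhere in this thread.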