I am happy to chat about it over Hangouts or Slack too. Let's talk offline
to set it up if needed.

Thanks,
Xinyu

On Tue, May 15, 2018 at 10:51 AM, Xinyu Liu <xinyuliu...@gmail.com> wrote:

> The Samza runner always processes key+window pairs serially. To
> answer Luke's question:
>
> - Why Samza needs to use a snapshot in the first place and should be able
> to read data from RocksDb directly?
> I believe that the point of adding the readIterator() is to allow us to
> read from RocksDb directly without bulk loading to memory. In this case
> Samza will create the RocksDb iterator and expose it through the interface
> to the user. There is no need to create an explicit snapshot for this. For
> read() we need to load content into memory so we can release the rocksDb
> resources immediately and the user can iterate many times later. The reason
> I mentioned snapshots is that if we don't load into memory for read(),
> we need to maintain a RocksDb snapshot under the hood so the user can iterate
> the same content. This will cause memory and performance issues for our
> users.
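
To make the two read paths concrete, here is a minimal Java sketch under stated assumptions. `StoreBackedBag`, `openIterators()` and the in-memory list standing in for RocksDb are hypothetical illustrations, not Beam or Samza classes. The stand-in store is append-only, so bounding the iterator by the size at creation time plays the role of the implicit snapshot a native iterator would read from.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical store-backed bag; a plain list stands in for RocksDb. */
class StoreBackedBag {
    private final List<String> store = new ArrayList<>();
    private int openIterators = 0; // store iterators created but not yet released

    void add(String value) {
        store.add(value);
    }

    int openIterators() {
        return openIterators;
    }

    /** read(): copy everything into memory so the store iterator is released at once. */
    Iterable<String> read() {
        List<String> copy = new ArrayList<>();
        Iterator<String> it = readIterator();
        while (it.hasNext()) {
            copy.add(it.next());
        }
        return copy; // can be iterated many times and holds no store resources
    }

    /** readIterator(): stream lazily and release the store iterator once exhausted. */
    Iterator<String> readIterator() {
        openIterators++;
        final int limit = store.size(); // later appends stay invisible to this iterator
        return new Iterator<String>() {
            private int position = 0;
            private boolean released = false;

            @Override
            public boolean hasNext() {
                if (position < limit) {
                    return true;
                }
                if (!released) {
                    released = true;
                    openIterators--;
                }
                return false;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return store.get(position++);
            }
        };
    }

    public static void main(String[] args) {
        StoreBackedBag bag = new StoreBackedBag();
        bag.add("a");
        bag.add("b");
        Iterator<String> it = bag.readIterator();
        bag.add("c"); // written after the iterator was created
        while (it.hasNext()) {
            System.out.println(it.next()); // prints a, then b
        }
        System.out.println(bag.openIterators()); // prints 0
    }
}
```

In this sketch read() pays the memory cost up front and never leaves a store resource open, while readIterator() keeps one open only until the caller reaches the end. A partially consumed iterator is the case that still needs some other release mechanism.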
>
> For the commit question, do you mean whether we commit in the middle of
> processing a bundle? That's not the case in Samza. Samza waits for bundle
> processing to complete, flushes the output and state modifications, and
> then checkpoints.
>
> Thanks,
> Xinyu
>
>
>
> On Tue, May 15, 2018 at 10:34 AM, Kenneth Knowles <k...@google.com> wrote:
>
>>
>>
>> On Tue, May 15, 2018 at 9:36 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> If it always processes key+window pairs serially, then I'm not sure why
>>> Samza needs to use a snapshot in the first place and should be able to read
>>> data from RocksDb directly.
>>>
>>> The other point of confusion is how a readIterator() is any
>>> different from returning a wrapper that just invokes read().iterator() on a
>>> BagState. If that is the case, then this is a trivial change which doesn't
>>> need to change the portability design.
>>>
>>
>> I think this is a key point. Just calling out the obvious to talk about
>> it:
>>
>>  - read() returns an iterable - a value with fixed contents that can be
>> iterated many times
>>  - readIterator() would return an iterator - a value with fixed contents
>> that can be iterated only once
>>
>> In both cases, later writes should not affect the observed contents. So
>> they both need some kind of snapshot for their lifetime. With
>> read().iterator() the intermediate iterable is immediately garbage, but
>> this is the reference queue based solution again, with some ref tracking
>> for iterables/iterators.
>>
>> I now see what you mean about committing within a bundle; we definitely
>> need to atomically commit the completion of upstream elements, output
>> elements, and state modifications. What are your thoughts on this Xinyu?
>>
>> Happy to join a quick hangout.
>>
>> Kenn
>>
>>
>>> The nuance is likely that the interface a Runner implements to provide
>>> support for user state may give it more visibility into what is going on
>>> than what the portability framework can provide as is.
>>>
>>> On Tue, May 15, 2018 at 8:57 AM Kenneth Knowles <k...@google.com> wrote:
>>>
>>>> OK, got it. But what consistency are you referring to? I was trying to
>>>> point out that there's nothing but straight-line program order consistency.
>>>> There's only one actor doing all the reads and all the writes.
>>>>
>>>> Kenn
>>>>
>>>> On Tue, May 15, 2018 at 8:39 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> I misspoke when I said portability semantics and should have said
>>>>> portability design/implementation. This is why I had a follow-up e-mail
>>>>> and clarified that I'm confused on:
>>>>> * I don't understand how you would want close to change the semantics
>>>>> of a user state specification and how it affects the lifetime of user
>>>>> state?
>>>>> ** Does it represent committing information within a bundle?
>>>>> ** Does it mean that user state can ignore the replayable and
>>>>> consistent semantics for the lifetime of a bundle?
>>>>> I'm trying to tie back what a `ReadableState<Iterator>
>>>>> readIterator()` means for Runner authors and how it solves the
>>>>> memory/close() problem for Samza. Based upon
>>>>> https://s.apache.org/beam-state, reading and writing of state must be
>>>>> consistent. Does this mean that Samza must use snapshots for the
>>>>> lifetime of a bundle? If so, I don't see how adding a
>>>>> `ReadableState<Iterator> readIterator()` allows Samza to ignore the
>>>>> consistency requirement and be allowed to free snapshots.
>>>>>
>>>>> It might be worthwhile to set up a three-way Hangouts call to help me,
>>>>> as I don't have the same level of context; the outcome can be shared back
>>>>> to this thread. Xinyu / Kenn, how about we set up a time using Slack?
>>>>>
>>>>> On Mon, May 14, 2018 at 8:36 PM Kenneth Knowles <k...@google.com>
>>>>> wrote:
>>>>>
>>>>>> I feel like this discussion is kind of far from the primary
>>>>>> intention. The point of ParDo(stateful DoFn) is to enable naive
>>>>>> single-threaded code in a style intuitive to a beginning imperative
>>>>>> programmer. So:
>>>>>>
>>>>>>  - the return value of read() should act like an immutable value
>>>>>>  - if there is a read after a write, that read should reflect the
>>>>>> changes written, because there's a trivial happens-before relationship
>>>>>> induced by program order
>>>>>>  - there are no non-trivial happens-before relationships
>>>>>>  - a write after a read should not affect the value read before
>>>>>>
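
The four rules listed above can be written down as a small Java sketch. `ProgramOrderBag` is a hypothetical in-memory stand-in, not a Beam class, and copying on every read is only the simplest way to get the "immutable value" behaviour; a real runner would be free to achieve it differently.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical in-memory bag illustrating program-order state semantics. */
class ProgramOrderBag {
    private final List<Integer> contents = new ArrayList<>();

    void add(int value) {
        contents.add(value);
    }

    void clear() {
        contents.clear();
    }

    /** Returns an immutable copy, so later writes cannot change what was read. */
    List<Integer> read() {
        return Collections.unmodifiableList(new ArrayList<>(contents));
    }

    public static void main(String[] args) {
        ProgramOrderBag bag = new ProgramOrderBag();
        bag.add(1);
        List<Integer> first = bag.read();  // read after write reflects the write
        bag.add(2);                        // write after read
        List<Integer> second = bag.read();
        System.out.println(first);   // prints [1]
        System.out.println(second);  // prints [1, 2]
    }
}
```

Because only one actor reads and writes per key and window, no locking or versioning appears anywhere in the sketch; program order alone decides what each read observes.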
>>>>>> From that starting point, the limitations on expressiveness (single
>>>>>> threaded per-key-and-window-and-step) give rise to embarrassingly
>>>>>> parallel computation and GC.
>>>>>>
>>>>>> So when you say "portability semantics" it makes me concerned. The
>>>>>> word "semantics" refers to the mapping between what a user writes and
>>>>>> what it means. Primarily, that means the conceptual translation between a
>>>>>> primitive PTransform and the corresponding PCollection-to-PCollection
>>>>>> operation. It is true that portability complicates things because a
>>>>>> user's code can only be given meaning relative to an SDK harness. But the
>>>>>> end-user intention is unchanged. If we had time and resources to give the
>>>>>> Fn API a solid spec, it should be such that the SDK harness has little
>>>>>> choice but to implement the primitives as intended.
>>>>>>
>>>>>> In other words, it is the job of
>>>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing * to
>>>>>> implement https://s.apache.org/beam-state. The discussion of adding
>>>>>> `ReadableState<Iterator> readIterator()` to BagState seems consistent
>>>>>> with the latter.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> *IIUC you've chosen to use the same underlying proto service for side
>>>>>> inputs and by-reference values. That's an implementation detail that I
>>>>>> have no particular opinion about except that if it complicates
>>>>>> implementing the primary motivator for state, it may not be a great fit.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, May 14, 2018 at 7:35 PM Kenneth Knowles <k...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I don't follow why allowing freeing resources would be counter to
>>>>>>> the spec. I don't really know what you mean by consistent for a bundle.
>>>>>>> State, in the sense of the user-facing per-key-and-window state API, is
>>>>>>> single threaded and scoped to a single DoFn. There's no one else who can
>>>>>>> write the state. If a BagState is read and written and read again, the
>>>>>>> user-facing logic should be unaware of the resources and not have any
>>>>>>> logic to deal with consistency.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Mon, May 14, 2018 at 6:09 PM Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hmm, some of the problem I'm dealing with is:
>>>>>>>> * I don't understand how you would want close to change the
>>>>>>>> semantics of a user state specification and how it affects the
>>>>>>>> lifetime of user state?
>>>>>>>> ** Does it represent committing information within a bundle?
>>>>>>>> ** Does it mean that user state can ignore the replayable and
>>>>>>>> consistent semantics for the lifetime of a bundle?
>>>>>>>> * I do understand that close semantics may make it easier for the Samza
>>>>>>>> runner to support state that is larger than memory, but what does it
>>>>>>>> provide to users and how would they benefit (like what new scenarios
>>>>>>>> would it support)?
>>>>>>>> * I don't understand why you need a snapshot in RocksDb in the
>>>>>>>> first place, and whether RocksDb can support seeking inside or outside
>>>>>>>> a snapshot relatively efficiently?
>>>>>>>>
>>>>>>>> There seems to be some context lost from an earlier discussion
>>>>>>>> that you and Kenn had. Is there a copy of that which could be shared?
>>>>>>>> It may help me get up to speed on the problem that is being solved.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, May 14, 2018 at 5:11 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I will take a look at the docs to understand the problem better. A
>>>>>>>>> minor comment on 2) is that I don't intend to change the existing
>>>>>>>>> iterable API. I plan to implement it similarly to Flink, loading the
>>>>>>>>> data into memory and closing the underlying snapshot after that. So
>>>>>>>>> the changes should be backward compatible.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Xinyu
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, May 14, 2018 at 4:54 PM, Lukasz Cwik <lc...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I believe adding support for a state spec to be 'closed' or
>>>>>>>>>> 'freed' is counter to the requirement of a state spec being
>>>>>>>>>> consistent for the lifetime of a bundle. Are we willing to change this
>>>>>>>>>> requirement for the lifetime of a bundle, or say that runners can
>>>>>>>>>> arbitrarily say that a StateSpec can't be accessed anymore?
>>>>>>>>>>
>>>>>>>>>> If not, I'm having trouble thinking of a good way to
>>>>>>>>>> integrate the 'close/free' API with portability semantics because:
>>>>>>>>>> 1) Runners do control bundle size, but other than the trivial
>>>>>>>>>> cases where a runner specifically chooses:
>>>>>>>>>>   a) to only process a single element at a time (negating the
>>>>>>>>>> need for a free/close() method).
>>>>>>>>>>   b) to optimistically process elements and, if it runs out of memory,
>>>>>>>>>> kill the bundle and try again with a smaller bundle.
>>>>>>>>>> 2) Adding an API to provide Iterators doesn't mean that users
>>>>>>>>>> can't still use Iterable, which can't be force closed. (Dropping
>>>>>>>>>> Iterable in exchange for Iterator in the API is backwards incompatible,
>>>>>>>>>> so it is likely to be deferred till the next major version of Apache
>>>>>>>>>> Beam.)
>>>>>>>>>> 3) SDKs are able to process as many elements as they want in
>>>>>>>>>> parallel; nothing requires them to execute elements serially
>>>>>>>>>> throughout the pipeline graph.
>>>>>>>>>> 4) The runner has limited information into what an SDK is doing.
>>>>>>>>>> The SDK provides a lower bound on how many elements it has processed,
>>>>>>>>>> but SDKs aren't required to implement this, meaning that they could
>>>>>>>>>> technically be processing everything in random order after they have
>>>>>>>>>> seen all input for a bundle.
>>>>>>>>>>
>>>>>>>>>> We'll need to work through the scenarios. Xinyu, it would be
>>>>>>>>>> useful for you to take a look at these two docs for context into the
>>>>>>>>>> problem space:
>>>>>>>>>> https://s.apache.org/beam-fn-api-processing-a-bundle (How to
>>>>>>>>>> process a bundle)
>>>>>>>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing
>>>>>>>>>> (How to access side inputs, access remote references, and support
>>>>>>>>>> user state)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, May 14, 2018 at 3:16 PM Kenneth Knowles <k...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, May 14, 2018 at 2:30 PM Lukasz Cwik <lc...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Before you go on and update the user facing API, we should
>>>>>>>>>>>> discuss the last point I made, since the change you're making will
>>>>>>>>>>>> have limited usability: the portability effort won't realistically
>>>>>>>>>>>> allow you to see such low level things as when processElement
>>>>>>>>>>>> finished, and supporting user state will be modeled using the
>>>>>>>>>>>> following three operations: read (with continuation tokens), append
>>>>>>>>>>>> (blind write), clear. It may be moot to discuss how to integrate
>>>>>>>>>>>> Samza into the existing framework, and the Samza runner should work
>>>>>>>>>>>> towards being a portability based runner. There are more details
>>>>>>>>>>>> here about supporting user state during pipeline execution:
>>>>>>>>>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing.
>>>>>>>>>>>> Top level docs about portability are
>>>>>>>>>>>> https://s.apache.org/beam-runner-api and
>>>>>>>>>>>> https://s.apache.org/beam-fn-api.
>>>>>>>>>>>>
>>>>>>>>>>>> Do we really want to make a change to the user facing API for
>>>>>>>>>>>> user state in the Java SDK when that code path will be phased out
>>>>>>>>>>>> in exchange for using the portability APIs?
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> To be clear, the user API will remain the same, but the runner's
>>>>>>>>>>> implementation path will be phased out. So should we expand this
>>>>>>>>>>> discussion to how the portability APIs enable the SDK and runner to
>>>>>>>>>>> collaborate to achieve this use case? It seems like the interaction
>>>>>>>>>>> you need is that the runner can tell that the SDK has closed the
>>>>>>>>>>> connection on the read(), and the SDK needs to do so promptly, right?
>>>>>>>>>>>
>>>>>>>>>>> Kenn
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> On Mon, May 14, 2018 at 2:20 PM Xinyu Liu <
>>>>>>>>>>>> xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> We discussed the proposed approaches internally. It seems that
>>>>>>>>>>>>> if the State API can also expose another method to return a
>>>>>>>>>>>>> ReadableState<Iterator>, it will cover our cases of iterating over
>>>>>>>>>>>>> bigger-than-memory state, closing the underlying RocksDb snapshot
>>>>>>>>>>>>> immediately after the iterator is fully consumed (most cases), and
>>>>>>>>>>>>> safely closing the rest of the iterators after processElement
>>>>>>>>>>>>> (minor cases). If this sounds good to you guys, I am going to
>>>>>>>>>>>>> create a JIRA ticket and open a PR for it.
>>>>>>>>>>>>> Thanks a lot for the discussions here.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The discussion about async processing is very interesting,
>>>>>>>>>>>>> and I think it's definitely worth its own thread. I believe we do
>>>>>>>>>>>>> need support from the Beam API/model so that users can take
>>>>>>>>>>>>> advantage of it (besides Samza, Flink also has an async operator
>>>>>>>>>>>>> that helps a lot in Alibaba's use cases). Yes, it will add
>>>>>>>>>>>>> complexity to the underlying framework, but it's going to be great
>>>>>>>>>>>>> for users doing remote IO. In practice we found it actually avoids
>>>>>>>>>>>>> thread+locks issues, as Kenn mentioned above. I am not sure whether
>>>>>>>>>>>>> this feature can be a runner-specific thing. I will probably create
>>>>>>>>>>>>> another email thread for this discussion in the future and
>>>>>>>>>>>>> hopefully I can move this forward.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, May 14, 2018 at 10:30 AM, Kenneth Knowles <
>>>>>>>>>>>>> k...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, May 14, 2018 at 9:44 AM Lukasz Cwik <lc...@google.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Users typically want to do that async operation and then
>>>>>>>>>>>>>>> produce output with it. Adding asynchronous execution is
>>>>>>>>>>>>>>> difficult within the framework because a lot of code currently
>>>>>>>>>>>>>>> does not need to be thread safe, and writing code that is fast
>>>>>>>>>>>>>>> and handles asynchronous execution is quite difficult. Adding
>>>>>>>>>>>>>>> async operations typically leads to code with too many
>>>>>>>>>>>>>>> locks/synchronization blocks.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Just a small point here, as it is very unusual to use locks
>>>>>>>>>>>>>> with futures. The essential innovation of futures is that they
>>>>>>>>>>>>>> are a new way to program that avoids the threads+locks style. In
>>>>>>>>>>>>>> part, you get this for free from functional programming style, and
>>>>>>>>>>>>>> on the other hand you reify asynchronous side effects as data
>>>>>>>>>>>>>> dependencies.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Note that with the portability effort, the Runner won't have
>>>>>>>>>>>>>>> visibility into such low level things as when an object is
>>>>>>>>>>>>>>> garbage collected, and supporting user state will be modeled
>>>>>>>>>>>>>>> using the following three operations: read (with continuation
>>>>>>>>>>>>>>> tokens), append (blind write), clear. It may be moot to discuss
>>>>>>>>>>>>>>> how to integrate Samza into the existing framework, and the
>>>>>>>>>>>>>>> Samza runner should work towards being a portability based
>>>>>>>>>>>>>>> runner. There are more details here about supporting user state
>>>>>>>>>>>>>>> during pipeline execution:
>>>>>>>>>>>>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing.
>>>>>>>>>>>>>>> Top level docs about portability are
>>>>>>>>>>>>>>> https://s.apache.org/beam-runner-api and
>>>>>>>>>>>>>>> https://s.apache.org/beam-fn-api.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, May 13, 2018 at 6:51 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> One API that has been discussed in the past is to
>>>>>>>>>>>>>>>> use Java 8's CompletionStage, e.g.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>  new DoFn<InputT, OutputT>() {
>>>>>>>>>>>>>>>>     @ProcessElement
>>>>>>>>>>>>>>>>     public void process(@Element CompletionStage<InputT>
>>>>>>>>>>>>>>>> element, ...) {
>>>>>>>>>>>>>>>>       element.thenApply(...)
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The framework will automatically create the
>>>>>>>>>>>>>>>> CompletionStage, and the process method can specify a pipeline
>>>>>>>>>>>>>>>> of asynchronous operations to perform on the element. When all
>>>>>>>>>>>>>>>> of them are done, the element will be marked as successfully
>>>>>>>>>>>>>>>> processed.
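
The CompletionStage idea above can be tried out with the plain JDK, with no Beam involved. In this sketch `AsyncElementSketch`, `process` and `runBundle` are hypothetical names, and `runBundle` only imitates what a framework might do (wait for every element's stage before treating the bundle as done); nothing here is an existing Beam API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class AsyncElementSketch {
    /** Hypothetical user logic: a pipeline of asynchronous steps on one element. */
    static CompletionStage<String> process(CompletionStage<String> element) {
        return element
            .thenApply(String::trim)
            .thenApply(String::toUpperCase);
    }

    /** Hypothetical framework side: an element is done when its stage completes. */
    static List<String> runBundle(List<String> inputs) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (String input : inputs) {
            CompletionStage<String> element = CompletableFuture.supplyAsync(() -> input);
            pending.add(process(element).toCompletableFuture());
        }
        // Wait for every element before treating the bundle as processed.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        List<String> outputs = new ArrayList<>();
        for (CompletableFuture<String> done : pending) {
            outputs.add(done.join());
        }
        return outputs;
    }

    public static void main(String[] args) {
        System.out.println(runBundle(Arrays.asList(" a ", "b"))); // prints [A, B]
    }
}
```

The user code contains no locks or synchronized blocks; ordering between steps is expressed only through the chained stages.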
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Reuven
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, May 13, 2018 at 11:36 AM Xinyu Liu <
>>>>>>>>>>>>>>>> xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for all the pointers. I looked through the
>>>>>>>>>>>>>>>>> discussion over BEAM-2975 and BEAM-2980 about having
>>>>>>>>>>>>>>>>> snapshot or live views of iterable, and the current semantics
>>>>>>>>>>>>>>>>> make a lot of sense to me. For your question: it does not
>>>>>>>>>>>>>>>>> require an explicit snapshot when we create a RocksDb iterator
>>>>>>>>>>>>>>>>> directly. The iterator will read from an implicit snapshot as
>>>>>>>>>>>>>>>>> of the time the iterator is created [1], and the snapshot will
>>>>>>>>>>>>>>>>> be released after the iterator is closed. If we can have
>>>>>>>>>>>>>>>>> another method to return ReadableState<Iterator>, we might be
>>>>>>>>>>>>>>>>> able to apply the auto-closing approaches as we discussed and
>>>>>>>>>>>>>>>>> solve the problem here :).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It's very interesting that you bring up the discussion
>>>>>>>>>>>>>>>>> about an async API! Async IO has been widely adopted here among
>>>>>>>>>>>>>>>>> our users: they use netty for async calls with a library named
>>>>>>>>>>>>>>>>> ParSeq [2] to help manage the calls. Samza provides a primitive
>>>>>>>>>>>>>>>>> callback style API [3], in which the user will invoke the
>>>>>>>>>>>>>>>>> callback after the remote calls are complete. Currently in a
>>>>>>>>>>>>>>>>> Samza job our users use this API with the ParSeq lib for remote
>>>>>>>>>>>>>>>>> IO. It seems we might have to do blocking calls (thus the poor
>>>>>>>>>>>>>>>>> resource utilization you mentioned) when using the Beam API for
>>>>>>>>>>>>>>>>> now. It'll be great if you can send a few more details about
>>>>>>>>>>>>>>>>> the discussion about the async API. I would like to add our use
>>>>>>>>>>>>>>>>> case and help move this forward.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> [1]: https://github.com/facebook/rocksdb/wiki/Iterator
>>>>>>>>>>>>>>>>> [2]: https://github.com/linkedin/parseq
>>>>>>>>>>>>>>>>> [3]: https://samza.apache.org/learn/documentation/0.14/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sat, May 12, 2018 at 8:17 PM, Kenneth Knowles <
>>>>>>>>>>>>>>>>> k...@google.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I don't have any further suggestions, but want to call
>>>>>>>>>>>>>>>>>> out how this hits a lot of interesting points.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The point about snapshotting is great. We have BEAM-2975
>>>>>>>>>>>>>>>>>> [1] and BEAM-2980 [2] where we debated things a bit. I think
>>>>>>>>>>>>>>>>>> the strongest case is for what you describe - it should be a
>>>>>>>>>>>>>>>>>> snapshot. Perhaps they should both be closed as fixed...
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> And you also bring up long blocking calls - we have also
>>>>>>>>>>>>>>>>>> deliberately decided that long synchronous blocking calls in
>>>>>>>>>>>>>>>>>> @ProcessElement can be embraced for simple programming and
>>>>>>>>>>>>>>>>>> compensated with autoscaling smarts (e.g. expand the thread
>>>>>>>>>>>>>>>>>> pool by noticing poor utilization). The alternative is a more
>>>>>>>>>>>>>>>>>> future-istic API where the calls can be explicitly
>>>>>>>>>>>>>>>>>> asynchronous. We've had some interesting dev@ list discussions
>>>>>>>>>>>>>>>>>> about that, too.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Is another possibility to perhaps have read() return a
>>>>>>>>>>>>>>>>>> ReadableState<Iterator> instead? We could, of course, have two
>>>>>>>>>>>>>>>>>> methods with different names, one for iterator one for
>>>>>>>>>>>>>>>>>> snapshot iterable. But wouldn't the Iterator also require a
>>>>>>>>>>>>>>>>>> snapshot? Doesn't a native RocksDb iterator require a snapshot
>>>>>>>>>>>>>>>>>> to have well-defined contents? As you can tell, I don't know
>>>>>>>>>>>>>>>>>> enough about RocksDb details to be sure of my suggestions.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/BEAM-2975
>>>>>>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/BEAM-2980
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sat, May 12, 2018 at 2:58 PM Xinyu Liu <
>>>>>>>>>>>>>>>>>> xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for the ideas, Kenn, Luke and Eugene. Before I
>>>>>>>>>>>>>>>>>>> posted the question here, we discussed internally about
>>>>>>>>>>>>>>>>>>> releasing the underlying resources after consuming the whole
>>>>>>>>>>>>>>>>>>> iterator. This probably covers quite a lot of use cases. For
>>>>>>>>>>>>>>>>>>> the special cases where the user only consumes part of the
>>>>>>>>>>>>>>>>>>> iterator, Luke and Kenn's suggestion about releasing after
>>>>>>>>>>>>>>>>>>> processElement() might work (I need to confirm this against
>>>>>>>>>>>>>>>>>>> our use cases). So based on what we have discussed so far, we
>>>>>>>>>>>>>>>>>>> might have a good way to automatically close an iterator for
>>>>>>>>>>>>>>>>>>> the store.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> There is another issue though: right now the state API
>>>>>>>>>>>>>>>>>>> returns an iterable for entries(), keys() and values(), and
>>>>>>>>>>>>>>>>>>> we can create an iterator from it. From my understanding, the
>>>>>>>>>>>>>>>>>>> iterable holds a snapshot of the underlying store. In the
>>>>>>>>>>>>>>>>>>> case of RocksDb, it's going to be a db.snapshot(). Then when
>>>>>>>>>>>>>>>>>>> can we release the snapshot? It's not like an iterator, where
>>>>>>>>>>>>>>>>>>> we can use some heuristics to automatically release it. The
>>>>>>>>>>>>>>>>>>> user can hold on to the iterable and create iterators
>>>>>>>>>>>>>>>>>>> throughout the whole processElement(). But if we only close
>>>>>>>>>>>>>>>>>>> the iterable after processElement(), I am quite concerned
>>>>>>>>>>>>>>>>>>> about the limitations this will bring. If the user is doing
>>>>>>>>>>>>>>>>>>> some remote call during processing, then the snapshot might
>>>>>>>>>>>>>>>>>>> be held for a long time before being released, and might
>>>>>>>>>>>>>>>>>>> cause performance problems. And if the user happens to create
>>>>>>>>>>>>>>>>>>> multiple iterables, then there will be multiple snapshots
>>>>>>>>>>>>>>>>>>> loaded during processing. Luke suggested being aggressive at
>>>>>>>>>>>>>>>>>>> closing the resources and recreating them when needed again.
>>>>>>>>>>>>>>>>>>> But in this case it might not work, since we won't be able to
>>>>>>>>>>>>>>>>>>> recreate the same snapshot given that the store might have
>>>>>>>>>>>>>>>>>>> been updated (and creating a RocksDb snapshot is not cheap
>>>>>>>>>>>>>>>>>>> either). I am running out of ideas other than exposing the
>>>>>>>>>>>>>>>>>>> iterator itself somehow (and adding close() if needed?). Any
>>>>>>>>>>>>>>>>>>> further suggestions?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> @Kenn: btw, I have the same impl you posted earlier
>>>>>>>>>>>>>>>>>>> (CloseableIterator) in an internal interface. I wrapped it in
>>>>>>>>>>>>>>>>>>> some sort of StateResource for fear that people might reject
>>>>>>>>>>>>>>>>>>> the proposal immediately after seeing the close() on the
>>>>>>>>>>>>>>>>>>> iterator. I guess our users are familiar with RocksDb state,
>>>>>>>>>>>>>>>>>>> where it's pretty normal to close the iterator/snapshot after
>>>>>>>>>>>>>>>>>>> using it.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Fri, May 11, 2018 at 11:08 AM, Lukasz Cwik <
>>>>>>>>>>>>>>>>>>> lc...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The iterator going out of scope is the idiomatic way
>>>>>>>>>>>>>>>>>>>> that resources are freed for Java developers (hence the
>>>>>>>>>>>>>>>>>>>> weak/phantom reference suggestion).
>>>>>>>>>>>>>>>>>>>> Explicitly requiring users to deal with 'handles' (like
>>>>>>>>>>>>>>>>>>>> file streams) leads to leaked resources.
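
One way to picture the reference-based suggestion is the JDK's `java.lang.ref.Cleaner` (Java 9 and later), which is built on phantom references. This is only a sketch under stated assumptions: `CleanerSketch`, `TrackedIterator`, `release()` and the `OPEN_HANDLES` counter are hypothetical, and the counter merely stands in for a native store handle. It is not how any runner is known to implement this.

```java
import java.lang.ref.Cleaner;
import java.util.concurrent.atomic.AtomicInteger;

class CleanerSketch {
    static final Cleaner CLEANER = Cleaner.create();
    /** Stands in for the number of native store handles currently held. */
    static final AtomicInteger OPEN_HANDLES = new AtomicInteger();

    /** Hypothetical iterator wrapper whose handle is freed by a Cleaner. */
    static final class TrackedIterator {
        private final Cleaner.Cleanable cleanable;

        TrackedIterator() {
            OPEN_HANDLES.incrementAndGet();
            // The cleanup action must not capture `this`; otherwise the object
            // would stay reachable and the action would never run.
            cleanable = CLEANER.register(this, OPEN_HANDLES::decrementAndGet);
        }

        /** Deterministic release; the action runs at most once. */
        void release() {
            cleanable.clean();
        }
    }

    public static void main(String[] args) {
        TrackedIterator it = new TrackedIterator();
        System.out.println(OPEN_HANDLES.get()); // prints 1
        it.release();
        it.release(); // second call is a no-op
        System.out.println(OPEN_HANDLES.get()); // prints 0
    }
}
```

If release() is never called, the same action runs some time after the wrapper becomes unreachable, but only when the garbage collector gets around to it. That timing is not under the user's control, which is the trade-off this part of the thread is weighing.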
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Fri, May 11, 2018 at 10:55 AM Kenneth Knowles <
>>>>>>>>>>>>>>>>>>>> k...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks Xinyu,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I actually had first sketched out just what you wrote.
>>>>>>>>>>>>>>>>>>>>> But then I realized a few things:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>  - usually an Iterable does not allocate resources,
>>>>>>>>>>>>>>>>>>>>> only its Iterators
>>>>>>>>>>>>>>>>>>>>>  - if you consume the whole iterator, I hope the user
>>>>>>>>>>>>>>>>>>>>> would not have to do any extra work
>>>>>>>>>>>>>>>>>>>>>  - you can also automatically free it up at the end of
>>>>>>>>>>>>>>>>>>>>> the call to @ProcessElement so that is easy too (but you
>>>>>>>>>>>>>>>>>>>>> might not want to)
>>>>>>>>>>>>>>>>>>>>>  - so the main use is when the iterator terminates
>>>>>>>>>>>>>>>>>>>>> early and is not fully consumed and you can't wait to
>>>>>>>>>>>>>>>>>>>>> finish the method
>>>>>>>>>>>>>>>>>>>>>  - the scoping in Java will force a bunch of
>>>>>>>>>>>>>>>>>>>>> uninitialized declarations outside the try-with-resources
>>>>>>>>>>>>>>>>>>>>> block, kind of a lot of boilerplate LOC
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> One thing that is good about your proposal is that the
>>>>>>>>>>>>>>>>>>>>> iterable could have some transparent caches that all free
>>>>>>>>>>>>>>>>>>>>> up together.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Fri, May 11, 2018 at 9:51 AM Xinyu Liu <
>>>>>>>>>>>>>>>>>>>>> xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks for drafting the details about the two
>>>>>>>>>>>>>>>>>>>>>> approaches, Kenn. Now I understand Luke's proposal
>>>>>>>>>>>>>>>>>>>>>> better. The approach looks neat, but the uncertainty of
>>>>>>>>>>>>>>>>>>>>>> *when* GC is going to kick in will make users' lives
>>>>>>>>>>>>>>>>>>>>>> hard. If the user happens to configure a large JVM heap
>>>>>>>>>>>>>>>>>>>>>> size, and since RocksDb uses off-heap memory, GC might
>>>>>>>>>>>>>>>>>>>>>> start very late and run less frequently than we want. If
>>>>>>>>>>>>>>>>>>>>>> we don't have a *definitive* way to let the user close
>>>>>>>>>>>>>>>>>>>>>> the underlying resources, then there is no good way to
>>>>>>>>>>>>>>>>>>>>>> handle such failures of a critical application in
>>>>>>>>>>>>>>>>>>>>>> production.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Having a close method in the iterator might be a
>>>>>>>>>>>>>>>>>>>>>> little unorthodox :). To some degree, this is actually a
>>>>>>>>>>>>>>>>>>>>>> resource we are holding underneath, and I think it's
>>>>>>>>>>>>>>>>>>>>>> pretty common to have close() for a resource in Java,
>>>>>>>>>>>>>>>>>>>>>> e.g. BufferedReader and BufferedWriter. So I would
>>>>>>>>>>>>>>>>>>>>>> imagine that we also define a resource for the state
>>>>>>>>>>>>>>>>>>>>>> iterator and make the interface implement
>>>>>>>>>>>>>>>>>>>>>> *AutoCloseable*. Here is my sketch:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> // StateResource MUST be closed after use.
>>>>>>>>>>>>>>>>>>>>>> try (StateResource<Iterator<SomeType>> st =
>>>>>>>>>>>>>>>>>>>>>> bagState.iteratorResource()) {
>>>>>>>>>>>>>>>>>>>>>>     Iterator<SomeType> iter = st.iterator();
>>>>>>>>>>>>>>>>>>>>>>     while (iter.hasNext()) {
>>>>>>>>>>>>>>>>>>>>>>        .. do stuff ...
>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>> } catch (Exception e) {
>>>>>>>>>>>>>>>>>>>>>>     ... user code
>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> The type/method names are just for illustration here,
>>>>>>>>>>>>>>>>>>>>>> so please don't laugh at them. Please feel free to
>>>>>>>>>>>>>>>>>>>>>> comment and let me know
>>>>>>>>>>>>>>>>>>>>>> if you have thoughts about the programming patterns here.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>>>>
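
The try-with-resources pattern sketched in the message above can be made concrete as follows. This is only a minimal sketch, not a Beam or Samza API: CloseableIterator, FakeStoreIterator and sumUntil are hypothetical names, and an in-memory list stands in for the store-backed (JNI) iterator. It shows that close() runs even when the loop stops early.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class CloseableStateIteratorSketch {

    /** Hypothetical interface: an Iterator that also owns a releasable resource. */
    public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
        @Override
        void close(); // narrowed so callers need not catch a checked exception
    }

    /** Stand-in for a store-backed iterator; a real one would wrap a native iterator. */
    public static final class FakeStoreIterator<T> implements CloseableIterator<T> {
        private final Iterator<T> delegate;
        private boolean closed = false;

        public FakeStoreIterator(List<T> data) {
            this.delegate = data.iterator();
        }

        @Override
        public boolean hasNext() {
            return !closed && delegate.hasNext();
        }

        @Override
        public T next() {
            if (closed) {
                throw new IllegalStateException("iterator already closed");
            }
            return delegate.next();
        }

        @Override
        public void close() {
            closed = true; // a real implementation would release the native handle here
        }

        public boolean isClosed() {
            return closed;
        }
    }

    /** Adds elements until the running sum reaches the limit, then stops early. */
    public static int sumUntil(FakeStoreIterator<Integer> source, int limit) {
        int sum = 0;
        // try-with-resources calls close() on every exit path, including early exit.
        try (CloseableIterator<Integer> iter = source) {
            while (iter.hasNext() && sum < limit) {
                sum += iter.next();
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        FakeStoreIterator<Integer> it =
            new FakeStoreIterator<>(Arrays.asList(1, 2, 3, 4, 5));
        int sum = sumUntil(it, 4);
        System.out.println("sum=" + sum + " closed=" + it.isClosed());
    }
}
```

Because the interface still extends Iterator, code that never calls close() keeps compiling; whether the resource is then released depends entirely on what the runner does behind the scenes.
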
>>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 8:59 PM, Kenneth Knowles <
>>>>>>>>>>>>>>>>>>>>>> k...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> It is too soon to argue whether an API is complex or
>>>>>>>>>>>>>>>>>>>>>>> not. There has been no specific API proposed.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I think the problem statement is real - you need to
>>>>>>>>>>>>>>>>>>>>>>> be able to read and write bigger-than-memory state. It 
>>>>>>>>>>>>>>>>>>>>>>> seems we have
>>>>>>>>>>>>>>>>>>>>>>> multiple runners that don't support it, perhaps because 
>>>>>>>>>>>>>>>>>>>>>>> of our API. You
>>>>>>>>>>>>>>>>>>>>>>> might be able to build something good enough with 
>>>>>>>>>>>>>>>>>>>>>>> phantom references, but
>>>>>>>>>>>>>>>>>>>>>>> you might not.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> If I understand the idea, it might look something
>>>>>>>>>>>>>>>>>>>>>>> like this:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>>>>>>>>        @StateId("foo")
>>>>>>>>>>>>>>>>>>>>>>>        private final StateSpec<BagState<Whatever>>
>>>>>>>>>>>>>>>>>>>>>>> myBagSpec = ...
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>        @ProcessElement
>>>>>>>>>>>>>>>>>>>>>>>        public void proc(@StateId("foo")
>>>>>>>>>>>>>>>>>>>>>>> BagState<Whatever> myBag, ...) {
>>>>>>>>>>>>>>>>>>>>>>>          CloseableIterator<Whatever> iterator =
>>>>>>>>>>>>>>>>>>>>>>> myBag.get().iterator();
>>>>>>>>>>>>>>>>>>>>>>>          while(iterator.hasNext() && ... special
>>>>>>>>>>>>>>>>>>>>>>> condition ...) {
>>>>>>>>>>>>>>>>>>>>>>>            ... do stuff ...
>>>>>>>>>>>>>>>>>>>>>>>          }
>>>>>>>>>>>>>>>>>>>>>>>          iterator.close();
>>>>>>>>>>>>>>>>>>>>>>>        }
>>>>>>>>>>>>>>>>>>>>>>>      }
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> So it has no impact on users who don't choose to
>>>>>>>>>>>>>>>>>>>>>>> close() since they iterate with for ( : ) as usual. And 
>>>>>>>>>>>>>>>>>>>>>>> a runner that has
>>>>>>>>>>>>>>>>>>>>>>> the 10x funding to try out a ReferenceQueue can be 
>>>>>>>>>>>>>>>>>>>>>>> resilient to users that
>>>>>>>>>>>>>>>>>>>>>>> forget. On the other hand, I haven't seen this pattern 
>>>>>>>>>>>>>>>>>>>>>>> much in the wild, so
>>>>>>>>>>>>>>>>>>>>>>> I think it is valuable to discuss other methods.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> While Luke's proposal is something like this if I
>>>>>>>>>>>>>>>>>>>>>>> understand his sketch (replacing WeakReference with 
>>>>>>>>>>>>>>>>>>>>>>> PhantomReference seems
>>>>>>>>>>>>>>>>>>>>>>> to be what you really want):
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>     ... in RocksDb state implementation ...
>>>>>>>>>>>>>>>>>>>>>>>     class RocksDbBagState {
>>>>>>>>>>>>>>>>>>>>>>>       static ReferenceQueue rocksDbIteratorQueue =
>>>>>>>>>>>>>>>>>>>>>>> new ReferenceQueue();
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>       class Iterator {
>>>>>>>>>>>>>>>>>>>>>>>          PhantomReference<RocksDbJniIterator> cIter;
>>>>>>>>>>>>>>>>>>>>>>>          .next() {
>>>>>>>>>>>>>>>>>>>>>>>            return cIter.next();
>>>>>>>>>>>>>>>>>>>>>>>          }
>>>>>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>      class Iterable {
>>>>>>>>>>>>>>>>>>>>>>>         .iterator() {
>>>>>>>>>>>>>>>>>>>>>>>           return new Iterator(new
>>>>>>>>>>>>>>>>>>>>>>> PhantomReference<>(rocksDbJniIterator,
>>>>>>>>>>>>>>>>>>>>>>> rocksDbIteratorQueue));
>>>>>>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>     ... on another thread ...
>>>>>>>>>>>>>>>>>>>>>>>     while(true) {
>>>>>>>>>>>>>>>>>>>>>>>       RocksDbIterator deadRef = (RocksDbIterator)
>>>>>>>>>>>>>>>>>>>>>>> rocksDbIteratorQueue.remove();
>>>>>>>>>>>>>>>>>>>>>>>       deadRef.close();
>>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> When the iterator is GC'd, the phantom reference
>>>>>>>>>>>>>>>>>>>>>>> will pop onto the queue for being closed. This might 
>>>>>>>>>>>>>>>>>>>>>>> not be too bad. You'll
>>>>>>>>>>>>>>>>>>>>>>> have delayed resource release, and potentially masked 
>>>>>>>>>>>>>>>>>>>>>>> errors that are hard
>>>>>>>>>>>>>>>>>>>>>>> to debug. It is less error-prone than WeakReference, 
>>>>>>>>>>>>>>>>>>>>>>> which is asking for
>>>>>>>>>>>>>>>>>>>>>>> trouble when objects are collected en masse. 
>>>>>>>>>>>>>>>>>>>>>>> Anecdotally I have heard that
>>>>>>>>>>>>>>>>>>>>>>> performance of this kind of approach is poor, but I 
>>>>>>>>>>>>>>>>>>>>>>> haven't experienced it
>>>>>>>>>>>>>>>>>>>>>>> myself and I can't find good data.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
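
The phantom-reference idea in the message above can be sketched in runnable form. This is a minimal illustration under stated assumptions, not RocksDB or Beam code: NativeHandle, TrackedIterator, CleanupRef, track and drain are hypothetical names. Two details matter: PhantomReference.get() always returns null, so the reference object itself has to carry the handle to close, and the references must stay strongly reachable until they are dequeued. The demo enqueues the reference by hand so that it does not depend on GC timing.

```java
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

public class PhantomCleanupSketch {

    /** Hypothetical stand-in for a native (JNI) iterator handle. */
    public static final class NativeHandle {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        public void close() {
            closed.set(true);
        }

        public boolean isClosed() {
            return closed.get();
        }
    }

    /** Java-side wrapper given to user code; its collection is what gets tracked. */
    public static final class TrackedIterator {
        final NativeHandle handle;

        public TrackedIterator(NativeHandle handle) {
            this.handle = handle;
        }
    }

    /** Carries the handle to close; it must not hold a strong reference to the wrapper. */
    public static final class CleanupRef extends PhantomReference<TrackedIterator> {
        final NativeHandle handle;

        CleanupRef(TrackedIterator referent, ReferenceQueue<TrackedIterator> queue) {
            super(referent, queue);
            this.handle = referent.handle;
        }
    }

    public static final ReferenceQueue<TrackedIterator> QUEUE = new ReferenceQueue<>();

    // Keeps CleanupRef objects reachable; an unreachable reference is never enqueued.
    public static final Set<CleanupRef> LIVE = ConcurrentHashMap.newKeySet();

    public static CleanupRef track(TrackedIterator it) {
        CleanupRef ref = new CleanupRef(it, QUEUE);
        LIVE.add(ref);
        return ref;
    }

    /** Closes the handle of every reference currently on the queue; returns the count. */
    public static int drain() {
        int closedCount = 0;
        Reference<? extends TrackedIterator> r;
        while ((r = QUEUE.poll()) != null) {
            CleanupRef ref = (CleanupRef) r;
            ref.handle.close();
            LIVE.remove(ref);
            closedCount++;
        }
        return closedCount;
    }

    public static void main(String[] args) {
        NativeHandle handle = new NativeHandle();
        CleanupRef ref = track(new TrackedIterator(handle));
        // Normally the GC enqueues the reference at some unspecified later time.
        // It is enqueued manually here to keep the demo deterministic.
        ref.enqueue();
        System.out.println("drained=" + drain() + " closed=" + handle.isClosed());
    }
}
```

In a runner, drain() (or a blocking QUEUE.remove() loop) would run on a background thread, which is exactly where the delayed release discussed in this thread comes from. On Java 9 and later, java.lang.ref.Cleaner packages the same pattern.
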
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 7:41 PM Xinyu Liu <
>>>>>>>>>>>>>>>>>>>>>>> xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> If I understand correctly, using weak references
>>>>>>>>>>>>>>>>>>>>>>>> will help clean up the Java objects once GC kicks in. 
>>>>>>>>>>>>>>>>>>>>>>>> In the case of a kv-store
>>>>>>>>>>>>>>>>>>>>>>>> like rocksDb, the Java iterator is just a JNI
>>>>>>>>>>>>>>>>>>>>>>>> interface to the underlying
>>>>>>>>>>>>>>>>>>>>>>>> C iterator, so we need to explicitly invoke close to 
>>>>>>>>>>>>>>>>>>>>>>>> release the in-memory
>>>>>>>>>>>>>>>>>>>>>>>> snapshot data, which can be large and accumulate
>>>>>>>>>>>>>>>>>>>>>>>> quickly if it's not
>>>>>>>>>>>>>>>>>>>>>>>> released when not in use. Maybe I am missing something 
>>>>>>>>>>>>>>>>>>>>>>>> as you suggested
>>>>>>>>>>>>>>>>>>>>>>>> here, but it looks to me like weak references might not
>>>>>>>>>>>>>>>>>>>>>>>> help in this case.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I understand your concern, and I think you might
>>>>>>>>>>>>>>>>>>>>>>>> have misunderstood what I meant. I am totally for working
>>>>>>>>>>>>>>>>>>>>>>>> hard for best user
>>>>>>>>>>>>>>>>>>>>>>>> experience, and I think the current API provides a 
>>>>>>>>>>>>>>>>>>>>>>>> good example of that.
>>>>>>>>>>>>>>>>>>>>>>>> That's also the reason I am implementing a runner 
>>>>>>>>>>>>>>>>>>>>>>>> here. I am just proposing
>>>>>>>>>>>>>>>>>>>>>>>> an extra API to expose an iterator that can be closed 
>>>>>>>>>>>>>>>>>>>>>>>> when not needed, that
>>>>>>>>>>>>>>>>>>>>>>>> way the users can use this feature to iterate through 
>>>>>>>>>>>>>>>>>>>>>>>> large state that
>>>>>>>>>>>>>>>>>>>>>>>> doesn't fit into memory. I believe this is also a 
>>>>>>>>>>>>>>>>>>>>>>>> pretty general use case
>>>>>>>>>>>>>>>>>>>>>>>> and it's better to have support for it. I am actually 
>>>>>>>>>>>>>>>>>>>>>>>> arguing this will be
>>>>>>>>>>>>>>>>>>>>>>>> a better user experience to add this extra API since 
>>>>>>>>>>>>>>>>>>>>>>>> more users can benefit
>>>>>>>>>>>>>>>>>>>>>>>> from it.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 5:25 PM, Lukasz Cwik <
>>>>>>>>>>>>>>>>>>>>>>>> lc...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> I don't agree. I believe you can track the
>>>>>>>>>>>>>>>>>>>>>>>>> iterators/iterables that are created and freed by 
>>>>>>>>>>>>>>>>>>>>>>>>> using weak references and
>>>>>>>>>>>>>>>>>>>>>>>>> reference queues (or other methods). Having a few 
>>>>>>>>>>>>>>>>>>>>>>>>> people work 10x as hard
>>>>>>>>>>>>>>>>>>>>>>>>> to provide a good implementation is much better than
>>>>>>>>>>>>>>>>>>>>>>>>> having 100s or 1000s
>>>>>>>>>>>>>>>>>>>>>>>>> of users suffering through a more complicated API.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:44 PM Xinyu Liu <
>>>>>>>>>>>>>>>>>>>>>>>>> xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Load/evict blocks will help reduce the cache
>>>>>>>>>>>>>>>>>>>>>>>>>> memory footprint, but we still won't be able to 
>>>>>>>>>>>>>>>>>>>>>>>>>> release the underlying
>>>>>>>>>>>>>>>>>>>>>>>>>> resources. We can definitely add heuristics to help
>>>>>>>>>>>>>>>>>>>>>>>>>> release the resources
>>>>>>>>>>>>>>>>>>>>>>>>>> as you mentioned, but there is no accurate way to 
>>>>>>>>>>>>>>>>>>>>>>>>>> track all the
>>>>>>>>>>>>>>>>>>>>>>>>>> iterators/iterables created and free them up once 
>>>>>>>>>>>>>>>>>>>>>>>>>> not needed. I think while
>>>>>>>>>>>>>>>>>>>>>>>>>> the API is aimed at nice user experience, we should 
>>>>>>>>>>>>>>>>>>>>>>>>>> have the option to let
>>>>>>>>>>>>>>>>>>>>>>>>>> users optimize their performance if they choose to. 
>>>>>>>>>>>>>>>>>>>>>>>>>> Do you agree?
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:25 PM, Lukasz Cwik <
>>>>>>>>>>>>>>>>>>>>>>>>>> lc...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Users won't reliably close/release the resources
>>>>>>>>>>>>>>>>>>>>>>>>>>> and forcing them to will make the user experience 
>>>>>>>>>>>>>>>>>>>>>>>>>>> worse.
>>>>>>>>>>>>>>>>>>>>>>>>>>> It will make a lot more sense to use a file
>>>>>>>>>>>>>>>>>>>>>>>>>>> format which allows random access and use a cache 
>>>>>>>>>>>>>>>>>>>>>>>>>>> to load/evict blocks of
>>>>>>>>>>>>>>>>>>>>>>>>>>> the state from memory.
>>>>>>>>>>>>>>>>>>>>>>>>>>> If that is not possible, use an iterable which
>>>>>>>>>>>>>>>>>>>>>>>>>>> frees the resource after a certain amount of 
>>>>>>>>>>>>>>>>>>>>>>>>>>> inactivity or uses weak
>>>>>>>>>>>>>>>>>>>>>>>>>>> references.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:07 PM Xinyu Liu <
>>>>>>>>>>>>>>>>>>>>>>>>>>> xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi, folks,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm in the middle of implementing the MapState
>>>>>>>>>>>>>>>>>>>>>>>>>>>> and SetState in our Samza runner. We noticed that 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> the state returns the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Java Iterable for reading entries, keys, etc. For 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> state backed by
>>>>>>>>>>>>>>>>>>>>>>>>>>>> a file-based kv store like rocksDb, we need to be
>>>>>>>>>>>>>>>>>>>>>>>>>>>> able to let users
>>>>>>>>>>>>>>>>>>>>>>>>>>>> explicitly close the iterator/iterable to release the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> resources. Otherwise we
>>>>>>>>>>>>>>>>>>>>>>>>>>>> have to load the iterable into memory so we can 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> safely close the underlying
>>>>>>>>>>>>>>>>>>>>>>>>>>>> rocksDb iterator, similar to Flink's 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> implementation. But
>>>>>>>>>>>>>>>>>>>>>>>>>>>> this won't work for states that don't fit into 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> memory. I chatted with Kenn
>>>>>>>>>>>>>>>>>>>>>>>>>>>> and he also agrees we need this capability to 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> avoid bulk read/write. This
>>>>>>>>>>>>>>>>>>>>>>>>>>>> seems to be a general use case and I'm wondering 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> if we can add support
>>>>>>>>>>>>>>>>>>>>>>>>>>>> for it? I am happy to contribute to this if needed.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Any feedback is highly
>>>>>>>>>>>>>>>>>>>>>>>>>>>> appreciated.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>
>
