OK, got it. But what consistency are you referring to? I was trying to
point out that there's nothing but straight-line program order consistency.
There's only one actor doing all the reads and all the writes.

Kenn

On Tue, May 15, 2018 at 8:39 AM Lukasz Cwik <lc...@google.com> wrote:

> I misspoke when I said portability semantics and should have said
> portability design/implementation. This is why I had a follow-up e-mail and
> clarified that I'm confused on:
> * I don't understand how you would want close to change the semantics of a
> user state specification and how it affects the lifetime of user state?
> ** Does it represent committing information within a bundle?
> ** Does it mean that user state can ignore the replayable and consistent
> semantics for a lifetime of a bundle semantics?
>
> I'm trying to tie back what does a `ReadableState<Iterator>
> readIterator()` means for Runner authors and how it solves the
> memory/close() problem for Samza. Based upon
> https://s.apache.org/beam-state, reading and writing of state must be
> consistent. Does this mean that Samza must use snapshots for the lifetime
> of a bundle? If so, I don't see how adding a `ReadableState<Iterator>
> readIterator()` allows Samza to ignore the consistency requirement and be
> allowed to free snapshots.
>
> It might be worthwhile to setup a three way hangouts call to help me as I
> don't have the same level of context which can be shared back to this
> thread. Xinyu / Kenn, how about we setup a time using Slack?
>
> On Mon, May 14, 2018 at 8:36 PM Kenneth Knowles <k...@google.com> wrote:
>
>> I feel like this discussion is kind of far from the primary intention.
>> The point of ParDo(stateful DoFn) is to enable naive single-threaded code
>> in a style intuitive to a beginning imperative programmer. So:
>>
>>  - the return value of read() should act like an immutable value
>>  - if there is a read after a write, that read should reflect the changes
>> written, because there's a trivial happens-before relationship induced by
>> program order
>>  - there are no non-trivial happens-before relationships
>>  - a write after a read should not affect the value read before
>>
>> From that starting point, the limitations on expressiveness (single
>> threaded per-key-and-window-and-step) give rise to embarrassingly parallel
>> computation and GC.
>>
>> So when you say "portability semantics" it makes me concerned. The word
>> "semantics" refers to the mapping between what a user writes and what it
>> means. Primarily, that means the conceptual translation between a primitive
>> PTransform and the corresponding PCollection-to-PCollection operation. It
>> is true that portability complicates things because a user's code can only
>> be given meaning relative to an SDK harness. But the end-user intention is
>> unchanged. If we had time and resources to give the Fn API a solid spec, it
>> should be such that the SDK harness has little choice but to implement the
>> primitives as intended.
>>
>> In other words, it is the job of
>> https://s.apache.org/beam-fn-state-api-and-bundle-processing* to
>> implement https://s.apache.org/beam-state. The discussion of adding
>> `ReadableState<Iterator> readIterator()` to BagState seems consistent with
>> the latter.
>>
>> Kenn
>>
>> *IIUC you've chosen to use the same underlying proto service for side
>> inputs and by-reference values. That's an implementation detail that I have
>> no particular opinion about except that if it complicates implementing the
>> primary motivator for state, it may not be a great fit.
>>
>>
>>
>> On Mon, May 14, 2018 at 7:35 PM Kenneth Knowles <k...@google.com> wrote:
>>
>>> I don't follow why allowing freeing resources would be counter to the
>>> spec. I don't really know what you mean by consistent for a bundle. State,
>>> in the sense of the user-facing per-key-and-window state API, is single
>>> threaded and scoped to a single DoFn. There's no one else who can write the
>>> state. If a BagState is read and written and read again, the user-facing
>>> logic should be unaware of the resources and not have any logic to deal
>>> with consistency.
>>>
>>> Kenn
>>>
>>> On Mon, May 14, 2018 at 6:09 PM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> Hmm, some of the problem I'm dealing with is:
>>>> * I don't understand how you would want close to change the semantics
>>>> of a user state specification and how it affects the lifetime of user 
>>>> state?
>>>> ** Does it represent committing information within a bundle?
>>>> ** Does it mean that user state can ignore the replayable and
>>>> consistent semantics for a lifetime of a bundle semantics?
>>>> * I do understand that close semantics may make it easier for Samza
>>>> runner to support state that is greater then memory, but what does it
>>>> provide to users and how would they benefit (like what new scenarios would
>>>> it support)?
>>>> * I don't understand in RocksDb why you need a snapshot in the first
>>>> place, and whether RocksDb can support seeking inside or outside a snapshot
>>>> relatively efficiently?
>>>>
>>>> There seems to be some context lost initially with a discussion that
>>>> you and Kenn had, is there a copy of that which could be shared? May help
>>>> me get up to speed on to the problem that is being solved.
>>>>
>>>>
>>>> On Mon, May 14, 2018 at 5:11 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>> wrote:
>>>>
>>>>> I will take a look at the docs to understand the problem better. A
>>>>> minor comment to 2) is that I don't intend to change the existing iterable
>>>>> API. I plan to implement it similar to Flink, loading the data into memory
>>>>> and closing the underlying snapshot after that. So the changes should be
>>>>> backward compatible.
>>>>>
>>>>> Thanks,
>>>>> Xinyu
>>>>>
>>>>>
>>>>>
>>>>> On Mon, May 14, 2018 at 4:54 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> I believe adding support for a state spec to be 'closed' or 'freed'
>>>>>> is counter to the requirement of a state spec being consistent for the
>>>>>> lifetime of a bundle, are we willing to change this requirement for the
>>>>>> lifetime of a bundle or say that runners can arbitrary say that a 
>>>>>> StateSpec
>>>>>> can't be accessed anymore?
>>>>>>
>>>>>> If not, I'm having trouble of thinking of a good way on how to
>>>>>> integrate the 'close/free' API with portability semantics because:
>>>>>> 1) Runners do control bundle size but other then the trivial cases
>>>>>> where a runner specifically chooses:
>>>>>>   a) to only process a single element at a time (negating the need
>>>>>> for a free/close() method).
>>>>>>   b) optimistically process elements and if you run out of memory
>>>>>> kill the bundle and try again with a smaller bundle.
>>>>>> 2) Adding an API to provide Iterators doesn't mean that users can't
>>>>>> still use iterable which can't be force closed. (Dropping Iterable in
>>>>>> exchange for Iterator in the API is backwards incompatible so likely to 
>>>>>> be
>>>>>> deferred till the next major version of Apache Beam).
>>>>>> 3) SDKs are able to process as many elements as they want in
>>>>>> parallel, nothing requires them to execute elements serially throughout 
>>>>>> the
>>>>>> pipeline graph.
>>>>>> 4) The runner has limited information into what an SDK is doing. The
>>>>>> SDK provides a lower bound on how many elements it has processed but SDKs
>>>>>> aren't required to implement this meaning that they could technically be
>>>>>> processing everything in random order after they have seen all input for 
>>>>>> a
>>>>>> bundle.
>>>>>>
>>>>>> We'll need to work through the scenarios, Xinyu it would be useful
>>>>>> for you to take a look at these two docs for context into the problem 
>>>>>> space:
>>>>>> https://s.apache.org/beam-fn-api-processing-a-bundle (How to process
>>>>>> a bundle)
>>>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing (How to
>>>>>> access side inputs, access remote references, and support user state)
>>>>>>
>>>>>>
>>>>>> On Mon, May 14, 2018 at 3:16 PM Kenneth Knowles <k...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>> On Mon, May 14, 2018 at 2:30 PM Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Before you go on and update the user facing API, we should discuss
>>>>>>>> the last point I made since the change your making will have limited
>>>>>>>> usability since the portability effort won't realistically allow you 
>>>>>>>> to see
>>>>>>>> such low level things like when processElement finished and supporting 
>>>>>>>> user
>>>>>>>> state will be modeled using the following three operations: read (with
>>>>>>>> continuation tokens), append (blind write), clear. It may be moot to
>>>>>>>> discuss how to integrate Samza into the existing framework and should 
>>>>>>>> work
>>>>>>>> towards being a portability based runner. There are more details here 
>>>>>>>> about
>>>>>>>> supporting user state during pipeline execution:
>>>>>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing, top
>>>>>>>> level docs about portability are
>>>>>>>> https://s.apache.org/beam-runner-api and
>>>>>>>> https://s.apache.org/beam-fn-api.
>>>>>>>>
>>>>>>>> Do we really want to make a change to the user facing API for user
>>>>>>>> state in the Java SDK when that code path will be phased out in 
>>>>>>>> exchange
>>>>>>>> for using the portability APIs?
>>>>>>>>
>>>>>>>
>>>>>>> To be clear, the user API will remain the same, but the runner's
>>>>>>> implementation path will be phased out. So should we expand this 
>>>>>>> discussion
>>>>>>> to how the portability APIs enable the SDK and runner to collaborate to
>>>>>>> achieve this use case? It seems like the interaction you need is that 
>>>>>>> the
>>>>>>> runner can tell that the SDK can closed the connection on the read(), 
>>>>>>> and
>>>>>>> the SDK needs to do so promptly, right?
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>> On Mon, May 14, 2018 at 2:20 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> We discussed internally about the proposed approaches. Seems if
>>>>>>>>> the State API can also expose another method to return a 
>>>>>>>>> ReadableState<Iterator>,
>>>>>>>>> it will cover our cases of iterating over a bigger-then-memory state, 
>>>>>>>>> and
>>>>>>>>> closing the underlying rocksDb snapshot immediately after the 
>>>>>>>>> iterator is
>>>>>>>>> fully consumed (most cases), and also safely close the rest of the
>>>>>>>>> iterators after proessElement (minor cases). If this sounds good to 
>>>>>>>>> you
>>>>>>>>> guys, I am going to create a JIRA ticket for it and open a PR for it.
>>>>>>>>> Thanks a lot for the discussions here.
>>>>>>>>>
>>>>>>>>> The discussions about Async processing is very interesting, and I
>>>>>>>>> think it's definitely worth its own thread. I believe we do need the
>>>>>>>>> support from the Beam API/model so the users can take advantage of it
>>>>>>>>> (Besides Samza, Flink also has an async operator that helps a lot in 
>>>>>>>>> the
>>>>>>>>> Alibaba's use cases). Yes, it will add complexity to the underlying
>>>>>>>>> framework, but it's going to be great for the users to do remote IO. 
>>>>>>>>> In
>>>>>>>>> practice we found it actually avoids thread+locks issues, as Kenn 
>>>>>>>>> mentioned
>>>>>>>>> above. I am not sure whether this feature can be runner-specific 
>>>>>>>>> support
>>>>>>>>> thing. I will probably create another email thread for this 
>>>>>>>>> discussion in
>>>>>>>>> the future and hopefully I can move this forward.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Xinyu
>>>>>>>>>
>>>>>>>>> On Mon, May 14, 2018 at 10:30 AM, Kenneth Knowles <k...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, May 14, 2018 at 9:44 AM Lukasz Cwik <lc...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Users typically want to do that async operation and then produce
>>>>>>>>>>> output with it. Adding asynchronous execution is difficult within 
>>>>>>>>>>> the
>>>>>>>>>>> framework because a lot of code is currently not needed to be 
>>>>>>>>>>> thread safe
>>>>>>>>>>> and writing code to be fast and handle asynchronous execution is 
>>>>>>>>>>> quite
>>>>>>>>>>> difficult. Adding async operations typically leads to code with too 
>>>>>>>>>>> many
>>>>>>>>>>> locks/synchronization blocks.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Just a small point here, as it is very unusual to use locks with
>>>>>>>>>> futures. The essential innovation of futures is that it is a new way 
>>>>>>>>>> to
>>>>>>>>>> program that avoids threads+locks style. In part, you get this for 
>>>>>>>>>> free
>>>>>>>>>> from functional programming style, and on the other hand you reify
>>>>>>>>>> asynchronous side effects as data dependencies.
>>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> Note that with the portability effort, the Runner won't have
>>>>>>>>>>> visibility into such low level things like when an object is garbage
>>>>>>>>>>> collected and supporting user state will be modeled using the 
>>>>>>>>>>> following
>>>>>>>>>>> three operations: read (with continuation tokens), append (blind 
>>>>>>>>>>> write),
>>>>>>>>>>> clear. It may be moot to discuss how to integrate Samza into the 
>>>>>>>>>>> existing
>>>>>>>>>>> framework and should work towards being a portability based runner. 
>>>>>>>>>>> There
>>>>>>>>>>> are more details here about supporting user state during pipeline
>>>>>>>>>>> execution:
>>>>>>>>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing,
>>>>>>>>>>> top level docs about portability are
>>>>>>>>>>> https://s.apache.org/beam-runner-api and
>>>>>>>>>>> https://s.apache.org/beam-fn-api.
>>>>>>>>>>>
>>>>>>>>>>> On Sun, May 13, 2018 at 6:51 PM Reuven Lax <re...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> At least one API that has been discussed in the past, is to use
>>>>>>>>>>>> Java 8 CompletionStage. e.g.
>>>>>>>>>>>>
>>>>>>>>>>>>  new DoFn<InputT, OutputT>() {
>>>>>>>>>>>>     @ProcessElement
>>>>>>>>>>>>     public void process(@Element CompletionStage<InputT>
>>>>>>>>>>>> element, ...) {
>>>>>>>>>>>>       element.thenApply(...)
>>>>>>>>>>>>     }
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>> The framework will automatically create the CompletionStage,
>>>>>>>>>>>> and the process method can specify a pipeline of asynchronous 
>>>>>>>>>>>> operations to
>>>>>>>>>>>> perform on the element. When all of them are done, the element 
>>>>>>>>>>>> will be
>>>>>>>>>>>> marked as successfully processed.
>>>>>>>>>>>>
>>>>>>>>>>>> Reuven
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, May 13, 2018 at 11:36 AM Xinyu Liu <
>>>>>>>>>>>> xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for all the pointers. I looked though the discussion
>>>>>>>>>>>>> over BEAM-2975 and BEAM-2980 about having snapshot or live
>>>>>>>>>>>>> views of iterable, and the current semantics makes a lot of sense 
>>>>>>>>>>>>> to me.
>>>>>>>>>>>>> For your question: it does not require an explicit snapshot when 
>>>>>>>>>>>>> we create
>>>>>>>>>>>>> RocksDb iterator directly. The iterator will read from an 
>>>>>>>>>>>>> implicit snapshot
>>>>>>>>>>>>> as of the time the iterator is created [1], and the snapshot will 
>>>>>>>>>>>>> be
>>>>>>>>>>>>> released after the iterator is closed. If we can have another 
>>>>>>>>>>>>> method to
>>>>>>>>>>>>> return ReadableState<Iterator>, we might be able to apply the 
>>>>>>>>>>>>> auto-closing
>>>>>>>>>>>>> approaches as we discussed and solve the problem here :).
>>>>>>>>>>>>>
>>>>>>>>>>>>> It's very interesting that you bring up the discussion about
>>>>>>>>>>>>> async API! Async IO has been widely adopted here among our users: 
>>>>>>>>>>>>> they use
>>>>>>>>>>>>> netty for async calls with library named ParSeq [2] to help 
>>>>>>>>>>>>> manage the
>>>>>>>>>>>>> calls. Samza provides a primitive callback style API [3], in 
>>>>>>>>>>>>> which the user
>>>>>>>>>>>>> will invoke the callback after the remote calls are complete. 
>>>>>>>>>>>>> Currently in
>>>>>>>>>>>>> a Samza job our users use this API with the ParSeq lib for remote 
>>>>>>>>>>>>> IO. Seems
>>>>>>>>>>>>> we might have to do blocking calls (thus the poor resource 
>>>>>>>>>>>>> utilization you
>>>>>>>>>>>>> mentioned) when using Beam API for now. It'll be great if you can 
>>>>>>>>>>>>> send a
>>>>>>>>>>>>> few more details about the discussion about async API. I would 
>>>>>>>>>>>>> like to add
>>>>>>>>>>>>> our use case and help move this forward.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]: https://github.com/facebook/rocksdb/wiki/Iterator
>>>>>>>>>>>>> [2]: https://github.com/linkedin/parseq
>>>>>>>>>>>>> [3]:
>>>>>>>>>>>>> https://samza.apache.org/learn/documentation/0.14/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, May 12, 2018 at 8:17 PM, Kenneth Knowles <
>>>>>>>>>>>>> k...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I don't have any further suggestions, but want to call out
>>>>>>>>>>>>>> how this hits a lot of interesting points.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The point about snapshotting is great. We have BEAM-2975 [1]
>>>>>>>>>>>>>> and BEAM-2980 [2] where we debated things a bit. I think the 
>>>>>>>>>>>>>> strongest case
>>>>>>>>>>>>>> is for what you describe - it should be a snapshot. Perhaps they 
>>>>>>>>>>>>>> should
>>>>>>>>>>>>>> both be closed as fixed...
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> And you also bring up long blocking calls - we have also
>>>>>>>>>>>>>> deliberately decided that long synchronous blocking calls in
>>>>>>>>>>>>>> @ProcessElement can be embraced for simple programming and 
>>>>>>>>>>>>>> compensated with
>>>>>>>>>>>>>> autoscaling smarts (e.g. expand the thread pool by noticing poor
>>>>>>>>>>>>>> utilization). The alternative is a more future-istic API where 
>>>>>>>>>>>>>> the calls
>>>>>>>>>>>>>> can be explicitly asynchronous. We've had some interesting dev@
>>>>>>>>>>>>>> list discussions about that, too.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is another possibility to perhaps have read() return a
>>>>>>>>>>>>>> ReadableState<Iterator> instead? We could, of course, have two 
>>>>>>>>>>>>>> methods with
>>>>>>>>>>>>>> different names, one for iterator one for snapshot iterable. But 
>>>>>>>>>>>>>> wouldn't
>>>>>>>>>>>>>> the Iterator also require a snapshot? Doesn't a native RocksDb 
>>>>>>>>>>>>>> iterator
>>>>>>>>>>>>>> require a snapshot to have well-defined contents? As you can 
>>>>>>>>>>>>>> tell, I don't
>>>>>>>>>>>>>> know enough about RocksDb details to be sure of my suggestions.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/BEAM-2980
>>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/BEAM-2975
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, May 12, 2018 at 2:58 PM Xinyu Liu <
>>>>>>>>>>>>>> xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for the ideas, Kenn, Luke and Eugene. Before I posted
>>>>>>>>>>>>>>> the question here, we discussed internally about releasing the 
>>>>>>>>>>>>>>> underlying
>>>>>>>>>>>>>>> resources after consuming the whole iterator. This probably 
>>>>>>>>>>>>>>> covers quite a
>>>>>>>>>>>>>>> lot of use cases. For some special cases that the user only 
>>>>>>>>>>>>>>> consume part of
>>>>>>>>>>>>>>> the iterator, Luke and Kenn's suggestion about releasing after
>>>>>>>>>>>>>>> processElement() might work (I need to confirm about this with 
>>>>>>>>>>>>>>> our use
>>>>>>>>>>>>>>> cases). So based on what we discussed so far, we might have a 
>>>>>>>>>>>>>>> good way to
>>>>>>>>>>>>>>> automatically close an iterator for the store.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> There is another issue though: right now the state API
>>>>>>>>>>>>>>> returns an iterable for entries(), keys() and values(), and we 
>>>>>>>>>>>>>>> can create
>>>>>>>>>>>>>>> iterator from it. From my understanding, the iterable holds a 
>>>>>>>>>>>>>>> snapshot of
>>>>>>>>>>>>>>> the underlying store. In case of rocksDb, it's going to be a 
>>>>>>>>>>>>>>> db.snapshot().
>>>>>>>>>>>>>>> Then when can we release the snapshot? It's not like iterator 
>>>>>>>>>>>>>>> where we can
>>>>>>>>>>>>>>> use some heuristics to automatically release it. The user can 
>>>>>>>>>>>>>>> hold on to
>>>>>>>>>>>>>>> the iterable and create iterators throughout the whole 
>>>>>>>>>>>>>>> processElement().
>>>>>>>>>>>>>>> But if we only close the iterable after processElement(), I am 
>>>>>>>>>>>>>>> quite
>>>>>>>>>>>>>>> concerned about the limitations this will bring. If the user is 
>>>>>>>>>>>>>>> doing some
>>>>>>>>>>>>>>> remote call during the process, then the snapshot might be held 
>>>>>>>>>>>>>>> for a long
>>>>>>>>>>>>>>> time before releasing, and might cause performance problems. 
>>>>>>>>>>>>>>> And if the
>>>>>>>>>>>>>>> user happen to create multiple iterables, then there will be 
>>>>>>>>>>>>>>> multiple
>>>>>>>>>>>>>>> snapshots loaded during process. Luke suggested being 
>>>>>>>>>>>>>>> aggressive at closing
>>>>>>>>>>>>>>> the resources and recreating when needed again. But in this 
>>>>>>>>>>>>>>> case it might
>>>>>>>>>>>>>>> not work since we won't be able to recreate the same snapshot 
>>>>>>>>>>>>>>> given the
>>>>>>>>>>>>>>> store might have been updated (and creating rocksDb snapshot is 
>>>>>>>>>>>>>>> not cheap
>>>>>>>>>>>>>>> too). I am running out of ideas other than exposing the 
>>>>>>>>>>>>>>> iterator itself
>>>>>>>>>>>>>>> somehow (and add close() if needed?). Any further suggestions?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> @Kenn: btw, I have the same impl you posted earlier
>>>>>>>>>>>>>>> (CloseableIterator) in an internal interface. I wrapped it in 
>>>>>>>>>>>>>>> some sort of
>>>>>>>>>>>>>>> StateResource in the fear that people might reject the proposal 
>>>>>>>>>>>>>>> immediately
>>>>>>>>>>>>>>> after seeing the close() on the iterator. I guess our users are 
>>>>>>>>>>>>>>> familiar
>>>>>>>>>>>>>>> with rocksDb state, it's pretty normal to close the 
>>>>>>>>>>>>>>> iterator/snapshot after
>>>>>>>>>>>>>>> using it.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, May 11, 2018 at 11:08 AM, Lukasz Cwik <
>>>>>>>>>>>>>>> lc...@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The iterator going out of scope is the idiomatic way that
>>>>>>>>>>>>>>>> resources are freed for Java developers (hence the 
>>>>>>>>>>>>>>>> weak/phantom reference
>>>>>>>>>>>>>>>> suggestion).
>>>>>>>>>>>>>>>> Explicitly requiring users to deal with 'handles' (like
>>>>>>>>>>>>>>>> file streams) lead to leaked resources.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, May 11, 2018 at 10:55 AM Kenneth Knowles <
>>>>>>>>>>>>>>>> k...@google.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks Xinyu,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I actually had first sketched out just what you wrote. But
>>>>>>>>>>>>>>>>> then I realized a few things:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>  - usually an Iterable does not allocate resources, only
>>>>>>>>>>>>>>>>> its Iterators
>>>>>>>>>>>>>>>>>  - if you consume the whole iterator, I hope the user
>>>>>>>>>>>>>>>>> would not have to do any extra work
>>>>>>>>>>>>>>>>>  - you can also automatically free it up at the end of the
>>>>>>>>>>>>>>>>> call to @ProcessElement so that is easy too (but you might 
>>>>>>>>>>>>>>>>> not want to)
>>>>>>>>>>>>>>>>>  - so the main use is when the iterator terminates early
>>>>>>>>>>>>>>>>> and is not fully consumed and you can't wait to finish the 
>>>>>>>>>>>>>>>>> method
>>>>>>>>>>>>>>>>>  - the scoping in Java will force a bunch of uninitialized
>>>>>>>>>>>>>>>>> declarations outside the try-with-resources block, kind of a 
>>>>>>>>>>>>>>>>> lot of
>>>>>>>>>>>>>>>>> boilerplate LOC
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> One thing that is good about your proposal is that the
>>>>>>>>>>>>>>>>> iterable could have some transparent caches that all free up 
>>>>>>>>>>>>>>>>> together.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, May 11, 2018 at 9:51 AM Xinyu Liu <
>>>>>>>>>>>>>>>>> xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for drafting the details about the two approaches,
>>>>>>>>>>>>>>>>>> Kenn. Now I understand Luke's proposal better. The approach 
>>>>>>>>>>>>>>>>>> looks neat, but
>>>>>>>>>>>>>>>>>> the uncertainty of *when* GC is going to kick in will
>>>>>>>>>>>>>>>>>> make users' life hard. If the user happens to configure a 
>>>>>>>>>>>>>>>>>> large JVM heap
>>>>>>>>>>>>>>>>>> size, and since rocksDb uses off-heap memory, GC might start 
>>>>>>>>>>>>>>>>>> very late and
>>>>>>>>>>>>>>>>>> less frequent than what we want. If we don't have a
>>>>>>>>>>>>>>>>>> *definitive* way to let user close the underlying
>>>>>>>>>>>>>>>>>> resources, then there is no good way to handle such failures 
>>>>>>>>>>>>>>>>>> of a critical
>>>>>>>>>>>>>>>>>> application in production.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Having a close method in the iterator might be a little
>>>>>>>>>>>>>>>>>> unorthodox :). To some degree, this is actually a resource 
>>>>>>>>>>>>>>>>>> we are holding
>>>>>>>>>>>>>>>>>> underneath, and I think it's pretty common to have close() 
>>>>>>>>>>>>>>>>>> for a resource
>>>>>>>>>>>>>>>>>> in Java, e.g. BufferedReader and BufferedWriter. So I would 
>>>>>>>>>>>>>>>>>> imagine that we
>>>>>>>>>>>>>>>>>> also define a resource for the state iterator and make the 
>>>>>>>>>>>>>>>>>> interface
>>>>>>>>>>>>>>>>>> implements *AutoCloseable*. Here is my sketch:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> // StateResource MUST be closed after use.
>>>>>>>>>>>>>>>>>> try (StateResource<Iterator<SomeType>>> st =
>>>>>>>>>>>>>>>>>> bagState.iteratorResource()) {
>>>>>>>>>>>>>>>>>>     Iterator<SomeType> iter = st.iterator();
>>>>>>>>>>>>>>>>>>     while (iter.hasNext() {
>>>>>>>>>>>>>>>>>>        .. do stuff ...
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>> } catch (Exception e) {
>>>>>>>>>>>>>>>>>>     ... user code
>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The type/method name are just for illustrating here, so
>>>>>>>>>>>>>>>>>> please don't laugh at them. Please feel free to comment and 
>>>>>>>>>>>>>>>>>> let me know if
>>>>>>>>>>>>>>>>>> you have thoughts about the programming patterns here.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 8:59 PM, Kenneth Knowles <
>>>>>>>>>>>>>>>>>> k...@google.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> It is too soon to argue whether an API is complex or
>>>>>>>>>>>>>>>>>>> not. There has been no specific API proposed.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I think the problem statement is real - you need to be
>>>>>>>>>>>>>>>>>>> able to read and write bigger-than-memory state. It seems 
>>>>>>>>>>>>>>>>>>> we have multiple
>>>>>>>>>>>>>>>>>>> runners that don't support it, perhaps because of our API. 
>>>>>>>>>>>>>>>>>>> You might be
>>>>>>>>>>>>>>>>>>> able to build something good enough with phantom 
>>>>>>>>>>>>>>>>>>> references, but you might
>>>>>>>>>>>>>>>>>>> not.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> If I understand the idea, it might look something like
>>>>>>>>>>>>>>>>>>> this:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>>>>        @StateId("foo")
>>>>>>>>>>>>>>>>>>>        private final StateSpec<BagState<Whatever>>
>>>>>>>>>>>>>>>>>>> myBagSpec = ...
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>        @ProcessElement
>>>>>>>>>>>>>>>>>>>        public void proc(@StateId("foo")
>>>>>>>>>>>>>>>>>>> BagState<Whatever> myBag, ...) {
>>>>>>>>>>>>>>>>>>>          CloseableIterator<Whatever> iterator =
>>>>>>>>>>>>>>>>>>> myBag.get().iterator();
>>>>>>>>>>>>>>>>>>>          while(iterator.hasNext() && ... special
>>>>>>>>>>>>>>>>>>> condition ...) {
>>>>>>>>>>>>>>>>>>>            ... do stuff ...
>>>>>>>>>>>>>>>>>>>          }
>>>>>>>>>>>>>>>>>>>          iterator.close();
>>>>>>>>>>>>>>>>>>>        }
>>>>>>>>>>>>>>>>>>>      }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> So it has no impact on users who don't choose to close()
>>>>>>>>>>>>>>>>>>> since they iterate with for ( : ) as usual. And a runner 
>>>>>>>>>>>>>>>>>>> that has the 10x
>>>>>>>>>>>>>>>>>>> funding to try out a ReferenceQueue can be resilient to 
>>>>>>>>>>>>>>>>>>> users that forget.
>>>>>>>>>>>>>>>>>>> On the other hand, I haven't seen this pattern much in the 
>>>>>>>>>>>>>>>>>>> wild, so I think
>>>>>>>>>>>>>>>>>>> it is valuable to discuss other methods.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> While Luke's proposal is something like this if I
>>>>>>>>>>>>>>>>>>> understand his sketch (replacing WeakReference with 
>>>>>>>>>>>>>>>>>>> PhantomReference seems
>>>>>>>>>>>>>>>>>>> to be what you really want):
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     ... in RocksDb state implementation ...
>>>>>>>>>>>>>>>>>>>     class RocksDbBagState {
>>>>>>>>>>>>>>>>>>>       static ReferenceQueue rocksDbIteratorQueue = new
>>>>>>>>>>>>>>>>>>> ReferenceQueue();
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>       class Iterator {
>>>>>>>>>>>>>>>>>>>          PhantomReference<RocksDbJniIterator> cIter;
>>>>>>>>>>>>>>>>>>>          .next() {
>>>>>>>>>>>>>>>>>>>            return cIter.next();
>>>>>>>>>>>>>>>>>>>          }
>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>      class Iterable {
>>>>>>>>>>>>>>>>>>>         .iterator() {
>>>>>>>>>>>>>>>>>>>           return new Iterator(new
>>>>>>>>>>>>>>>>>>> PhantomReference<>(rocksDbJniIterator, 
>>>>>>>>>>>>>>>>>>> rocksDbIteratorQueue));
>>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     ... on another thread ...
>>>>>>>>>>>>>>>>>>>     while(true) {
>>>>>>>>>>>>>>>>>>>       RocksDbIterator deadRef = (RocksDbIterator)
>>>>>>>>>>>>>>>>>>> rocksDbIteratorQueue.remove();
>>>>>>>>>>>>>>>>>>>       deadRef.close();
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> When the iterator is GC'd, the phantom reference will
>>>>>>>>>>>>>>>>>>> pop onto the queue for being closed. This might not be too 
>>>>>>>>>>>>>>>>>>> bad. You'll have
>>>>>>>>>>>>>>>>>>> delayed resource release, and potentially masked errors 
>>>>>>>>>>>>>>>>>>> that are hard to
>>>>>>>>>>>>>>>>>>> debug. It is less error-prone than WeakReference, which is 
>>>>>>>>>>>>>>>>>>> asking for
>>>>>>>>>>>>>>>>>>> trouble when objects are collected en masse. Anecdotally I 
>>>>>>>>>>>>>>>>>>> have heard that
>>>>>>>>>>>>>>>>>>> performance of this kind of approach is poor, but I haven't 
>>>>>>>>>>>>>>>>>>> experienced it
>>>>>>>>>>>>>>>>>>> myself and I can't find good data.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 7:41 PM Xinyu Liu <
>>>>>>>>>>>>>>>>>>> xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> If I understand correctly, using weak references will
>>>>>>>>>>>>>>>>>>>> help clean up the Java objects once GC kicks in. In case 
>>>>>>>>>>>>>>>>>>>> of kv-store likes
>>>>>>>>>>>>>>>>>>>> rocksDb, the Java iterator is just a JNI interface to the 
>>>>>>>>>>>>>>>>>>>> underlying C
>>>>>>>>>>>>>>>>>>>> iterator, so we need to explicitly invoke close to release 
>>>>>>>>>>>>>>>>>>>> the in-memory
>>>>>>>>>>>>>>>>>>>> snapshot data, which can be large and accumulated quickly 
>>>>>>>>>>>>>>>>>>>> if it's not
>>>>>>>>>>>>>>>>>>>> released when not in use. Maybe I am missing something as 
>>>>>>>>>>>>>>>>>>>> you suggested
>>>>>>>>>>>>>>>>>>>> here, but looks to me using weak references might not help 
>>>>>>>>>>>>>>>>>>>> in this case.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I understand your concern, and I think you might
>>>>>>>>>>>>>>>>>>>> misunderstood what I meant. I am totally for working hard 
>>>>>>>>>>>>>>>>>>>> for best user
>>>>>>>>>>>>>>>>>>>> experience, and I think the current API provides a good 
>>>>>>>>>>>>>>>>>>>> example of that.
>>>>>>>>>>>>>>>>>>>> That's also the reason I am implementing a runner here. I 
>>>>>>>>>>>>>>>>>>>> am just proposing
>>>>>>>>>>>>>>>>>>>> an extra API to expose an iterator that can be closed when 
>>>>>>>>>>>>>>>>>>>> not needed, that
>>>>>>>>>>>>>>>>>>>> way the users can use this feature to iterate through 
>>>>>>>>>>>>>>>>>>>> large state that
>>>>>>>>>>>>>>>>>>>> doesn't fit into memory. I believe this is also a pretty 
>>>>>>>>>>>>>>>>>>>> general use case
>>>>>>>>>>>>>>>>>>>> and it's better to have support for it. I am actually 
>>>>>>>>>>>>>>>>>>>> arguing this will be
>>>>>>>>>>>>>>>>>>>> a better user experience to add this extra API since more 
>>>>>>>>>>>>>>>>>>>> users can benefit
>>>>>>>>>>>>>>>>>>>> from it.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 5:25 PM, Lukasz Cwik <
>>>>>>>>>>>>>>>>>>>> lc...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I don't agree. I believe you can track the
>>>>>>>>>>>>>>>>>>>>> iterators/iterables that are created and freed by using 
>>>>>>>>>>>>>>>>>>>>> weak references and
>>>>>>>>>>>>>>>>>>>>> reference queues (or other methods). Having a few people 
>>>>>>>>>>>>>>>>>>>>> work 10x as hard
>>>>>>>>>>>>>>>>>>>>> to provide a good implementation is much better then 
>>>>>>>>>>>>>>>>>>>>> having 100s or 1000s
>>>>>>>>>>>>>>>>>>>>> of users suffering through a more complicated API.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:44 PM Xinyu Liu <
>>>>>>>>>>>>>>>>>>>>> xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Load/evict blocks will help reduce the cache memory
>>>>>>>>>>>>>>>>>>>>>> footprint, but we still won't be able to release the 
>>>>>>>>>>>>>>>>>>>>>> underlying resources.
>>>>>>>>>>>>>>>>>>>>>> We can add definitely heuristics to help release the 
>>>>>>>>>>>>>>>>>>>>>> resources as you
>>>>>>>>>>>>>>>>>>>>>> mentioned, but there is no accurate way to track all the
>>>>>>>>>>>>>>>>>>>>>> iterators/iterables created and free them up once not 
>>>>>>>>>>>>>>>>>>>>>> needed. I think while
>>>>>>>>>>>>>>>>>>>>>> the API is aimed at nice user experience, we should have 
>>>>>>>>>>>>>>>>>>>>>> the option to let
>>>>>>>>>>>>>>>>>>>>>> users optimize their performance if they choose to. Do 
>>>>>>>>>>>>>>>>>>>>>> you agree?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:25 PM, Lukasz Cwik <
>>>>>>>>>>>>>>>>>>>>>> lc...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Users won't reliably close/release the resources and
>>>>>>>>>>>>>>>>>>>>>>> forcing them to will make the user experience worse.
>>>>>>>>>>>>>>>>>>>>>>> It will make a lot more sense to use a file format
>>>>>>>>>>>>>>>>>>>>>>> which allows random access and use a cache to 
>>>>>>>>>>>>>>>>>>>>>>> load/evict blocks of the
>>>>>>>>>>>>>>>>>>>>>>> state from memory.
>>>>>>>>>>>>>>>>>>>>>>> If that is not possible, use an iterable which frees
>>>>>>>>>>>>>>>>>>>>>>> the resource after a certain amount of inactivity or 
>>>>>>>>>>>>>>>>>>>>>>> uses weak references.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:07 PM Xinyu Liu <
>>>>>>>>>>>>>>>>>>>>>>> xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Hi, folks,
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I'm in the middle of implementing the MapState and
>>>>>>>>>>>>>>>>>>>>>>>> SetState in our Samza runner. We noticed that the 
>>>>>>>>>>>>>>>>>>>>>>>> state returns the Java
>>>>>>>>>>>>>>>>>>>>>>>> Iterable for reading entries, keys, etc. For state 
>>>>>>>>>>>>>>>>>>>>>>>> backed by file-based kv
>>>>>>>>>>>>>>>>>>>>>>>> store like rocksDb, we need to be able to let users 
>>>>>>>>>>>>>>>>>>>>>>>> explicitly close
>>>>>>>>>>>>>>>>>>>>>>>> iterator/iterable to release the resources.Otherwise 
>>>>>>>>>>>>>>>>>>>>>>>> we have to load the
>>>>>>>>>>>>>>>>>>>>>>>> iterable into memory so we can safely close the 
>>>>>>>>>>>>>>>>>>>>>>>> underlying rocksDb
>>>>>>>>>>>>>>>>>>>>>>>> iterator, similar to Flink's implementation. But
>>>>>>>>>>>>>>>>>>>>>>>> this won't work for states that don't fit into memory. 
>>>>>>>>>>>>>>>>>>>>>>>> I chatted with Kenn
>>>>>>>>>>>>>>>>>>>>>>>> and he also agrees we need this capability to avoid 
>>>>>>>>>>>>>>>>>>>>>>>> bulk read/write. This
>>>>>>>>>>>>>>>>>>>>>>>> seems to be a general use case and I'm wondering if we 
>>>>>>>>>>>>>>>>>>>>>>>> can add the support
>>>>>>>>>>>>>>>>>>>>>>>> to it? I am happy to contribute to this if needed. Any 
>>>>>>>>>>>>>>>>>>>>>>>> feedback is highly
>>>>>>>>>>>>>>>>>>>>>>>> appreciated.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>
>>>>>

Reply via email to