I feel like this discussion is kind of far from the primary intention. The
point of ParDo(stateful DoFn) is to enable naive single-threaded code in a
style intuitive to a beginning imperative programmer. So:

 - the return value of read() should act like an immutable value
 - if there is a read after a write, that read should reflect the changes
written, because there's a trivial happens-before relationship induced by
program order
 - there are no non-trivial happens-before relationships
 - a write after a read should not affect the value read before

From that starting point, the limitations on expressiveness (single
threaded per-key-and-window-and-step) give rise to embarrassingly parallel
computation and GC.
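
The read/write rules listed above can be sketched in plain Java. This is only an illustration under stated assumptions: `Bag` is a hypothetical in-memory stand-in, not Beam's BagState, and the method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class StateSemanticsSketch {
  // Hypothetical stand-in for per-key-and-window bag state (not a Beam class).
  static final class Bag<T> {
    private final List<T> contents = new ArrayList<>();

    // Appends one element to the bag.
    void add(T value) {
      contents.add(value);
    }

    // Returns an immutable copy, so writes made later cannot change it.
    List<T> read() {
      return Collections.unmodifiableList(new ArrayList<>(contents));
    }
  }

  public static void main(String[] args) {
    Bag<String> bag = new Bag<>();
    bag.add("a");
    List<String> first = bag.read();  // read after write reflects the write
    bag.add("b");                     // write after read
    List<String> second = bag.read(); // sees both writes, in program order
    System.out.println(first);        // prints [a]: the earlier value is unaffected
    System.out.println(second);       // prints [a, b]
  }
}
```

Because only one thread ever touches a given key-and-window-and-step, program order is the only ordering the sketch has to respect.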

So when you say "portability semantics" it makes me concerned. The word
"semantics" refers to the mapping between what a user writes and what it
means. Primarily, that means the conceptual translation between a primitive
PTransform and the corresponding PCollection-to-PCollection operation. It
is true that portability complicates things because a user's code can only
be given meaning relative to an SDK harness. But the end-user intention is
unchanged. If we had time and resources to give the Fn API a solid spec, it
should be such that the SDK harness has little choice but to implement the
primitives as intended.

In other words, it is the job of
https://s.apache.org/beam-fn-state-api-and-bundle-processing* to implement
https://s.apache.org/beam-state. The discussion of adding
`ReadableState<Iterator> readIterator()` to BagState seems consistent with
the latter.
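
As a rough sketch of what such a method could look like to a user, assuming simplified local stand-ins for the interfaces (the `ReadableState` and `IterableBag` types below are hypothetical and much smaller than Beam's real ones), a runner could release its resource as soon as the iterator is exhausted:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

final class ReadIteratorSketch {
  // Simplified stand-in for a deferred read (assumption, not Beam's interface).
  interface ReadableState<T> {
    T read();
  }

  // Hypothetical bag with the existing Iterable read plus the discussed readIterator().
  interface IterableBag<T> {
    Iterable<T> read();

    ReadableState<Iterator<T>> readIterator();
  }

  // In-memory bag that counts open iterators, standing in for store snapshots.
  static final class CountingBag<T> implements IterableBag<T> {
    private final List<T> contents;
    private int openIterators = 0;

    CountingBag(List<T> values) {
      this.contents = new ArrayList<>(values);
    }

    int openCount() {
      return openIterators;
    }

    @Override
    public Iterable<T> read() {
      return Collections.unmodifiableList(contents);
    }

    @Override
    public ReadableState<Iterator<T>> readIterator() {
      return () -> {
        openIterators++; // stand-in for opening a store snapshot
        Iterator<T> delegate = contents.iterator();
        return new Iterator<T>() {
          private boolean released = false;

          @Override
          public boolean hasNext() {
            boolean more = delegate.hasNext();
            if (!more && !released) {
              released = true;
              openIterators--; // release once, when fully consumed
            }
            return more;
          }

          @Override
          public T next() {
            return delegate.next();
          }
        };
      };
    }
  }

  public static void main(String[] args) {
    CountingBag<Integer> bag = new CountingBag<>(Arrays.asList(1, 2, 3));
    Iterator<Integer> it = bag.readIterator().read();
    int sum = 0;
    while (it.hasNext()) {
      sum += it.next();
    }
    System.out.println(sum + " open=" + bag.openCount()); // prints "6 open=0"
  }
}
```

An iterator that is abandoned early is not released by this sketch; that case would still need the end-of-processElement cleanup discussed further down the thread.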

Kenn

*IIUC you've chosen to use the same underlying proto service for side
inputs and by-reference values. That's an implementation detail that I have
no particular opinion about except that if it complicates implementing the
primary motivator for state, it may not be a great fit.



On Mon, May 14, 2018 at 7:35 PM Kenneth Knowles <k...@google.com> wrote:

> I don't follow why allowing freeing resources would be counter to the
> spec. I don't really know what you mean by consistent for a bundle. State,
> in the sense of the user-facing per-key-and-window state API, is single
> threaded and scoped to a single DoFn. There's no one else who can write the
> state. If a BagState is read and written and read again, the user-facing
> logic should be unaware of the resources and not have any logic to deal
> with consistency.
>
> Kenn
>
> On Mon, May 14, 2018 at 6:09 PM Lukasz Cwik <lc...@google.com> wrote:
>
>> Hmm, some of the problem I'm dealing with is:
>> * I don't understand how you would want close to change the semantics of
>> a user state specification and how it affects the lifetime of user state?
>> ** Does it represent committing information within a bundle?
>> ** Does it mean that user state can ignore the replayable and consistent
>> semantics that hold for the lifetime of a bundle?
>> * I do understand that close semantics may make it easier for the Samza
>> runner to support state that is larger than memory, but what does it
>> provide to users and how would they benefit (like what new scenarios would
>> it support)?
>> * I don't understand in RocksDb why you need a snapshot in the first
>> place, and whether RocksDb can support seeking inside or outside a snapshot
>> relatively efficiently?
>>
>> There seems to be some context lost from an earlier discussion that you
>> and Kenn had. Is there a copy of that which could be shared? It may help me
>> get up to speed on the problem that is being solved.
>>
>>
>> On Mon, May 14, 2018 at 5:11 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>
>>> I will take a look at the docs to understand the problem better. A minor
>>> comment to 2) is that I don't intend to change the existing iterable API. I
>>> plan to implement it similar to Flink, loading the data into memory and
>>> closing the underlying snapshot after that. So the changes should be
>>> backward compatible.
>>>
>>> Thanks,
>>> Xinyu
>>>
>>>
>>>
>>> On Mon, May 14, 2018 at 4:54 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> I believe adding support for a state spec to be 'closed' or 'freed' is
>>>> counter to the requirement of a state spec being consistent for the
>>>> lifetime of a bundle. Are we willing to change this requirement for the
>>>> lifetime of a bundle, or to say that runners can arbitrarily decide that a
>>>> StateSpec can't be accessed anymore?
>>>>
>>>> If not, I'm having trouble thinking of a good way to
>>>> integrate the 'close/free' API with portability semantics because:
>>>> 1) Runners do control bundle size but other than the trivial cases
>>>> where a runner specifically chooses:
>>>>   a) to only process a single element at a time (negating the need for
>>>> a free/close() method).
>>>>   b) optimistically process elements and if you run out of memory kill
>>>> the bundle and try again with a smaller bundle.
>>>> 2) Adding an API to provide Iterators doesn't mean that users can't
>>>> still use iterable which can't be force closed. (Dropping Iterable in
>>>> exchange for Iterator in the API is backwards incompatible so likely to be
>>>> deferred till the next major version of Apache Beam).
>>>> 3) SDKs are able to process as many elements as they want in parallel,
>>>> nothing requires them to execute elements serially throughout the pipeline
>>>> graph.
>>>> 4) The runner has limited insight into what an SDK is doing. The
>>>> SDK provides a lower bound on how many elements it has processed, but SDKs
>>>> aren't required to implement this, meaning that they could technically be
>>>> processing everything in random order after they have seen all input for a
>>>> bundle.
>>>>
>>>> We'll need to work through the scenarios. Xinyu, it would be useful for
>>>> you to take a look at these two docs for context into the problem space:
>>>> https://s.apache.org/beam-fn-api-processing-a-bundle (How to process a
>>>> bundle)
>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing (How to
>>>> access side inputs, access remote references, and support user state)
>>>>
>>>>
>>>> On Mon, May 14, 2018 at 3:16 PM Kenneth Knowles <k...@google.com> wrote:
>>>>
>>>>>
>>>>> On Mon, May 14, 2018 at 2:30 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Before you go on and update the user facing API, we should discuss
>>>>>> the last point I made, since the change you're making will have limited
>>>>>> usability: the portability effort won't realistically allow you to see
>>>>>> such low level things as when processElement finished, and supporting
>>>>>> user state will be modeled using the following three operations: read
>>>>>> (with continuation tokens), append (blind write), clear. It may be moot
>>>>>> to discuss how to integrate Samza into the existing framework; we should
>>>>>> instead work towards Samza being a portability based runner. There are
>>>>>> more details here about supporting user state during pipeline execution:
>>>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing, top
>>>>>> level docs about portability are https://s.apache.org/beam-runner-api
>>>>>> and https://s.apache.org/beam-fn-api.
>>>>>>
>>>>>> Do we really want to make a change to the user facing API for user
>>>>>> state in the Java SDK when that code path will be phased out in exchange
>>>>>> for using the portability APIs?
>>>>>>
>>>>>
>>>>> To be clear, the user API will remain the same, but the runner's
>>>>> implementation path will be phased out. So should we expand this
>>>>> discussion to how the portability APIs enable the SDK and runner to
>>>>> collaborate to achieve this use case? It seems like the interaction you
>>>>> need is that the runner can tell that the SDK can close the connection on
>>>>> the read(), and the SDK needs to do so promptly, right?
>>>>>
>>>>> Kenn
>>>>>
>>>>>
>>>>>
>>>>>> On Mon, May 14, 2018 at 2:20 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> We discussed the proposed approaches internally. It seems that if the
>>>>>>> State API can also expose another method to return a
>>>>>>> ReadableState<Iterator>, it will cover our cases of iterating over a
>>>>>>> bigger-than-memory state and closing the underlying rocksDb snapshot
>>>>>>> immediately after the iterator is fully consumed (most cases), and also
>>>>>>> safely closing the rest of the iterators after processElement (minor
>>>>>>> cases). If this sounds good to you guys, I am going to create a JIRA
>>>>>>> ticket and open a PR for it.
>>>>>>> Thanks a lot for the discussions here.
>>>>>>>
>>>>>>> The discussion about async processing is very interesting, and I
>>>>>>> think it's definitely worth its own thread. I believe we do need the
>>>>>>> support from the Beam API/model so the users can take advantage of it
>>>>>>> (besides Samza, Flink also has an async operator that helps a lot in
>>>>>>> Alibaba's use cases). Yes, it will add complexity to the underlying
>>>>>>> framework, but it's going to be great for the users to do remote IO. In
>>>>>>> practice we found it actually avoids thread+locks issues, as Kenn
>>>>>>> mentioned above. I am not sure whether this feature can be a
>>>>>>> runner-specific thing. I will probably create another email thread for
>>>>>>> this discussion in the future and hopefully I can move this forward.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Xinyu
>>>>>>>
>>>>>>> On Mon, May 14, 2018 at 10:30 AM, Kenneth Knowles <k...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, May 14, 2018 at 9:44 AM Lukasz Cwik <lc...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Users typically want to do that async operation and then produce
>>>>>>>>> output with it. Adding asynchronous execution is difficult within the
>>>>>>>>> framework because a lot of code currently does not need to be thread
>>>>>>>>> safe, and writing code that is fast and handles asynchronous execution
>>>>>>>>> is quite difficult. Adding async operations typically leads to code
>>>>>>>>> with too many locks/synchronization blocks.
>>>>>>>>>
>>>>>>>>
>>>>>>>> Just a small point here, as it is very unusual to use locks with
>>>>>>>> futures. The essential innovation of futures is that it is a new way to
>>>>>>>> program that avoids threads+locks style. In part, you get this for free
>>>>>>>> from functional programming style, and on the other hand you reify
>>>>>>>> asynchronous side effects as data dependencies.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>> Note that with the portability effort, the Runner won't have
>>>>>>>>> visibility into such low level things as when an object is garbage
>>>>>>>>> collected, and supporting user state will be modeled using the
>>>>>>>>> following three operations: read (with continuation tokens), append
>>>>>>>>> (blind write), clear. It may be moot to discuss how to integrate Samza
>>>>>>>>> into the existing framework; we should instead work towards Samza
>>>>>>>>> being a portability based runner. There are more details here about
>>>>>>>>> supporting user state during pipeline execution:
>>>>>>>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing, top
>>>>>>>>> level docs about portability are
>>>>>>>>> https://s.apache.org/beam-runner-api and
>>>>>>>>> https://s.apache.org/beam-fn-api.
>>>>>>>>>
>>>>>>>>> On Sun, May 13, 2018 at 6:51 PM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> At least one API that has been discussed in the past is to use
>>>>>>>>>> Java 8 CompletionStage, e.g.
>>>>>>>>>>
>>>>>>>>>>  new DoFn<InputT, OutputT>() {
>>>>>>>>>>     @ProcessElement
>>>>>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>>>>>> ...) {
>>>>>>>>>>       element.thenApply(...)
>>>>>>>>>>     }
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>> The framework will automatically create the CompletionStage, and
>>>>>>>>>> the process method can specify a pipeline of asynchronous operations
>>>>>>>>>> to perform on the element. When all of them are done, the element
>>>>>>>>>> will be marked as successfully processed.
>>>>>>>>>>
>>>>>>>>>> Reuven
>>>>>>>>>>
>>>>>>>>>> On Sun, May 13, 2018 at 11:36 AM Xinyu Liu <xinyuliu...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for all the pointers. I looked through the discussion over
>>>>>>>>>>> BEAM-2975 and BEAM-2980 about having snapshot or live views of
>>>>>>>>>>> iterable, and the current semantics make a lot of sense to me. For
>>>>>>>>>>> your question: it does not require an explicit snapshot when we
>>>>>>>>>>> create a RocksDb iterator directly. The iterator will read from an
>>>>>>>>>>> implicit snapshot as of the time the iterator is created [1], and the
>>>>>>>>>>> snapshot will be released after the iterator is closed. If we can
>>>>>>>>>>> have another method to return ReadableState<Iterator>, we might be
>>>>>>>>>>> able to apply the auto-closing approaches as we discussed and solve
>>>>>>>>>>> the problem here :).
>>>>>>>>>>>
>>>>>>>>>>> It's very interesting that you bring up the discussion about the
>>>>>>>>>>> async API! Async IO has been widely adopted here among our users:
>>>>>>>>>>> they use netty for async calls with a library named ParSeq [2] to
>>>>>>>>>>> help manage the calls. Samza provides a primitive callback style API
>>>>>>>>>>> [3], in which the user will invoke the callback after the remote
>>>>>>>>>>> calls are complete. Currently in a Samza job our users use this API
>>>>>>>>>>> with the ParSeq lib for remote IO. It seems we might have to do
>>>>>>>>>>> blocking calls (thus the poor resource utilization you mentioned)
>>>>>>>>>>> when using the Beam API for now. It would be great if you could send
>>>>>>>>>>> a few more details about the async API discussion. I would like to
>>>>>>>>>>> add our use case and help move this forward.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Xinyu
>>>>>>>>>>>
>>>>>>>>>>> [1]: https://github.com/facebook/rocksdb/wiki/Iterator
>>>>>>>>>>> [2]: https://github.com/linkedin/parseq
>>>>>>>>>>> [3]:
>>>>>>>>>>> https://samza.apache.org/learn/documentation/0.14/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sat, May 12, 2018 at 8:17 PM, Kenneth Knowles <k...@google.com
>>>>>>>>>>> > wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I don't have any further suggestions, but want to call out how
>>>>>>>>>>>> this hits a lot of interesting points.
>>>>>>>>>>>>
>>>>>>>>>>>> The point about snapshotting is great. We have BEAM-2975 [1]
>>>>>>>>>>>> and BEAM-2980 [2] where we debated things a bit. I think the
>>>>>>>>>>>> strongest case is for what you describe - it should be a snapshot.
>>>>>>>>>>>> Perhaps they should both be closed as fixed...
>>>>>>>>>>>>
>>>>>>>>>>>> And you also bring up long blocking calls - we have also
>>>>>>>>>>>> deliberately decided that long synchronous blocking calls in
>>>>>>>>>>>> @ProcessElement can be embraced for simple programming and
>>>>>>>>>>>> compensated with autoscaling smarts (e.g. expand the thread pool by
>>>>>>>>>>>> noticing poor utilization). The alternative is a more future-istic
>>>>>>>>>>>> API where the calls can be explicitly asynchronous. We've had some
>>>>>>>>>>>> interesting dev@ list discussions about that, too.
>>>>>>>>>>>>
>>>>>>>>>>>> Is another possibility to perhaps have read() return a
>>>>>>>>>>>> ReadableState<Iterator> instead? We could, of course, have two
>>>>>>>>>>>> methods with different names, one for iterator one for snapshot
>>>>>>>>>>>> iterable. But wouldn't the Iterator also require a snapshot? Doesn't
>>>>>>>>>>>> a native RocksDb iterator require a snapshot to have well-defined
>>>>>>>>>>>> contents? As you can tell, I don't know enough about RocksDb details
>>>>>>>>>>>> to be sure of my suggestions.
>>>>>>>>>>>>
>>>>>>>>>>>> Kenn
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/BEAM-2975
>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/BEAM-2980
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, May 12, 2018 at 2:58 PM Xinyu Liu <
>>>>>>>>>>>> xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the ideas, Kenn, Luke and Eugene. Before I posted
>>>>>>>>>>>>> the question here, we discussed internally about releasing the
>>>>>>>>>>>>> underlying resources after consuming the whole iterator. This
>>>>>>>>>>>>> probably covers quite a lot of use cases. For some special cases
>>>>>>>>>>>>> where the user only consumes part of the iterator, Luke and Kenn's
>>>>>>>>>>>>> suggestion about releasing after processElement() might work (I
>>>>>>>>>>>>> need to confirm this with our use cases). So based on what we
>>>>>>>>>>>>> discussed so far, we might have a good way to automatically close
>>>>>>>>>>>>> an iterator for the store.
>>>>>>>>>>>>>
>>>>>>>>>>>>> There is another issue though: right now the state API returns
>>>>>>>>>>>>> an iterable for entries(), keys() and values(), and we can create
>>>>>>>>>>>>> iterators from it. From my understanding, the iterable holds a
>>>>>>>>>>>>> snapshot of the underlying store. In the case of rocksDb, it's
>>>>>>>>>>>>> going to be a db.snapshot(). Then when can we release the snapshot?
>>>>>>>>>>>>> It's not like an iterator, where we can use some heuristics to
>>>>>>>>>>>>> automatically release it. The user can hold on to the iterable and
>>>>>>>>>>>>> create iterators throughout the whole processElement(). But if we
>>>>>>>>>>>>> only close the iterable after processElement(), I am quite
>>>>>>>>>>>>> concerned about the limitations this will bring. If the user is
>>>>>>>>>>>>> doing some remote call during the process, then the snapshot might
>>>>>>>>>>>>> be held for a long time before being released, and might cause
>>>>>>>>>>>>> performance problems. And if the user happens to create multiple
>>>>>>>>>>>>> iterables, then there will be multiple snapshots loaded during
>>>>>>>>>>>>> processing. Luke suggested being aggressive at closing the
>>>>>>>>>>>>> resources and recreating them when needed again. But in this case
>>>>>>>>>>>>> it might not work, since we won't be able to recreate the same
>>>>>>>>>>>>> snapshot given the store might have been updated (and creating a
>>>>>>>>>>>>> rocksDb snapshot is not cheap either). I am running out of ideas
>>>>>>>>>>>>> other than exposing the iterator itself somehow (and adding close()
>>>>>>>>>>>>> if needed?). Any further suggestions?
>>>>>>>>>>>>>
>>>>>>>>>>>>> @Kenn: btw, I have the same impl you posted earlier
>>>>>>>>>>>>> (CloseableIterator) in an internal interface. I wrapped it in some
>>>>>>>>>>>>> sort of StateResource for fear that people might reject the
>>>>>>>>>>>>> proposal immediately after seeing the close() on the iterator. I
>>>>>>>>>>>>> guess since our users are familiar with rocksDb state, it's pretty
>>>>>>>>>>>>> normal for them to close the iterator/snapshot after using it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, May 11, 2018 at 11:08 AM, Lukasz Cwik <
>>>>>>>>>>>>> lc...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> The iterator going out of scope is the idiomatic way that
>>>>>>>>>>>>>> resources are freed for Java developers (hence the weak/phantom
>>>>>>>>>>>>>> reference suggestion).
>>>>>>>>>>>>>> Explicitly requiring users to deal with 'handles' (like file
>>>>>>>>>>>>>> streams) leads to leaked resources.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, May 11, 2018 at 10:55 AM Kenneth Knowles <
>>>>>>>>>>>>>> k...@google.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks Xinyu,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I actually had first sketched out just what you wrote. But
>>>>>>>>>>>>>>> then I realized a few things:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>  - usually an Iterable does not allocate resources, only its
>>>>>>>>>>>>>>> Iterators
>>>>>>>>>>>>>>>  - if you consume the whole iterator, I hope the user would
>>>>>>>>>>>>>>> not have to do any extra work
>>>>>>>>>>>>>>>  - you can also automatically free it up at the end of the
>>>>>>>>>>>>>>> call to @ProcessElement so that is easy too (but you might not
>>>>>>>>>>>>>>> want to)
>>>>>>>>>>>>>>>  - so the main use is when the iterator terminates early and
>>>>>>>>>>>>>>> is not fully consumed and you can't wait to finish the method
>>>>>>>>>>>>>>>  - the scoping in Java will force a bunch of uninitialized
>>>>>>>>>>>>>>> declarations outside the try-with-resources block, kind of a
>>>>>>>>>>>>>>> lot of boilerplate LOC
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> One thing that is good about your proposal is that the
>>>>>>>>>>>>>>> iterable could have some transparent caches that all free up
>>>>>>>>>>>>>>> together.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, May 11, 2018 at 9:51 AM Xinyu Liu <
>>>>>>>>>>>>>>> xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for drafting the details about the two approaches,
>>>>>>>>>>>>>>>> Kenn. Now I understand Luke's proposal better. The approach
>>>>>>>>>>>>>>>> looks neat, but the uncertainty of *when* GC is going to kick
>>>>>>>>>>>>>>>> in will make users' lives hard. If the user happens to
>>>>>>>>>>>>>>>> configure a large JVM heap size, and since rocksDb uses
>>>>>>>>>>>>>>>> off-heap memory, GC might start very late and run less
>>>>>>>>>>>>>>>> frequently than we want. If we don't have a *definitive* way
>>>>>>>>>>>>>>>> to let the user close the underlying resources, then there is
>>>>>>>>>>>>>>>> no good way to handle such failures of a critical application
>>>>>>>>>>>>>>>> in production.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Having a close method in the iterator might be a little
>>>>>>>>>>>>>>>> unorthodox :). To some degree, this is actually a resource we
>>>>>>>>>>>>>>>> are holding underneath, and I think it's pretty common to have
>>>>>>>>>>>>>>>> close() for a resource in Java, e.g. BufferedReader and
>>>>>>>>>>>>>>>> BufferedWriter. So I would imagine that we also define a
>>>>>>>>>>>>>>>> resource for the state iterator and make the interface
>>>>>>>>>>>>>>>> extend *AutoCloseable*. Here is my sketch:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> // StateResource MUST be closed after use.
>>>>>>>>>>>>>>>> try (StateResource<Iterator<SomeType>> st =
>>>>>>>>>>>>>>>> bagState.iteratorResource()) {
>>>>>>>>>>>>>>>>     Iterator<SomeType> iter = st.iterator();
>>>>>>>>>>>>>>>>     while (iter.hasNext()) {
>>>>>>>>>>>>>>>>        .. do stuff ...
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>> } catch (Exception e) {
>>>>>>>>>>>>>>>>     ... user code
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The type/method names are just for illustration here, so
>>>>>>>>>>>>>>>> please don't laugh at them. Please feel free to comment and
>>>>>>>>>>>>>>>> let me know if you have thoughts about the programming
>>>>>>>>>>>>>>>> patterns here.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 8:59 PM, Kenneth Knowles <
>>>>>>>>>>>>>>>> k...@google.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It is too soon to argue whether an API is complex or not.
>>>>>>>>>>>>>>>>> There has been no specific API proposed.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I think the problem statement is real - you need to be
>>>>>>>>>>>>>>>>> able to read and write bigger-than-memory state. It seems we
>>>>>>>>>>>>>>>>> have multiple runners that don't support it, perhaps because
>>>>>>>>>>>>>>>>> of our API. You might be able to build something good enough
>>>>>>>>>>>>>>>>> with phantom references, but you might not.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If I understand the idea, it might look something like
>>>>>>>>>>>>>>>>> this:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>>        @StateId("foo")
>>>>>>>>>>>>>>>>>        private final StateSpec<BagState<Whatever>>
>>>>>>>>>>>>>>>>> myBagSpec = ...
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>        @ProcessElement
>>>>>>>>>>>>>>>>>        public void proc(@StateId("foo") BagState<Whatever>
>>>>>>>>>>>>>>>>> myBag, ...) {
>>>>>>>>>>>>>>>>>          CloseableIterator<Whatever> iterator =
>>>>>>>>>>>>>>>>> myBag.get().iterator();
>>>>>>>>>>>>>>>>>          while(iterator.hasNext() && ... special condition
>>>>>>>>>>>>>>>>> ...) {
>>>>>>>>>>>>>>>>>            ... do stuff ...
>>>>>>>>>>>>>>>>>          }
>>>>>>>>>>>>>>>>>          iterator.close();
>>>>>>>>>>>>>>>>>        }
>>>>>>>>>>>>>>>>>      }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> So it has no impact on users who don't choose to close()
>>>>>>>>>>>>>>>>> since they iterate with for ( : ) as usual. And a runner that
>>>>>>>>>>>>>>>>> has the 10x funding to try out a ReferenceQueue can be
>>>>>>>>>>>>>>>>> resilient to users that forget. On the other hand, I haven't
>>>>>>>>>>>>>>>>> seen this pattern much in the wild, so I think it is valuable
>>>>>>>>>>>>>>>>> to discuss other methods.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> While Luke's proposal is something like this if I
>>>>>>>>>>>>>>>>> understand his sketch (replacing WeakReference with
>>>>>>>>>>>>>>>>> PhantomReference seems to be what you really want):
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     ... in RocksDb state implementation ...
>>>>>>>>>>>>>>>>>     class RocksDbBagState {
>>>>>>>>>>>>>>>>>       static ReferenceQueue rocksDbIteratorQueue = new
>>>>>>>>>>>>>>>>> ReferenceQueue();
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>       class Iterator {
>>>>>>>>>>>>>>>>>          PhantomReference<RocksDbJniIterator> cIter;
>>>>>>>>>>>>>>>>>          .next() {
>>>>>>>>>>>>>>>>>            return cIter.next();
>>>>>>>>>>>>>>>>>          }
>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>      class Iterable {
>>>>>>>>>>>>>>>>>         .iterator() {
>>>>>>>>>>>>>>>>>           return new Iterator(new
>>>>>>>>>>>>>>>>> PhantomReference<>(rocksDbJniIterator, rocksDbIteratorQueue));
>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     ... on another thread ...
>>>>>>>>>>>>>>>>>     while(true) {
>>>>>>>>>>>>>>>>>       RocksDbIterator deadRef = (RocksDbIterator)
>>>>>>>>>>>>>>>>> rocksDbIteratorQueue.remove();
>>>>>>>>>>>>>>>>>       deadRef.close();
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> When the iterator is GC'd, the phantom reference will pop
>>>>>>>>>>>>>>>>> onto the queue for being closed. This might not be too bad.
>>>>>>>>>>>>>>>>> You'll have delayed resource release, and potentially masked
>>>>>>>>>>>>>>>>> errors that are hard to debug. It is less error-prone than
>>>>>>>>>>>>>>>>> WeakReference, which is asking for trouble when objects are
>>>>>>>>>>>>>>>>> collected en masse. Anecdotally I have heard that performance
>>>>>>>>>>>>>>>>> of this kind of approach is poor, but I haven't experienced it
>>>>>>>>>>>>>>>>> myself and I can't find good data.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 7:41 PM Xinyu Liu <
>>>>>>>>>>>>>>>>> xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> If I understand correctly, using weak references will
>>>>>>>>>>>>>>>>>> help clean up the Java objects once GC kicks in. In the case
>>>>>>>>>>>>>>>>>> of kv-stores like rocksDb, the Java iterator is just a JNI
>>>>>>>>>>>>>>>>>> interface to the underlying C iterator, so we need to
>>>>>>>>>>>>>>>>>> explicitly invoke close to release the in-memory snapshot
>>>>>>>>>>>>>>>>>> data, which can be large and accumulate quickly if it's not
>>>>>>>>>>>>>>>>>> released when not in use. Maybe I am missing something in
>>>>>>>>>>>>>>>>>> what you suggested here, but it looks to me like using weak
>>>>>>>>>>>>>>>>>> references might not help in this case.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I understand your concern, and I think you might have
>>>>>>>>>>>>>>>>>> misunderstood what I meant. I am totally for working hard
>>>>>>>>>>>>>>>>>> for the best user experience, and I think the current API
>>>>>>>>>>>>>>>>>> provides a good example of that. That's also the reason I am
>>>>>>>>>>>>>>>>>> implementing a runner here. I am just proposing an extra API
>>>>>>>>>>>>>>>>>> to expose an iterator that can be closed when not needed, so
>>>>>>>>>>>>>>>>>> that users can use this feature to iterate through large
>>>>>>>>>>>>>>>>>> state that doesn't fit into memory. I believe this is also a
>>>>>>>>>>>>>>>>>> pretty general use case and it's better to have support for
>>>>>>>>>>>>>>>>>> it. I am actually arguing that adding this extra API will be
>>>>>>>>>>>>>>>>>> a better user experience, since more users can benefit from
>>>>>>>>>>>>>>>>>> it.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 5:25 PM, Lukasz Cwik <
>>>>>>>>>>>>>>>>>> lc...@google.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I don't agree. I believe you can track the
>>>>>>>>>>>>>>>>>>> iterators/iterables that are created and freed by using
>>>>>>>>>>>>>>>>>>> weak references and reference queues (or other methods).
>>>>>>>>>>>>>>>>>>> Having a few people work 10x as hard to provide a good
>>>>>>>>>>>>>>>>>>> implementation is much better than having 100s or 1000s
>>>>>>>>>>>>>>>>>>> of users suffering through a more complicated API.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:44 PM Xinyu Liu <
>>>>>>>>>>>>>>>>>>> xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Load/evict blocks will help reduce the cache memory
>>>>>>>>>>>>>>>>>>>> footprint, but we still won't be able to release the
>>>>>>>>>>>>>>>>>>>> underlying resources. We can definitely add heuristics to
>>>>>>>>>>>>>>>>>>>> help release the resources as you mentioned, but there is
>>>>>>>>>>>>>>>>>>>> no accurate way to track all the iterators/iterables
>>>>>>>>>>>>>>>>>>>> created and free them up once not needed. I think while
>>>>>>>>>>>>>>>>>>>> the API is aimed at a nice user experience, we should have
>>>>>>>>>>>>>>>>>>>> the option to let users optimize their performance if they
>>>>>>>>>>>>>>>>>>>> choose to. Do you agree?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:25 PM, Lukasz Cwik <
>>>>>>>>>>>>>>>>>>>> lc...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Users won't reliably close/release the resources and
>>>>>>>>>>>>>>>>>>>>> forcing them to will make the user experience worse.
>>>>>>>>>>>>>>>>>>>>> It will make a lot more sense to use a file format
>>>>>>>>>>>>>>>>>>>>> which allows random access and use a cache to load/evict
>>>>>>>>>>>>>>>>>>>>> blocks of the state from memory.
>>>>>>>>>>>>>>>>>>>>> If that is not possible, use an iterable which frees
>>>>>>>>>>>>>>>>>>>>> the resource after a certain amount of inactivity or uses
>>>>>>>>>>>>>>>>>>>>> weak references.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Thu, May 10, 2018 at 3:07 PM Xinyu Liu <
>>>>>>>>>>>>>>>>>>>>> xinyuliu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi, folks,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I'm in the middle of implementing the MapState and
>>>>>>>>>>>>>>>>>>>>>> SetState in our Samza runner. We noticed that the state
>>>>>>>>>>>>>>>>>>>>>> returns the Java Iterable for reading entries, keys, etc.
>>>>>>>>>>>>>>>>>>>>>> For state backed by a file-based kv store like rocksDb,
>>>>>>>>>>>>>>>>>>>>>> we need to be able to let users explicitly close the
>>>>>>>>>>>>>>>>>>>>>> iterator/iterable to release the resources. Otherwise we
>>>>>>>>>>>>>>>>>>>>>> have to load the iterable into memory so we can safely
>>>>>>>>>>>>>>>>>>>>>> close the underlying rocksDb iterator, similar to Flink's
>>>>>>>>>>>>>>>>>>>>>> implementation. But this won't work for states that don't
>>>>>>>>>>>>>>>>>>>>>> fit into memory. I chatted with Kenn and he also agrees
>>>>>>>>>>>>>>>>>>>>>> we need this capability to avoid bulk read/write. This
>>>>>>>>>>>>>>>>>>>>>> seems to be a general use case and I'm wondering if we
>>>>>>>>>>>>>>>>>>>>>> can add support for it? I am happy to contribute to this
>>>>>>>>>>>>>>>>>>>>>> if needed. Any feedback is highly appreciated.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>> Xinyu
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>
>>>
