The iterator going out of scope is the idiomatic way that resources are
freed for Java developers (hence the weak/phantom reference suggestion).
Explicitly requiring users to deal with 'handles' (like file streams) leads
to leaked resources.

On Fri, May 11, 2018 at 10:55 AM Kenneth Knowles <k...@google.com> wrote:

> Thanks Xinyu,
>
> I actually had first sketched out just what you wrote. But then I realized
> a few things:
>
>  - usually an Iterable does not allocate resources, only its Iterators
>  - if you consume the whole iterator, I hope the user would not have to do
> any extra work
>  - you can also automatically free it up at the end of the call to
> @ProcessElement so that is easy too (but you might not want to)
>  - so the main use is when the iterator terminates early and is not fully
> consumed and you can't wait to finish the method
>  - the scoping in Java will force a bunch of uninitialized declarations
> outside the try-with-resources block, kind of a lot of boilerplate LOC
>
> One thing that is good about your proposal is that the iterable could have
> some transparent caches that all free up together.
>
> Kenn
>
> On Fri, May 11, 2018 at 9:51 AM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>
>> Thanks for drafting the details about the two approaches, Kenn. Now I
>> understand Luke's proposal better. The approach looks neat, but the
>> uncertainty of *when* GC is going to kick in will make users' lives hard.
>> If the user happens to configure a large JVM heap size, then because rocksDb
>> uses off-heap memory, GC might start much later and less frequently than
>> we want. If we don't have a *definitive* way to let users close the
>> underlying resources, then there is no good way to handle such failures of
>> a critical application in production.
>>
>> Having a close method in the iterator might be a little unorthodox :). To
>> some degree, this is actually a resource we are holding underneath, and I
>> think it's pretty common to have close() for a resource in Java, e.g.
>> BufferedReader and BufferedWriter. So I would imagine that we also define a
>> resource for the state iterator and make the interface implement
>> *AutoCloseable*. Here is my sketch:
>>
>> // StateResource MUST be closed after use.
>> try (StateResource<Iterator<SomeType>> st = bagState.iteratorResource())
>> {
>>     Iterator<SomeType> iter = st.iterator();
>>     while (iter.hasNext()) {
>>        .. do stuff ...
>>     }
>> } catch (Exception e) {
>>     ... user code
>> }
>>
>> The type/method names are just for illustration here, so please don't
>> laugh at them. Please feel free to comment and let me know if you have
>> thoughts about the programming patterns here.
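
For illustration, here is a minimal runnable sketch of the try-with-resources
pattern discussed above. It is not a proposed Beam API: CloseableIterator,
FakeStoreIterator and readFirst are hypothetical names, and the "resource" is
only a counter standing in for a native store iterator.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class CloseableIteratorSketch {

    /** Hypothetical interface: an Iterator that also owns a resource. */
    interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
        @Override
        void close(); // narrowed so callers need no catch block
    }

    /** Stand-in for a store-backed iterator; counts open handles only. */
    static final class FakeStoreIterator implements CloseableIterator<Integer> {
        static int openHandles = 0;
        private final int size;
        private int next = 0;
        private boolean closed = false;

        FakeStoreIterator(int size) {
            this.size = size;
            openHandles++;
        }

        @Override
        public boolean hasNext() {
            return !closed && next < size;
        }

        @Override
        public Integer next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return next++;
        }

        @Override
        public void close() {
            if (!closed) { // idempotent: a second close() is a no-op
                closed = true;
                openHandles--;
            }
        }
    }

    /** Reads at most `limit` elements and then stops early. */
    static List<Integer> readFirst(int limit) {
        List<Integer> out = new ArrayList<>();
        try (CloseableIterator<Integer> iter = new FakeStoreIterator(100)) {
            while (iter.hasNext() && out.size() < limit) {
                out.add(iter.next());
            }
        } // close() runs here, on early exit and on exceptions alike
        return out;
    }

    public static void main(String[] args) {
        System.out.println(readFirst(3));                  // prints [0, 1, 2]
        System.out.println(FakeStoreIterator.openHandles); // prints 0
    }
}
```

The point of the sketch is the early-termination case: the loop stops after
three of the 100 elements, and the handle is still released deterministically
at the end of the try block.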
>>
>> Thanks,
>> Xinyu
>>
>> On Thu, May 10, 2018 at 8:59 PM, Kenneth Knowles <k...@google.com> wrote:
>>
>>> It is too soon to argue whether an API is complex or not. There has been
>>> no specific API proposed.
>>>
>>> I think the problem statement is real - you need to be able to read and
>>> write bigger-than-memory state. It seems we have multiple runners that
>>> don't support it, perhaps because of our API. You might be able to build
>>> something good enough with phantom references, but you might not.
>>>
>>> If I understand the idea, it might look something like this:
>>>
>>>     new DoFn<>() {
>>>        @StateId("foo")
>>>        private final StateSpec<BagState<Whatever>> myBagSpec = ...
>>>
>>>        @ProcessElement
>>>        public void proc(@StateId("foo") BagState<Whatever> myBag, ...) {
>>>          CloseableIterator<Whatever> iterator = myBag.get().iterator();
>>>          while(iterator.hasNext() && ... special condition ...) {
>>>            ... do stuff ...
>>>          }
>>>          iterator.close();
>>>        }
>>>      }
>>>
>>> So it has no impact on users who don't choose to close() since they
>>> iterate with for ( : ) as usual. And a runner that has the 10x funding to
>>> try out a ReferenceQueue can be resilient to users that forget. On the
>>> other hand, I haven't seen this pattern much in the wild, so I think it is
>>> valuable to discuss other methods.
>>>
>>> Luke's proposal, by contrast, is something like this if I understand his sketch
>>> (replacing WeakReference with PhantomReference seems to be what you really
>>> want):
>>>
>>>     ... in RocksDb state implementation ...
>>>     class RocksDbBagState {
>>>       static ReferenceQueue rocksDbIteratorQueue = new ReferenceQueue();
>>>
>>>       class Iterator {
>>>          PhantomReference<RocksDbJniIterator> cIter;
>>>          .next() {
>>>            return cIter.next();
>>>          }
>>>       }
>>>
>>>      class Iterable {
>>>         .iterator() {
>>>           return new Iterator(new PhantomReference<>(rocksDbJniIterator, rocksDbIteratorQueue));
>>>         }
>>>       }
>>>     }
>>>
>>>     ... on another thread ...
>>>     while(true) {
>>>       RocksDbIterator deadRef = (RocksDbIterator) rocksDbIteratorQueue.remove();
>>>       deadRef.close();
>>>     }
>>>
>>> When the iterator is GC'd, the phantom reference will pop onto the queue
>>> for being closed. This might not be too bad. You'll have delayed resource
>>> release, and potentially masked errors that are hard to debug. It is less
>>> error-prone than WeakReference, which is asking for trouble when objects
>>> are collected en masse. Anecdotally I have heard that performance of this
>>> kind of approach is poor, but I haven't experienced it myself and I can't
>>> find good data.
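
As a hedged illustration of the reference-queue idea, here is a small runnable
sketch. All names (NativeHandle, StateIterator, HandleRef, openIterator,
drainQueue) are hypothetical, and NativeHandle is only a counter standing in
for a JNI iterator; nothing here is RocksDB or Beam API. One detail the sketch
makes explicit: PhantomReference.get() always returns null, so the reference
object itself has to remember what to close, and it must not point back at the
iterator or the iterator would never become collectable.

```java
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class PhantomCleanupSketch {

    /** Stand-in for a native (JNI) iterator handle. */
    static final class NativeHandle {
        static final AtomicInteger open = new AtomicInteger();
        private boolean closed = false;

        NativeHandle() {
            open.incrementAndGet();
        }

        synchronized void close() {
            if (!closed) { // idempotent
                closed = true;
                open.decrementAndGet();
            }
        }
    }

    /** What user code holds; it has no close() of its own. */
    static final class StateIterator {
        final NativeHandle handle;

        StateIterator(NativeHandle handle) {
            this.handle = handle;
        }
    }

    /** Remembers the handle to close, never the iterator itself. */
    static final class HandleRef extends PhantomReference<StateIterator> {
        final NativeHandle handle;

        HandleRef(StateIterator referent, NativeHandle handle,
                  ReferenceQueue<StateIterator> queue) {
            super(referent, queue);
            this.handle = handle;
        }
    }

    static final ReferenceQueue<StateIterator> QUEUE = new ReferenceQueue<>();
    // Keeps the HandleRef objects themselves reachable until processed.
    static final Set<HandleRef> PENDING = ConcurrentHashMap.newKeySet();

    static StateIterator openIterator() {
        NativeHandle handle = new NativeHandle();
        StateIterator iterator = new StateIterator(handle);
        PENDING.add(new HandleRef(iterator, handle, QUEUE));
        return iterator;
    }

    /** Closes handles whose references were enqueued; returns how many. */
    static int drainQueue() {
        int drained = 0;
        Reference<? extends StateIterator> ref;
        while ((ref = QUEUE.poll()) != null) {
            HandleRef handleRef = (HandleRef) ref;
            handleRef.handle.close();
            PENDING.remove(handleRef);
            drained++;
        }
        return drained;
    }

    public static void main(String[] args) throws InterruptedException {
        openIterator(); // result dropped, as if user code never closed it
        for (int i = 0; i < 50 && NativeHandle.open.get() > 0; i++) {
            System.gc(); // only a hint; when collection happens is up to the JVM
            Thread.sleep(20);
            drainQueue();
        }
        System.out.println("open handles: " + NativeHandle.open.get());
    }
}
```

The polling loop in main is there because nothing guarantees when, or whether,
the collector will enqueue the reference, which is the timing concern raised
in this thread. On Java 9 and later, java.lang.ref.Cleaner packages the same
pattern.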
>>>
>>> Kenn
>>>
>>>
>>>
>>> On Thu, May 10, 2018 at 7:41 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>
>>>> If I understand correctly, using weak references will help clean up the
>>>> Java objects once GC kicks in. In the case of kv-stores like rocksDb, the Java
>>>> iterator is just a JNI interface to the underlying C iterator, so we need
>>>> to explicitly invoke close to release the in-memory snapshot data, which
>>>> can be large and accumulate quickly if it's not released when not in use.
>>>> Maybe I am missing something as you suggested here, but it looks to me like
>>>> using weak references might not help in this case.
>>>>
>>>> I understand your concern, and I think you might have misunderstood what I
>>>> meant. I am totally for working hard for best user experience, and I think
>>>> the current API provides a good example of that. That's also the reason I
>>>> am implementing a runner here. I am just proposing an extra API to expose
>>>> an iterator that can be closed when not needed, so that users can use
>>>> this feature to iterate through large state that doesn't fit into memory. I
>>>> believe this is also a pretty general use case and it's better to have
>>>> support for it. I am actually arguing this will be a better user experience
>>>> to add this extra API since more users can benefit from it.
>>>>
>>>> Thanks,
>>>> Xinyu
>>>>
>>>> On Thu, May 10, 2018 at 5:25 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> I don't agree. I believe you can track the iterators/iterables that
>>>>> are created and freed by using weak references and reference queues (or
>>>>> other methods). Having a few people work 10x as hard to provide a good
>>>>> implementation is much better than having 100s or 1000s of users suffering
>>>>> through a more complicated API.
>>>>>
>>>>> On Thu, May 10, 2018 at 3:44 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Load/evict blocks will help reduce the cache memory footprint, but we
>>>>>> still won't be able to release the underlying resources. We can
>>>>>> definitely add heuristics to help release the resources as you mentioned, but
>>>>>> there is no accurate way to track all the iterators/iterables created and
>>>>>> free them up once not needed. I think while the API is aimed at nice user
>>>>>> experience, we should have the option to let users optimize their
>>>>>> performance if they choose to. Do you agree?
>>>>>>
>>>>>> Thanks,
>>>>>> Xinyu
>>>>>>
>>>>>> On Thu, May 10, 2018 at 3:25 PM, Lukasz Cwik <lc...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Users won't reliably close/release the resources and forcing them to
>>>>>>> will make the user experience worse.
>>>>>>> It will make a lot more sense to use a file format which allows
>>>>>>> random access and use a cache to load/evict blocks of the state from 
>>>>>>> memory.
>>>>>>> If that is not possible, use an iterable which frees the resource
>>>>>>> after a certain amount of inactivity or uses weak references.
>>>>>>>
>>>>>>> On Thu, May 10, 2018 at 3:07 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi, folks,
>>>>>>>>
>>>>>>>> I'm in the middle of implementing the MapState and SetState in our
>>>>>>>> Samza runner. We noticed that the state returns the Java Iterable for
>>>>>>>> reading entries, keys, etc. For state backed by a file-based kv store
>>>>>>>> like rocksDb, we need to be able to let users explicitly close the
>>>>>>>> iterator/iterable to release the resources. Otherwise we have to load
>>>>>>>> the iterable into memory so we can safely close the underlying rocksDb
>>>>>>>> iterator, similar to Flink's implementation. But this won't work for
>>>>>>>> states that don't fit into memory. I chatted with Kenn and he also
>>>>>>>> agrees we need this capability to avoid bulk read/write. This seems to
>>>>>>>> be a general use case and I'm wondering if we can add support for it?
>>>>>>>> I am happy to contribute to this if needed. Any feedback is highly
>>>>>>>> appreciated.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Xinyu
>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>
