Alternatives to using weak/phantom references:
* Can you configure RocksDB's memory usage/limits?
* Inside the iterator, periodically close and re-open the RocksDB
connection, seeking back to where the user was?
* Use the ParDo/DoFn lifecycle and clean up after each
processElement/finishBundle call.
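The second bullet could look roughly like this. It is a minimal sketch under a few assumptions. `Cursor`, `FakeStore`, and `ReopeningIterator` are hypothetical names, not RocksDB or Beam API. An in-memory `TreeMap` stands in for a native iterator whose `seek` lands on the first key >= the target. A re-opened cursor may see newer data than the first one did, so this trades snapshot consistency for a bounded resource lifetime.

```java
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.TreeMap;

public class ReopeningIteratorDemo {

  /** Hypothetical native-style cursor over sorted keys; must be closed. */
  interface Cursor extends AutoCloseable {
    void seek(String key); // position at the first key >= key
    boolean hasNext();
    String next();
    @Override
    void close();
  }

  /** In-memory stand-in for the store; counts cursors so leaks are visible. */
  static class FakeStore {
    final NavigableMap<String, String> data = new TreeMap<>();
    int openCursors = 0;
    int totalOpens = 0;

    Cursor open() {
      openCursors++;
      totalOpens++;
      return new Cursor() {
        private Iterator<String> it = data.keySet().iterator();
        private boolean closed = false;

        public void seek(String key) { it = data.tailMap(key, true).keySet().iterator(); }
        public boolean hasNext() { return it.hasNext(); }
        public String next() { return it.next(); }
        public void close() {
          if (!closed) { closed = true; openCursors--; }
        }
      };
    }
  }

  /** Releases its cursor every batchSize elements and re-seeks on the next read. */
  static class ReopeningIterator implements Iterator<String> {
    private final FakeStore store;
    private final int batchSize;
    private Cursor cursor;    // null while the native resource is released
    private String lastKey;   // last key returned to the caller
    private String lookahead; // key already read but not yet returned
    private int sinceOpen = 0;
    private boolean done = false;

    ReopeningIterator(FakeStore store, int batchSize) {
      this.store = store;
      this.batchSize = batchSize;
    }

    @Override
    public boolean hasNext() {
      if (lookahead != null) return true;
      if (done) return false;
      if (cursor == null) {
        cursor = store.open();
        sinceOpen = 0;
        if (lastKey != null) cursor.seek(lastKey); // lands on lastKey if it still exists
      }
      while (cursor.hasNext()) {
        String key = cursor.next();
        if (lastKey == null || key.compareTo(lastKey) > 0) { // skip already-returned keys
          lookahead = key;
          return true;
        }
      }
      cursor.close(); // exhausted: release immediately
      cursor = null;
      done = true;
      return false;
    }

    @Override
    public String next() {
      if (!hasNext()) throw new NoSuchElementException();
      String key = lookahead;
      lookahead = null;
      lastKey = key;
      if (++sinceOpen >= batchSize) { // periodic release of the native resource
        cursor.close();
        cursor = null;
      }
      return key;
    }
  }
}
```

With `batchSize = 3` over ten keys, at most one cursor is open at any time. The iterator opens four cursors in total and leaves none open after exhaustion.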


On Fri, May 11, 2018 at 9:51 AM Xinyu Liu <xinyuliu...@gmail.com> wrote:

> Thanks for drafting the details of the two approaches, Kenn. Now I
> understand Luke's proposal better. The approach looks neat, but the
> uncertainty of *when* GC is going to kick in will make users' lives hard.
> If the user happens to configure a large JVM heap, GC might run late and
> less frequently than we want, since RocksDB uses off-heap memory that does
> not count toward heap occupancy. If we don't have a *definitive* way to let
> users close the underlying resources, then there is no good way to handle
> such failures in a critical production application.
>
> Having a close method on the iterator might be a little unorthodox :). But
> this really is a resource we are holding underneath, and it's pretty common
> in Java to have close() on a resource, e.g. BufferedReader and
> BufferedWriter. So I would imagine that we also define a resource for the
> state iterator and make the interface implement *AutoCloseable*. Here is my
> sketch:
>
> // StateResource MUST be closed after use; try-with-resources does that.
> try (StateResource<SomeType> st = bagState.iteratorResource()) {
>     Iterator<SomeType> iter = st.iterator();
>     while (iter.hasNext()) {
>         // ... do stuff ...
>     }
> } catch (Exception e) {
>     // ... user code ...
> }
>
> The type and method names are just for illustration, so please don't laugh
> at them. Please feel free to comment and let me know if you have thoughts
> about the programming patterns here.
>
> Thanks,
> Xinyu
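Xinyu's `StateResource` sketch can be made concrete as below. This is a minimal sketch that assumes hypothetical `StateResource` and `FakeBagState` types, with an in-memory list standing in for RocksDB. Overriding `close()` without `throws Exception` lets try-with-resources compile with no catch clause. The resource is then released on normal exit, on an early `break`, and when user code throws.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class StateResourceDemo {

  /** Hypothetical handle that pins the underlying state snapshot until closed. */
  interface StateResource<T> extends AutoCloseable {
    Iterator<T> iterator();

    @Override
    void close(); // narrowed: no checked exception, so callers need no catch
  }

  /** In-memory stand-in for a RocksDB-backed bag; counts unreleased resources. */
  static class FakeBagState<T> {
    private final List<T> values = new ArrayList<>();
    int openResources = 0;

    void add(T value) { values.add(value); }

    StateResource<T> iteratorResource() {
      openResources++;
      return new StateResource<T>() {
        private boolean closed = false;

        @Override
        public Iterator<T> iterator() {
          if (closed) throw new IllegalStateException("resource already closed");
          return values.iterator();
        }

        @Override
        public void close() {
          if (!closed) { // idempotent: a second close() is a no-op
            closed = true;
            openResources--;
          }
        }
      };
    }
  }

  /** Counts elements before stopAt; the early break still closes the resource. */
  static <T> int countUntil(FakeBagState<T> bag, T stopAt) {
    int n = 0;
    try (StateResource<T> st = bag.iteratorResource()) {
      Iterator<T> iter = st.iterator();
      while (iter.hasNext()) {
        if (iter.next().equals(stopAt)) break;
        n++;
      }
    }
    return n;
  }
}
```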
>
> On Thu, May 10, 2018 at 8:59 PM, Kenneth Knowles <k...@google.com> wrote:
>
>> It is too soon to argue whether an API is complex or not. There has been
>> no specific API proposed.
>>
>> I think the problem statement is real - you need to be able to read and
>> write bigger-than-memory state. It seems we have multiple runners that
>> don't support it, perhaps because of our API. You might be able to build
>> something good enough with phantom references, but you might not.
>>
>> If I understand the idea, it might look something like this:
>>
>>     new DoFn<InputT, OutputT>() {
>>       @StateId("foo")
>>       private final StateSpec<BagState<Whatever>> myBagSpec = ...
>>
>>       @ProcessElement
>>       public void proc(@StateId("foo") BagState<Whatever> myBag, ...) {
>>         // try-with-resources closes the iterator even if the loop throws.
>>         try (CloseableIterator<Whatever> iterator = myBag.get().iterator()) {
>>           while (iterator.hasNext() && ... special condition ...) {
>>             ... do stuff ...
>>           }
>>         }
>>       }
>>     }
>>
>> So it has no impact on users who don't call close(), since they iterate
>> with for (x : iterable) as usual. And a runner that has the 10x funding to
>> try out a ReferenceQueue can be resilient to users who forget. On the
>> other hand, I haven't seen this pattern much in the wild, so I think it is
>> valuable to discuss other methods.
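Kenn's `CloseableIterator` idea can be sketched so that plain for-each users are unaffected. The iterator releases its resource on exhaustion, and `close()` covers early exits. The names below are hypothetical and are not Beam API. An in-memory list stands in for RocksDB. The covariant `iterator()` override keeps existing `for (x : iterable)` code compiling unchanged.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class CloseableIteratorDemo {

  /** Hypothetical: an Iterator that can also release its resource early. */
  interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
    @Override
    void close(); // no checked exception
  }

  /** Covariant override so for-each code keeps compiling unchanged. */
  interface CloseableIterable<T> extends Iterable<T> {
    @Override
    CloseableIterator<T> iterator();
  }

  /** In-memory stand-in for RocksDB-backed bag state; counts open iterators. */
  static class FakeBag<T> implements CloseableIterable<T> {
    private final List<T> values;
    int openIterators = 0;

    FakeBag(List<T> values) { this.values = values; }

    @Override
    public CloseableIterator<T> iterator() {
      openIterators++;
      Iterator<T> delegate = values.iterator();
      return new CloseableIterator<T>() {
        private boolean closed = false;

        @Override
        public boolean hasNext() {
          if (closed) return false;
          boolean more = delegate.hasNext();
          if (!more) close(); // exhaustion releases the resource for for-each users
          return more;
        }

        @Override
        public T next() {
          if (!hasNext()) throw new NoSuchElementException();
          return delegate.next();
        }

        @Override
        public void close() {
          if (!closed) {
            closed = true;
            openIterators--;
          }
        }
      };
    }
  }
}
```

Only the third case is left open: an iterator abandoned mid-way without `close()`. That is the case a ReferenceQueue-based fallback in the runner would have to catch.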
>>
>> Luke's proposal, if I understand his sketch, is something like this
>> (replacing WeakReference with PhantomReference seems to be what you really
>> want):
>>
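>>     ... in RocksDb state implementation ...
>>     class RocksDbBagState {
>>       static final ReferenceQueue<Iterator> rocksDbIteratorQueue =
>>           new ReferenceQueue<>();
>>       // The phantom refs must stay strongly reachable or they are never enqueued.
>>       static final Set<IteratorRef> liveRefs = ConcurrentHashMap.newKeySet();
>>
>>       // PhantomReference.get() always returns null, so the ref carries the
>>       // native iterator itself; its referent is the Java wrapper below.
>>       static class IteratorRef extends PhantomReference<Iterator> {
>>         final RocksDbJniIterator cIter;
>>         IteratorRef(Iterator wrapper, RocksDbJniIterator cIter) {
>>           super(wrapper, rocksDbIteratorQueue);
>>           this.cIter = cIter;
>>         }
>>       }
>>
>>       class Iterator {
>>         final RocksDbJniIterator cIter;  // strong ref used for reads
>>         Iterator(RocksDbJniIterator cIter) {
>>           this.cIter = cIter;
>>           liveRefs.add(new IteratorRef(this, cIter));
>>         }
>>         .next() {
>>           return cIter.next();
>>         }
>>       }
>>
>>       class Iterable {
>>         .iterator() {
>>           return new Iterator(rocksDbJniIterator);
>>         }
>>       }
>>     }
>>
>>     ... on another thread ...
>>     while (true) {
>>       // remove() blocks until the GC enqueues a ref whose wrapper died.
>>       IteratorRef deadRef = (IteratorRef) rocksDbIteratorQueue.remove();
>>       liveRefs.remove(deadRef);
>>       deadRef.cIter.close();
>>     }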
>>
>> When the iterator is GC'd, its phantom reference will pop onto the queue
>> to be closed. This might not be too bad. You'll have delayed resource
>> release, and potentially masked errors that are hard to debug. It is less
>> error-prone than WeakReference, which is asking for trouble when objects
>> are collected en masse. Anecdotally I have heard that performance of this
>> kind of approach is poor, but I haven't experienced it myself and I can't
>> find good data.
>>
>> Kenn
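Since JDK 9, `java.lang.ref.Cleaner` packages the phantom-reference, reference-queue, and background-thread pattern sketched above. Its `Cleanable.clean()` also provides a deterministic path, so one wrapper can offer an explicit `close()` and still keep a GC fallback. This is a minimal sketch that assumes a stand-in `NativeHandle` in place of a real RocksDB iterator. `StateIterator` and `FreeAction` are hypothetical names.

```java
import java.lang.ref.Cleaner;
import java.util.concurrent.atomic.AtomicInteger;

public class CleanerFallbackDemo {

  // One shared Cleaner; it owns a daemon thread that runs actions after GC.
  private static final Cleaner CLEANER = Cleaner.create();

  /** Stand-in for a native RocksDB iterator; counts handles not yet freed. */
  static final class NativeHandle {
    static final AtomicInteger live = new AtomicInteger();
    private boolean freed = false;

    NativeHandle() {
      live.incrementAndGet();
    }

    synchronized void free() {
      if (!freed) {
        freed = true;
        live.decrementAndGet();
      }
    }
  }

  /** Cleanup action; it must not capture the wrapper, or the wrapper never becomes unreachable. */
  static final class FreeAction implements Runnable {
    private final NativeHandle handle;

    FreeAction(NativeHandle handle) {
      this.handle = handle;
    }

    @Override
    public void run() {
      handle.free();
    }
  }

  /** Wrapper handed to user code: explicit close() first, GC as a fallback. */
  static final class StateIterator implements AutoCloseable {
    private final NativeHandle handle; // a real implementation would read through this
    private final Cleaner.Cleanable cleanable;

    StateIterator() {
      this.handle = new NativeHandle();
      this.cleanable = CLEANER.register(this, new FreeAction(handle));
    }

    /** Deterministic release; Cleanable.clean() runs the action at most once. */
    @Override
    public void close() {
      cleanable.clean();
    }
  }
}
```

If user code never calls `close()`, the action still runs at some point after the wrapper becomes phantom reachable. That fallback has the same timing caveats discussed in this thread.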
>>
>>
>>
>> On Thu, May 10, 2018 at 7:41 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>
>>> If I understand correctly, using weak references will help clean up the
>>> Java objects once GC kicks in. In the case of a kv-store like RocksDB, the
>>> Java iterator is just a JNI interface to the underlying C++ iterator, so we
>>> need to explicitly invoke close to release the in-memory snapshot data,
>>> which can be large and can accumulate quickly if it's not released when not
>>> in use. Maybe I am missing something, as you suggested, but it looks to me
>>> like weak references might not help in this case.
>>>
>>> I understand your concern, and I think you might have misunderstood what I
>>> meant. I am totally for working hard for the best user experience, and I
>>> think the current API provides a good example of that. That's also the
>>> reason I am implementing a runner here. I am just proposing an extra API
>>> that exposes an iterator that can be closed when not needed, so that users
>>> can use this feature to iterate through large state that doesn't fit into
>>> memory. I believe this is also a pretty general use case and it's better to
>>> have support for it. I am actually arguing that adding this extra API makes
>>> for a better user experience, since more users can benefit from it.
>>>
>>> Thanks,
>>> Xinyu
>>>
>>> On Thu, May 10, 2018 at 5:25 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> I don't agree. I believe you can track the iterators/iterables that are
>>>> created and freed by using weak references and reference queues (or other
>>>> methods). Having a few people work 10x as hard to provide a good
>>>> implementation is much better than having 100s or 1000s of users suffering
>>>> through a more complicated API.
>>>>
>>>> On Thu, May 10, 2018 at 3:44 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>> wrote:
>>>>
>>>>> Loading/evicting blocks will help reduce the cache memory footprint, but
>>>>> we still won't be able to release the underlying resources. We can
>>>>> definitely add heuristics to help release the resources as you mentioned,
>>>>> but there is no accurate way to track all the iterators/iterables created
>>>>> and free them once they are no longer needed. I think that while the API
>>>>> aims at a nice user experience, we should give users the option to
>>>>> optimize their performance if they choose to. Do you agree?
>>>>>
>>>>> Thanks,
>>>>> Xinyu
>>>>>
>>>>> On Thu, May 10, 2018 at 3:25 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Users won't reliably close/release the resources and forcing them to
>>>>>> will make the user experience worse.
>>>>>> It will make a lot more sense to use a file format which allows
>>>>>> random access and use a cache to load/evict blocks of the state from 
>>>>>> memory.
>>>>>> If that is not possible, use an iterable which frees the resource
>>>>>> after a certain amount of inactivity or uses weak references.
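Luke's first suggestion, a random-access file format plus a block cache, might look like the following minimal sketch. `BlockCache` and the `loader` function are hypothetical. A `LinkedHashMap` in access order with `removeEldestEntry` is a standard way to get LRU eviction in Java. It is not necessarily how any runner implements this. Memory stays bounded by `maxBlocks` regardless of state size. As Xinyu notes in reply, though, this does not by itself release the underlying file or iterator resources.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongFunction;

public class BlockCache {
  private final LongFunction<byte[]> loader; // reads one block from the random-access file
  private final LinkedHashMap<Long, byte[]> blocks;
  int loads = 0; // number of reads that missed the cache

  public BlockCache(int maxBlocks, LongFunction<byte[]> loader) {
    this.loader = loader;
    // accessOrder=true: iteration order runs from least to most recently accessed
    this.blocks = new LinkedHashMap<Long, byte[]>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<Long, byte[]> eldest) {
        return size() > maxBlocks; // evict the LRU block once over capacity
      }
    };
  }

  public byte[] get(long blockId) {
    byte[] block = blocks.get(blockId); // a hit also refreshes recency
    if (block == null) {
      block = loader.apply(blockId);
      loads++;
      blocks.put(blockId, block);
    }
    return block;
  }

  public boolean isCached(long blockId) {
    return blocks.containsKey(blockId); // containsKey does not change access order
  }
}
```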
>>>>>>
>>>>>> On Thu, May 10, 2018 at 3:07 PM Xinyu Liu <xinyuliu...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi, folks,
>>>>>>>
>>>>>>> I'm in the middle of implementing MapState and SetState in our
>>>>>>> Samza runner. We noticed that the state returns a Java Iterable for
>>>>>>> reading entries, keys, etc. For state backed by a file-based kv store
>>>>>>> like RocksDB, we need to let users explicitly close the
>>>>>>> iterator/iterable to release the resources. Otherwise we have to load
>>>>>>> the iterable into memory so we can safely close the underlying RocksDB
>>>>>>> iterator, similar to Flink's implementation. But this won't work for
>>>>>>> state that doesn't fit into memory. I chatted with Kenn and he also
>>>>>>> agrees we need this capability to avoid bulk reads/writes. This seems
>>>>>>> to be a general use case and I'm wondering if we can add support for
>>>>>>> it. I am happy to contribute to this if needed. Any feedback is highly
>>>>>>> appreciated.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Xinyu
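The eager fallback described in the first message can be sketched as follows: copy every element onto the heap, then close the native iterator. The copy is exactly what rules this approach out for state larger than memory. This is a generic illustration with hypothetical names (`CloseableIterator`, `FakeNativeIterator`). It is not Flink's or Samza's actual code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class MaterializeDemo {

  /** Hypothetical closeable iterator over native state. */
  interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
    @Override
    void close();
  }

  /** Copies every element onto the heap, then releases the native iterator. */
  static <T> List<T> materialize(CloseableIterator<T> source) {
    List<T> copy = new ArrayList<>();
    try (CloseableIterator<T> it = source) {
      while (it.hasNext()) {
        copy.add(it.next()); // heap usage grows with the full size of the state
      }
    }
    return Collections.unmodifiableList(copy);
  }

  /** Stand-in for a native iterator; records whether close() was called. */
  static final class FakeNativeIterator<T> implements CloseableIterator<T> {
    private final Iterator<T> delegate;
    boolean closed = false;

    FakeNativeIterator(List<T> values) { this.delegate = values.iterator(); }

    @Override public boolean hasNext() { return delegate.hasNext(); }
    @Override public T next() { return delegate.next(); }
    @Override public void close() { closed = true; }
  }
}
```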
>>>>>>>
>>>>>>
>>>>>
>>>
>
