Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/949#issuecomment-132692640
  
    Sorry for the late review @ChengXiangLi. I finished it now and the PR looks 
really good. 
    
    There is only one thing I would like to change before merging: moving the 
`sample` and `sampleWithSize` methods from `DataSet` to the `DataSetUtils` 
class. This effectively makes the `sample` methods not part of the core API. 
The reason is that the `sampleWithSize` method breaks one of Flink's 
guarantees, namely the robustness of the core API functions against 
`OutOfMemoryError`s. Let me elaborate for better understanding.
    
    All of Flink's internal operations work on serialized data stored in 
Flink's *managed* memory. Managed memory allows Flink to detect when to spill 
elements from memory to disk, avoiding out-of-memory failures and thus making 
the system robust. It is a pre-allocated area of memory administered by the 
`MemoryManager`, which lets you allocate and deallocate `MemorySegments` in a 
C-style fashion. However, once a data item enters a UDF, it has to be 
deserialized, which puts it on the remaining heap. This is not a problem if 
the UDF does not accumulate these elements. The `sampleWithSize` method, 
however, materializes up to `numSamples` elements on the heap. Depending on 
the number of samples and the data item size, this might be enough to eat up 
all remaining heap space and crash the JVM.
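    To make the issue concrete, here is a plain-Java sketch (not the actual PR code, and the class name is made up) of what a reservoir sampler inside a UDF does: the `numSamples` buffered elements live on the Java heap, outside Flink's managed memory, so the buffer's footprint scales with the sample size and the element size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Illustrative sketch only: a plain-heap reservoir sampler (Algorithm R),
// roughly what a sampleWithSize-style UDF keeps in memory. The reservoir
// holds up to numSamples deserialized elements on the Java heap.
public class ReservoirSampler<T> {
    private final int numSamples;
    private final List<T> reservoir;   // up to numSamples elements on the heap
    private final Random rng;
    private long seen = 0;

    public ReservoirSampler(int numSamples, long seed) {
        this.numSamples = numSamples;
        this.reservoir = new ArrayList<>(numSamples);
        this.rng = new Random(seed);
    }

    public void collect(T element) {
        seen++;
        if (reservoir.size() < numSamples) {
            reservoir.add(element);            // heap grows with the sample size
        } else {
            long idx = (long) (rng.nextDouble() * seen);
            if (idx < numSamples) {
                reservoir.set((int) idx, element);  // replace with decreasing probability
            }
        }
    }

    public List<T> getSample() {
        return reservoir;
    }
}
```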
    
    I think your current implementation will work for most use cases, but in 
order to make it part of the core API, we also have to handle the case where 
the sample cannot be materialized on the remaining heap of a running Flink 
program. To achieve this, I think it would be necessary to implement a native 
`topK` operator. By *native* I mean an operator which works on Flink's managed 
memory and can therefore also spill records to disk. Given such a `topK` 
operator, we could reimplement the reservoir sampling algorithm in the 
following way: for sampling without replacement, we first assign a weight to 
each element in a map operation, then call `topK` with respect to the weights 
and obtain the sample. For sampling with replacement, we could simply use a 
flat map operation to assign `numSamples` weights to each element and then 
again call `topK` with respect to the weight.
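    For the without-replacement case, the idea above can be sketched in plain Java, with the map and `topK` operators simulated locally (class and method names here are made up for illustration): each element gets a uniform random weight, and the k elements with the largest weights form a uniform sample without replacement.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Random;

// Sketch of the proposed rewrite: sampling without replacement expressed
// as "assign a random weight to each element, then take the top k by
// weight". A local min-heap stands in for the native topK operator.
public class WeightedTopKSample {

    public static <T> List<T> sample(Iterable<T> input, int k, long seed) {
        Random rng = new Random(seed);
        // Min-heap ordered by weight: the root is the weakest of the
        // current top-k candidates and gets evicted by better ones.
        PriorityQueue<Map.Entry<Double, T>> topK = new PriorityQueue<>(
                (a, b) -> Double.compare(a.getKey(), b.getKey()));
        for (T element : input) {
            double weight = rng.nextDouble();     // the "map" step: assign a weight
            if (topK.size() < k) {
                topK.add(new SimpleEntry<>(weight, element));
            } else if (weight > topK.peek().getKey()) {
                topK.poll();                      // evict the smallest weight
                topK.add(new SimpleEntry<>(weight, element));
            }
        }
        List<T> result = new ArrayList<>();
        for (Map.Entry<Double, T> e : topK) {
            result.add(e.getValue());
        }
        return result;
    }
}
```

    In a real Flink job, the heap above is exactly the part that would be replaced by the native, managed-memory `topK` operator.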
    
    For the `topK` implementation, we would need something like a 
`PriorityQueue` which operates on managed memory (similar to the 
`CompactingHashTable`, which is a hash table working on managed memory). The 
priority queue would store, for each record, its priority value and a pointer 
to the record itself, which is kept in managed memory. Whenever an element is 
removed from the priority queue, we can also free the occupied managed memory. 
In case we run out of managed memory, we have to spill to disk some of the 
records which are still in the race for the top k. As a first step, we can 
skip the spilling and simply throw a descriptive exception (rather than an 
`OutOfMemoryError`) when we run out of memory. Afterwards, we can add the 
spilling functionality incrementally.
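    The first step could look roughly like the following sketch (entirely hypothetical names; a fixed element budget stands in for tracking `MemorySegments`, and spilling is omitted in favor of a descriptive exception):

```java
import java.util.PriorityQueue;

// Hypothetical first-step shape of the topK structure: a bounded
// priority queue that fails with a descriptive exception instead of an
// OutOfMemoryError. The `budget` counter is only a stand-in for the
// managed memory that the real operator would account for.
public class BudgetedTopK<T extends Comparable<T>> {
    private final PriorityQueue<T> heap = new PriorityQueue<>(); // min-heap
    private final int k;        // number of results to keep
    private final int budget;   // stand-in for available managed memory

    public BudgetedTopK(int k, int budget) {
        this.k = k;
        this.budget = budget;
    }

    public void add(T element) {
        if (heap.size() < k) {
            if (heap.size() >= budget) {
                // First step from the discussion above: no spilling yet,
                // just a proper exception instead of crashing the JVM.
                throw new IllegalStateException(
                        "topK exhausted its memory budget; spilling not implemented yet");
            }
            heap.add(element);
        } else if (element.compareTo(heap.peek()) > 0) {
            heap.poll();         // in the real operator: also free the managed memory
            heap.add(element);
        }
    }

    public PriorityQueue<T> results() {
        return heap;
    }
}
```

    The incremental follow-up would then replace the exception branch with spilling the weakest candidates to disk.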
    
    I know that you've already put a lot of effort into writing the sampling 
operator and that this outcome might be a little demotivating. However, if we 
want to make it right and robust, then I think this is the way to go. 
Additionally, we would gain a proper `topK` operator in Flink's API, which is 
missing big time :-) If you want to, you could also take the lead here; 
further discussion should then happen in a separate issue. I'm more than 
willing to assist you in implementing this operator. What do you think?

