Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-132692640

Sorry for the late review @ChengXiangLi. I have finished it now and the PR looks really good. There is only one thing I would like to change before merging: moving the `sample` and `sampleWithSize` methods from `DataSet` to the `DataSetUtils` class. This effectively keeps the `sample` methods out of the core API. The reason is that the `sampleWithSize` method breaks one of Flink's guarantees, namely the robustness of core API functions against `OutOfMemoryError`s. Let me elaborate for better understanding.

All of Flink's internal operations work on serialized data which is stored in Flink's *managed* memory. The managed memory allows Flink to detect when to spill elements from memory to disk, avoiding out-of-memory failures and, thus, making the system robust. The managed memory is a pre-allocated area of memory administered by the `MemoryManager`, which lets you allocate and deallocate `MemorySegments` in a C-style fashion. However, once a data item enters a UDF, it has to be deserialized, which puts it on the remaining heap. That is not a problem as long as the UDF does not accumulate these elements. The `sampleWithSize` method, however, materializes up to `numSamples` elements on the heap. Depending on the number of samples and the data item size, this might be enough to eat up all remaining heap space and crash the JVM.

I think your current implementation will work for most use cases, but in order to make it part of the core API, we also have to deal with the case where the sample cannot be materialized on the remaining heap of a running Flink program. To achieve this, I think it would be necessary to implement a native `topK` operator.
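To make the heap concern concrete, here is a minimal sketch of reservoir sampling (Algorithm R), which is the standard way to draw a fixed-size sample without replacement in one pass. All class and method names are illustrative and do not match Flink's actual sampling code; the point is that the `reservoir` list holds up to `numSamples` deserialized elements on the unmanaged JVM heap:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Illustrative sketch only (Algorithm R); not Flink's actual implementation.
// The List below lives on the unmanaged JVM heap, which is exactly the
// robustness problem described above: large numSamples or large records
// can exhaust the remaining heap.
class ReservoirSampler<T> {
    private final List<T> reservoir;  // up to numSamples deserialized elements on the heap
    private final int numSamples;
    private final Random rng;
    private long seen = 0;            // number of elements observed so far

    ReservoirSampler(int numSamples, long seed) {
        this.numSamples = numSamples;
        this.reservoir = new ArrayList<>(numSamples);
        this.rng = new Random(seed);
    }

    void collect(T element) {
        seen++;
        if (reservoir.size() < numSamples) {
            reservoir.add(element);   // heap usage grows until numSamples is reached
        } else {
            // Keep the incoming element with probability numSamples / seen,
            // replacing a uniformly chosen slot in the reservoir.
            long j = (long) (rng.nextDouble() * seen);
            if (j < numSamples) {
                reservoir.set((int) j, element);
            }
        }
    }

    List<T> sample() {
        return reservoir;
    }
}
```

The memory footprint is bounded by `numSamples`, but that bound lives entirely on the heap and outside the `MemoryManager`'s control, so it cannot be spilled.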
With *native* I mean an operator which works on Flink's managed memory and, thus, can also spill records to disk. Having such a `topK` operator, we could reimplement the reservoir sampling algorithm the following way: for sampling without replacement, we first assign a random weight to each element in a map operation. Then we call `topK` with respect to the weights and obtain the sample. For sampling with replacement, we could simply use a flat map operation to assign `numSamples` weights to each element (emitting one weighted copy per weight) and then again call `topK` with respect to the weight.

For the `topK` implementation, we would need something like a `PriorityQueue` which operates on managed memory (similar to the `CompactingHashTable`, which is a hash table working on managed memory). We would thus have a priority queue which stores the priority value of each record together with a pointer to the record, which is kept in managed memory. Whenever an element is removed from the priority queue, we can also free the occupied managed memory. In case we run out of managed memory, we have to spill to disk some of the records which are still in the race for the top k. As a first step, we can skip the spilling and just throw a proper exception (other than `OutOfMemoryError`) when we run out of memory. Afterwards, we can incrementally add the spilling functionality.

I know that you have already put a lot of effort into writing the sampling operator and that this result might be a bit demotivating. However, if we want to make it right and robust, then I think this is the way to go. Additionally, we would gain a proper `topK` operator for Flink's API, which is missing big time :-) If you want to, you could also take the lead here. The further discussion should then happen in a separate issue. I'm more than willing to assist you in implementing this operator. What do you think?
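The weights-plus-topK reformulation above can be sketched as follows. The managed-memory priority queue does not exist yet, so this sketch uses `java.util.PriorityQueue` on the heap purely to illustrate the idea: assigning an independent uniform random weight to each element and keeping the k elements with the largest weights yields a uniform sample without replacement. All names here are hypothetical, not Flink API:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Random;

// Illustrative only: maps sampling without replacement onto a top-k problem.
// A real Flink operator would back this queue with managed MemorySegments
// and spill losing candidates to disk instead of holding them on the heap.
class WeightedTopKSampler {

    private static class WeightedElement<T> {
        final double weight;
        final T element;
        WeightedElement(double weight, T element) {
            this.weight = weight;
            this.element = element;
        }
    }

    // "Map" phase assigns one random weight per element; the min-heap of
    // size k then retains the k elements with the largest weights.
    static <T> List<T> sampleWithoutReplacement(Iterable<T> input, int k, long seed) {
        Random rng = new Random(seed);
        // Min-heap ordered by weight: the root is the weakest current survivor.
        PriorityQueue<WeightedElement<T>> topK =
            new PriorityQueue<>(Comparator.comparingDouble((WeightedElement<T> w) -> w.weight));
        for (T element : input) {
            double weight = rng.nextDouble();          // weight assignment ("map" step)
            if (topK.size() < k) {
                topK.add(new WeightedElement<>(weight, element));
            } else if (weight > topK.peek().weight) {
                // In the real operator, evicting an element would also free
                // its managed memory.
                topK.poll();
                topK.add(new WeightedElement<>(weight, element));
            }
        }
        List<T> sample = new ArrayList<>(topK.size());
        for (WeightedElement<T> w : topK) {
            sample.add(w.element);
        }
        return sample;
    }
}
```

Sampling with replacement follows the same pattern, except that the weight-assignment step emits `numSamples` independently weighted copies of each element before the `topK` call, as described above.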