kileys commented on a change in pull request #14406:
URL: https://github.com/apache/beam/pull/14406#discussion_r607285932



##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -377,23 +377,45 @@ public void tryUpdate(T value, Coder<T> coder) throws Exception {
       }
     }
 
-    // Lowest sampling probability: 0.001%.
-    private static final int SAMPLING_TOKEN_UPPER_BOUND = 1000000;
     private static final int SAMPLING_CUTOFF = 10;
-    private int samplingToken = 0;
+    private long samplingToken = 0;
+    private long nextSamplingToken = 0;
     private Random randomGenerator = new Random();
 
-    // TODO(BEAM-11836): Implement fast approximation for reservoir sampling.
     private boolean shouldSampleElement() {
       // Sampling probability decreases as the element count is increasing.
-      // We unconditionally sample the first samplingCutoff elements. For the
-      // next samplingCutoff elements, the sampling probability drops from 100%
-      // to 50%. The probability of sampling the Nth element is:
-      // min(1, samplingCutoff / N), with an additional lower bound of
-      // samplingCutoff / samplingTokenUpperBound. This algorithm may be refined
-      // later.
-      samplingToken = Math.min(samplingToken + 1, SAMPLING_TOKEN_UPPER_BOUND);
-      return randomGenerator.nextInt(samplingToken) < SAMPLING_CUTOFF;
+      // We unconditionally sample the first samplingCutoff elements. Calculating
+      // nextInt(samplingToken) for each element is expensive, so after a threshold, calculate the
+      // gap to next sample.
+      // https://erikerlandson.github.io/blog/2015/11/20/very-fast-reservoir-sampling/
+
+      // Reset samplingToken if it's going to exceed the max value.
+      if (samplingToken + 1 == Long.MAX_VALUE) {
+        samplingToken = 0;
+        nextSamplingToken = getNextSamplingToken(samplingToken);
+      }
+
+      samplingToken++;
+      // Use traditional sampling until the threshold of 30
+      if (nextSamplingToken == 0) {
+        if (randomGenerator.nextInt((int) samplingToken) <= SAMPLING_CUTOFF) {
+          if (samplingToken > 30) {

Review comment:
       Done
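The removed comment gives the old sampling probability in closed form: min(1, samplingCutoff / N), floored at samplingCutoff / samplingTokenUpperBound. The minimal sketch below evaluates that formula for a few values of N. `OldSamplingProbability` is a hypothetical helper written for illustration, not part of Beam; it only reproduces the arithmetic implied by the removed `Math.min(samplingToken + 1, SAMPLING_TOKEN_UPPER_BOUND)` and `nextInt(samplingToken) < SAMPLING_CUTOFF` lines. The last case shows where the "Lowest sampling probability: 0.001%" comment comes from: 10 / 1,000,000 = 0.00001.

```java
/**
 * Hypothetical helper (not part of Beam): evaluates the sampling probability of the
 * removed scheme, min(1, cutoff / N), floored at cutoff / upperBound.
 */
public final class OldSamplingProbability {
  static final int SAMPLING_CUTOFF = 10;
  static final int SAMPLING_TOKEN_UPPER_BOUND = 1_000_000;

  /** Probability that the n-th element (1-based) is sampled under the removed scheme. */
  public static double probability(long n) {
    // The token was clamped at the upper bound, so nextInt(token) < cutoff succeeds
    // with probability cutoff / min(n, upperBound), capped at 1 for n <= cutoff.
    long token = Math.min(n, SAMPLING_TOKEN_UPPER_BOUND);
    return Math.min(1.0, (double) SAMPLING_CUTOFF / token);
  }

  public static void main(String[] args) {
    System.out.println(probability(5));         // 1.0
    System.out.println(probability(20));        // 0.5
    System.out.println(probability(2_000_000)); // 1.0E-5, i.e. 0.001%
  }
}
```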

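The added comment replaces one `nextInt` call per element with a precomputed gap to the next sample, citing the Erlandson post. The sketch below shows that idea under stated assumptions and is not the code in this PR. It samples the first `cutoff` elements unconditionally, flips a per-element coin with probability cutoff / N up to an assumed `threshold` (30, mirroring the diff's comment), and after that draws the number of elements to skip from a geometric distribution with p = cutoff / N held fixed for the gap. `GapSampler`, `cutoff`, `threshold`, and `nextSample` are hypothetical names.

```java
import java.util.Random;

/**
 * Hypothetical sketch (not this PR's code): per-element sampling that switches to
 * geometric gap sampling once the element count passes a threshold.
 */
public final class GapSampler {
  private final int cutoff;    // the first `cutoff` elements are always sampled
  private final int threshold; // assumed switch-over point, e.g. 30 as in the diff comment
  private final Random random;
  private long count = 0;      // number of elements seen so far
  private long nextSample = 0; // 1-based index of the next sampled element in gap mode

  public GapSampler(int cutoff, int threshold, Random random) {
    if (cutoff < 1 || threshold <= cutoff) {
      throw new IllegalArgumentException("require 1 <= cutoff < threshold");
    }
    this.cutoff = cutoff;
    this.threshold = threshold;
    this.random = random;
  }

  public boolean shouldSample() {
    count++;
    if (count <= cutoff) {
      return true; // unconditional sampling of the first `cutoff` elements
    }
    if (count <= threshold) {
      // Exact per-element sampling with probability cutoff / count.
      return random.nextInt((int) count) < cutoff;
    }
    if (nextSample < count) {
      // Gap mode: draw how many elements to skip instead of one draw per element.
      // With p = cutoff / count held fixed, the skip length is geometric,
      // P(skip = k) = (1 - p)^k * p, sampled here by inverting its CDF.
      double p = (double) cutoff / count;   // < 1 because count > threshold > cutoff
      double u = 1.0 - random.nextDouble(); // uniform in (0, 1], avoids log(0)
      long skip = (long) Math.floor(Math.log(u) / Math.log1p(-p));
      nextSample = count + skip;
    }
    return count == nextSample;
  }
}
```

Holding p fixed across a gap is an approximation: the true per-element probability keeps shrinking while elements are skipped, so the sketch samples slightly more often than exact per-element sampling would. It also ignores the `Long.MAX_VALUE` wraparound that the diff guards against.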



-- 
This is an automated message from the Apache Git Service.