kileys commented on a change in pull request #13924:
URL: https://github.com/apache/beam/pull/13924#discussion_r574939104



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -305,4 +338,59 @@ public double getProgress() {
       return delegate.getProgress();
     }
   }
+
+  private static class SampleByteSizeEstimateDistribution<T> {
+    /** Basic implementation of {@link ElementByteSizeObserver} for use in size estimation. */
+    private static class ByteSizeObserver extends ElementByteSizeObserver {
+      private long observedSize = 0;
+
+      @Override
+      protected void reportElementSize(long elementSize) {
+        observedSize += elementSize;
+      }
+    }
+
+    final Distribution distribution;
+
+    public SampleByteSizeEstimateDistribution(Distribution distribution) {
+      this.distribution = distribution;
+    }
+
+    public void tryUpdate(T value, Coder<T> coder) throws Exception {
+      if (shouldSampleElement() || coder.isRegisterByteSizeObserverCheap(value)) {
+        // First try using byte size observer
+        ByteSizeObserver observer = new ByteSizeObserver();
+        coder.registerByteSizeObserver(value, observer);
+
+        if (!observer.getIsLazy()) {
+          observer.advance();
+          this.distribution.update(observer.observedSize);
+        } else {
+          // Coder byte size observation is lazy (requires iteration for observation) so fall back
+          // to counting output stream
+          CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream());
+          coder.encode(value, os);
+          this.distribution.update(os.getCount());
+        }
+      }
+    }
+
+    // Lowest sampling probability: 0.001%.
+    private static final int SAMPLING_TOKEN_UPPER_BOUND = 1000000;
+    private static final int SAMPLING_CUTOFF = 10;
+    private int samplingToken = 0;
+    private Random randomGenerator = new Random();
+
+    private boolean shouldSampleElement() {

Review comment:
       Copied the sampling logic from the legacy worker. Do we want to be able to customize or update these values?
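
       To make the question concrete, here is a minimal sketch of what a decaying-probability sampler could look like with the two constants turned into constructor parameters. The body of `shouldSampleElement()` is not visible in this hunk, so the logic below is an assumption inferred from the constants and the "0.001%" comment (10 / 1000000), not a copy of the PR code. `DecayingSampler` and its parameter names are hypothetical.

```java
import java.util.Random;

/**
 * Hypothetical sketch (not the PR's code): samples the first `cutoff` elements
 * unconditionally, then with probability cutoff / N for the Nth element, with a
 * floor of cutoff / tokenUpperBound.
 */
final class DecayingSampler {
  private final int tokenUpperBound; // caps the token, which bounds the lowest probability
  private final int cutoff;          // number of initial elements that are always sampled
  private final Random random;
  private int samplingToken = 0;

  DecayingSampler(int tokenUpperBound, int cutoff, Random random) {
    if (cutoff <= 0 || tokenUpperBound < cutoff) {
      throw new IllegalArgumentException("require 0 < cutoff <= tokenUpperBound");
    }
    this.tokenUpperBound = tokenUpperBound;
    this.cutoff = cutoff;
    this.random = random;
  }

  /** Returns true if the current element should be sampled. */
  boolean shouldSample() {
    // The token counts elements seen so far, saturating at the upper bound.
    samplingToken = Math.min(samplingToken + 1, tokenUpperBound);
    // nextInt(n) is uniform over [0, n), so this is true with probability min(1, cutoff / n).
    return random.nextInt(samplingToken) < cutoff;
  }
}
```

       With the values from the diff this would be constructed as `new DecayingSampler(1000000, 10, new Random())`. Passing the `Random` in also lets tests use a fixed seed.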



