boyuanzz commented on a change in pull request #13924:
URL: https://github.com/apache/beam/pull/13924#discussion_r582529535



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -210,8 +233,8 @@ public void accept(WindowedValue<T> input) throws Exception {
       try (Closeable close =
           MetricsEnvironment.scopedMetricsContainer(this.unboundMetricContainer)) {
         // Increment the counter for each window the element occurs in.
-        this.counter.inc(input.getWindows().size());
-
+        this.elementCountCounter.inc(input.getWindows().size());
+        this.sampledByteSizeDistribution.tryUpdate(input.getValue(), this.coder);

Review comment:
       Should the size also be multiplied by the number of windows, even though in most cases an element belongs to only one window?
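For illustration, a minimal sketch of the two options in plain Java. `SizeDistribution` and `WindowWeightedSizes` are hypothetical names; `SizeDistribution` is a simple stand-in for Beam's `Distribution` metric, not Beam's API. Recording the size once per window mirrors how `elementCountCounter` is incremented by `input.getWindows().size()`:

```java
/** Hypothetical stand-in for a Beam Distribution: tracks sum, count, min and max. */
final class SizeDistribution {
  long sum = 0;
  long count = 0;
  long min = Long.MAX_VALUE;
  long max = Long.MIN_VALUE;

  void update(long value) {
    sum += value;
    count++;
    min = Math.min(min, value);
    max = Math.max(max, value);
  }
}

public class WindowWeightedSizes {
  /** Records the element's size once; windowCount is deliberately ignored. */
  static void recordOncePerElement(SizeDistribution d, long byteSize, int windowCount) {
    d.update(byteSize);
  }

  /** Records the element's size once per window, like elementCountCounter.inc(windows). */
  static void recordOncePerWindow(SizeDistribution d, long byteSize, int windowCount) {
    for (int i = 0; i < windowCount; i++) {
      d.update(byteSize);
    }
  }

  public static void main(String[] args) {
    SizeDistribution perElement = new SizeDistribution();
    SizeDistribution perWindow = new SizeDistribution();
    // A 100-byte element assigned to 3 windows (e.g. overlapping sliding windows).
    recordOncePerElement(perElement, 100, 3);
    recordOncePerWindow(perWindow, 100, 3);
    System.out.println(perElement.sum + " " + perElement.count); // 100 1
    System.out.println(perWindow.sum + " " + perWindow.count); // 300 3
  }
}
```

Recording once per window keeps the mean size at 100 bytes while making the distribution's count agree with the element counter; literally multiplying a single sample by the window count would instead record one 300-byte value and inflate both the mean and the max.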

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -258,13 +286,17 @@ public void accept(WindowedValue<T> input) throws Exception {
       try (Closeable close =
           MetricsEnvironment.scopedMetricsContainer(this.unboundMetricContainer)) {
         // Increment the counter for each window the element occurs in.
-        this.counter.inc(input.getWindows().size());
-
+        this.elementCountCounter.inc(input.getWindows().size());
         // Wrap the consumer with extra logic to set the metric container with the appropriate
         // PTransform context. This ensures that user metrics obtain the pTransform ID when they are
         // created. Also use the ExecutionStateTracker and enter an appropriate state to track the
         // Process Bundle Execution time metric.
         for (ConsumerAndMetadata consumerAndMetadata : consumerAndMetadatas) {
+
+          if (consumerAndMetadata.getValueCoder() != null) {
+            this.sampledByteSizeDistribution.tryUpdate(

Review comment:
       Same as above.

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -305,4 +336,59 @@ public double getProgress() {
       return delegate.getProgress();
     }
   }
+
+  private static class SampleByteSizeDistribution<T> {
+    /** Basic implementation of {@link ElementByteSizeObserver} for use in size estimation. */
+    private static class ByteSizeObserver extends ElementByteSizeObserver {
+      private long observedSize = 0;
+
+      @Override
+      protected void reportElementSize(long elementSize) {
+        observedSize += elementSize;
+      }
+    }
+
+    final Distribution distribution;
+
+    public SampleByteSizeDistribution(Distribution distribution) {
+      this.distribution = distribution;
+    }
+
+    public void tryUpdate(T value, Coder<T> coder) throws Exception {
+      if (shouldSampleElement() || coder.isRegisterByteSizeObserverCheap(value)) {
+        // First try using byte size observer
+        ByteSizeObserver observer = new ByteSizeObserver();
+        coder.registerByteSizeObserver(value, observer);
+
+        if (!observer.getIsLazy()) {
+          observer.advance();
+          this.distribution.update(observer.observedSize);
+        } else {
+          // Coder byte size observation is lazy (requires iteration for observation) so fall back

Review comment:
       If there is a way to optimize this, we should apply it here as well:
   
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L295-L303
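A minimal sketch of one such approach, assuming the optimization in question is to measure the encoded size by encoding into a stream that only counts bytes and discards them, rather than buffering the encoded output. `SimpleCoder`, `CountingNullOutputStream` and `encodedByteSize` are hypothetical names used only for illustration; Beam's `Coder` has a richer API. The sketch hand-rolls the counting stream so it has no dependencies; Guava's `CountingOutputStream` wrapped around `ByteStreams.nullOutputStream()` would serve the same purpose.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class EncodedSizeSketch {
  /** Hypothetical minimal coder interface (not Beam's Coder). */
  interface SimpleCoder<T> {
    void encode(T value, OutputStream out) throws IOException;
  }

  /** Output stream that discards all bytes and only counts how many were written. */
  static final class CountingNullOutputStream extends OutputStream {
    private long count = 0;

    @Override
    public void write(int b) {
      count++;
    }

    @Override
    public void write(byte[] b, int off, int len) {
      // Count bulk writes without copying the data anywhere.
      count += len;
    }

    long getCount() {
      return count;
    }
  }

  /** Returns the encoded size of value without allocating a buffer for the encoded bytes. */
  static <T> long encodedByteSize(SimpleCoder<T> coder, T value) throws IOException {
    try (CountingNullOutputStream out = new CountingNullOutputStream()) {
      coder.encode(value, out);
      return out.getCount();
    }
  }

  public static void main(String[] args) throws IOException {
    // Hypothetical coder that writes the UTF-8 bytes of a string.
    SimpleCoder<String> utf8 = (value, out) -> out.write(value.getBytes(StandardCharsets.UTF_8));
    System.out.println(encodedByteSize(utf8, "hello")); // 5
  }
}
```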




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
