robertwb commented on a change in pull request #13924:
URL: https://github.com/apache/beam/pull/13924#discussion_r578821073
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -305,4 +334,54 @@ public double getProgress() {
return delegate.getProgress();
}
}
+
+ private static class SampleByteSizeDistribution<T> {
+ /** Basic implementation of {@link ElementByteSizeObserver} for use in size estimation. */
+ private static class ByteSizeObserver extends ElementByteSizeObserver {
+ private long observedSize = 0;
+
+ @Override
+ protected void reportElementSize(long elementSize) {
+ observedSize += elementSize;
+ }
+ }
+
+ final Distribution distribution;
+
+ public SampleByteSizeDistribution(Distribution distribution) {
+ this.distribution = distribution;
+ }
+
+ public void tryUpdate(T value, Coder<T> coder) throws Exception {
+ if (shouldSampleElement()) {
+ // First try using byte size observer
+ ByteSizeObserver observer = new ByteSizeObserver();
+ coder.registerByteSizeObserver(value, observer);
+
+ if (!observer.getIsLazy()) {
Review comment:
This code, as written, looks like it simply won't sample lazy elements,
which I think would be a regression. (I would be OK with implementing that in a
follow-up PR.)
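For illustration only, here is a sketch of how a lazily-observed element could still be sampled instead of skipped. When the cheap observer cannot report a size up front, the element is encoded into a stream that only counts bytes, and that count is recorded. All names here (`SketchCoder`, `StringIterableCoder`, `CountingOutputStream`, `SampledSizeRecorder`) are hypothetical, simplified stand-ins and are not Beam APIs. Real code would go through Beam's `Coder` and `ElementByteSizeObserver`, and the sampling decision (`shouldSampleElement()` in the diff) is assumed to be made by the caller.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.OptionalLong;

/** Hypothetical, simplified stand-in for a coder (NOT Beam's Coder API). */
interface SketchCoder<T> {
  void encode(T value, OutputStream out) throws IOException;

  /** Cheap size estimate, or empty when the size is only known lazily. */
  OptionalLong cheapSize(T value);
}

/** OutputStream that discards bytes and only counts them. */
final class CountingOutputStream extends OutputStream {
  private long count = 0;

  @Override
  public void write(int b) {
    count++;
  }

  @Override
  public void write(byte[] b, int off, int len) {
    count += len;
  }

  long getCount() {
    return count;
  }
}

/** Hypothetical coder: each string is a 4-byte length prefix followed by its UTF-8 bytes. */
final class StringIterableCoder implements SketchCoder<Iterable<String>> {
  @Override
  public void encode(Iterable<String> value, OutputStream out) throws IOException {
    DataOutputStream data = new DataOutputStream(out);
    for (String s : value) {
      byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
      data.writeInt(bytes.length);
      data.write(bytes);
    }
    data.flush();
  }

  @Override
  public OptionalLong cheapSize(Iterable<String> value) {
    // Assumption for the sketch: anything that is not a materialized Collection is "lazy".
    if (!(value instanceof Collection)) {
      return OptionalLong.empty();
    }
    long size = 0;
    for (String s : value) {
      size += 4 + s.getBytes(StandardCharsets.UTF_8).length;
    }
    return OptionalLong.of(size);
  }
}

/** Records the byte size of each (already sampled) element, falling back to encoding for lazy values. */
final class SampledSizeRecorder<T> {
  private final SketchCoder<T> coder;
  final List<Long> samples = new ArrayList<>();

  SampledSizeRecorder(SketchCoder<T> coder) {
    this.coder = coder;
  }

  void record(T value) {
    OptionalLong cheap = coder.cheapSize(value);
    long size;
    if (cheap.isPresent()) {
      size = cheap.getAsLong();
    } else {
      // Lazy value: pay for a full encode into a counting stream rather than dropping the sample.
      CountingOutputStream counter = new CountingOutputStream();
      try {
        coder.encode(value, counter);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      size = counter.getCount();
    }
    samples.add(size);
  }
}
```

The trade-off is that the fallback costs a full encode per sampled lazy element, which is why sampling (rather than measuring every element) matters most on that path.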