robertwb commented on a change in pull request #14610:
URL: https://github.com/apache/beam/pull/14610#discussion_r626147081
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -357,31 +357,34 @@ protected void reportElementSize(long elementSize) {
}
final Distribution distribution;
+ ByteSizeObserver byteCountObserver;
public SampleByteSizeDistribution(Distribution distribution) {
this.distribution = distribution;
+ this.byteCountObserver = null;
}
public void tryUpdate(T value, Coder<T> coder) throws Exception {
if (shouldSampleElement()) {
// First try using byte size observer
- ByteSizeObserver observer = new ByteSizeObserver();
- coder.registerByteSizeObserver(value, observer);
-
- if (!observer.getIsLazy()) {
- observer.advance();
- this.distribution.update(observer.observedSize);
- } else {
- // TODO(BEAM-11841): Optimize calculation of element size for iterables.
- // Coder byte size observation is lazy (requires iteration for observation) so fall back
- // to counting output stream
- CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream());
- coder.encode(value, os);
- this.distribution.update(os.getCount());
+ byteCountObserver = new ByteSizeObserver();
+ coder.registerByteSizeObserver(value, byteCountObserver);
+
+ if (!byteCountObserver.getIsLazy()) {
+ byteCountObserver.advance();
+ this.distribution.update(byteCountObserver.observedSize);
}
}
Review comment:
Ah. So we're relying on the fact that we would have implicitly cleared
this bit one way or another when sampling the previous element. This is
correct, but not very obvious, so I'd still prefer explicitly setting the
byteCountObserver to null here.
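A minimal, dependency-free sketch of the suggested explicit reset follows. `SizeObserver`, `RecordingDistribution`, `SampledSizeDistribution`, `SampledSizeSketch` and `finishLazyUpdate` are hypothetical stand-ins, not Beam's actual classes. The sketch also assumes something the hunk above does not show: a deferred (lazy) observation is completed by a later call once the element has been consumed. The sampling decision and the coder's size report are passed in as plain parameters to keep it self-contained.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, dependency-free sketch; none of these classes are Beam's real API. */
public class SampledSizeSketch {

  /** Stand-in for the harness's ByteSizeObserver: accumulates reported byte counts. */
  static class SizeObserver {
    private boolean isLazy = false;
    private long pendingBytes = 0;
    long observedSize = 0;

    void setLazy() {
      isLazy = true;
    }

    boolean getIsLazy() {
      return isLazy;
    }

    void report(long bytes) {
      pendingBytes += bytes;
    }

    /** Folds pending reports into observedSize. */
    void advance() {
      observedSize += pendingBytes;
      pendingBytes = 0;
    }
  }

  /** Stand-in for a metrics Distribution that simply records every update. */
  static class RecordingDistribution {
    final List<Long> updates = new ArrayList<>();

    void update(long value) {
      updates.add(value);
    }
  }

  static class SampledSizeDistribution {
    final RecordingDistribution distribution;
    // Non-null only while a lazy observation of the current element is pending.
    SizeObserver byteCountObserver;

    SampledSizeDistribution(RecordingDistribution distribution) {
      this.distribution = distribution;
      this.byteCountObserver = null;
    }

    /**
     * encodedSize stands in for what a coder would report; sample replaces
     * shouldSampleElement(); lazy simulates a value (e.g. an iterable) whose size is
     * only final after it has been consumed.
     */
    void tryUpdate(long encodedSize, boolean sample, boolean lazy) {
      // Explicit reset suggested in the review: never carry an observer over from the
      // previous element, even if it would normally have been cleared elsewhere.
      byteCountObserver = null;
      if (!sample) {
        return;
      }
      SizeObserver observer = new SizeObserver();
      observer.report(encodedSize); // stands in for coder.registerByteSizeObserver(...)
      if (lazy) {
        observer.setLazy();
      }
      if (!observer.getIsLazy()) {
        observer.advance();
        distribution.update(observer.observedSize);
      } else {
        // Defer: keep the observer so the size can be recorded after consumption.
        byteCountObserver = observer;
      }
    }

    /** Hypothetical hook: records a deferred observation once the element is consumed. */
    void finishLazyUpdate() {
      if (byteCountObserver != null) {
        byteCountObserver.advance();
        distribution.update(byteCountObserver.observedSize);
        byteCountObserver = null;
      }
    }
  }
}
```

With the reset at the top of `tryUpdate`, a new element always starts from a null field. An observer left over from an element whose lazy observation was never finished is dropped, rather than being recorded later against the wrong element. The invariant is then visible in one place instead of depending on how the previous call ended.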
--
This is an automated message from the Apache Git Service.