boyuanzz commented on a change in pull request #13924:
URL: https://github.com/apache/beam/pull/13924#discussion_r583890559
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -210,8 +233,9 @@ public void accept(WindowedValue<T> input) throws Exception {
try (Closeable close =
MetricsEnvironment.scopedMetricsContainer(this.unboundMetricContainer)) {
// Increment the counter for each window the element occurs in.
- this.counter.inc(input.getWindows().size());
-
+ this.elementCountCounter.inc(input.getWindows().size());
+ // TODO update size per window
Review comment:
```suggestion
// TODO(BEAM-11879): Consider updating size per window when we have window optimization.
```
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -258,13 +287,18 @@ public void accept(WindowedValue<T> input) throws Exception {
try (Closeable close =
MetricsEnvironment.scopedMetricsContainer(this.unboundMetricContainer)) {
// Increment the counter for each window the element occurs in.
- this.counter.inc(input.getWindows().size());
-
+ this.elementCountCounter.inc(input.getWindows().size());
// Wrap the consumer with extra logic to set the metric container with the appropriate
// PTransform context. This ensures that user metrics obtain the pTransform ID when they are
// created. Also use the ExecutionStateTracker and enter an appropriate state to track the
// Process Bundle Execution time metric.
for (ConsumerAndMetadata consumerAndMetadata : consumerAndMetadatas) {
+
+ if (consumerAndMetadata.getValueCoder() != null) {
+ // TODO update size per window
Review comment:
```suggestion
// TODO(BEAM-11879): Consider updating size per window when we have window optimization.
```
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -305,4 +339,61 @@ public double getProgress() {
return delegate.getProgress();
}
}
+
+ private static class SampleByteSizeDistribution<T> {
+ /** Basic implementation of {@link ElementByteSizeObserver} for use in size estimation. */
+ private static class ByteSizeObserver extends ElementByteSizeObserver {
+ private long observedSize = 0;
+
+ @Override
+ protected void reportElementSize(long elementSize) {
+ observedSize += elementSize;
+ }
+ }
+
+ final Distribution distribution;
+
+ public SampleByteSizeDistribution(Distribution distribution) {
+ this.distribution = distribution;
+ }
+
+ public void tryUpdate(T value, Coder<T> coder) throws Exception {
+ if (shouldSampleElement()) {
+ // First try using byte size observer
+ ByteSizeObserver observer = new ByteSizeObserver();
+ coder.registerByteSizeObserver(value, observer);
+
+ if (!observer.getIsLazy()) {
+ observer.advance();
+ this.distribution.update(observer.observedSize);
+ } else {
+ // TODO (BEAM-11841) Optimize calculation of element size for iterables
+ // Coder byte size observation is lazy (requires iteration for observation) so fall back
+ // to counting output stream
+ CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream());
+ coder.encode(value, os);
+ this.distribution.update(os.getCount());
+ }
+ }
+ }
+
+ // Lowest sampling probability: 0.001%.
+ private static final int SAMPLING_TOKEN_UPPER_BOUND = 1000000;
+ private static final int SAMPLING_CUTOFF = 10;
+ private int samplingToken = 0;
+ private Random randomGenerator = new Random();
+
+ // TODO (BEAM-11836) Implement fast approximation for reservoir sampling
Review comment:
```suggestion
// TODO(BEAM-11836): Implement fast approximation for reservoir sampling.
```
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -305,4 +339,61 @@ public double getProgress() {
return delegate.getProgress();
}
}
+
+ private static class SampleByteSizeDistribution<T> {
+ /** Basic implementation of {@link ElementByteSizeObserver} for use in size estimation. */
+ private static class ByteSizeObserver extends ElementByteSizeObserver {
+ private long observedSize = 0;
+
+ @Override
+ protected void reportElementSize(long elementSize) {
+ observedSize += elementSize;
+ }
+ }
+
+ final Distribution distribution;
+
+ public SampleByteSizeDistribution(Distribution distribution) {
+ this.distribution = distribution;
+ }
+
+ public void tryUpdate(T value, Coder<T> coder) throws Exception {
+ if (shouldSampleElement()) {
+ // First try using byte size observer
+ ByteSizeObserver observer = new ByteSizeObserver();
+ coder.registerByteSizeObserver(value, observer);
+
+ if (!observer.getIsLazy()) {
+ observer.advance();
+ this.distribution.update(observer.observedSize);
+ } else {
+ // TODO (BEAM-11841) Optimize calculation of element size for iterables
Review comment:
```suggestion
// TODO(BEAM-11841): Optimize calculation of element size for iterables.
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]