boyuanzz commented on a change in pull request #13924:
URL: https://github.com/apache/beam/pull/13924#discussion_r578059687
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -62,16 +70,18 @@
@SuppressWarnings({"rawtypes"})
abstract static class ConsumerAndMetadata {
public static ConsumerAndMetadata forConsumer(
- FnDataReceiver consumer, String pTransformId, SimpleExecutionState state) {
+ FnDataReceiver consumer, String pTransformId, SimpleExecutionState state, Coder coder) {
Review comment:
```suggestion
FnDataReceiver consumer, String pTransformId, SimpleExecutionState state, Coder valueCoder) {
```
`valueCoder` seems less misleading to me. What do you think?
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -62,16 +70,18 @@
@SuppressWarnings({"rawtypes"})
abstract static class ConsumerAndMetadata {
public static ConsumerAndMetadata forConsumer(
- FnDataReceiver consumer, String pTransformId, SimpleExecutionState state) {
+ FnDataReceiver consumer, String pTransformId, SimpleExecutionState state, Coder coder) {
return new AutoValue_PCollectionConsumerRegistry_ConsumerAndMetadata(
- consumer, pTransformId, state);
+ consumer, pTransformId, state, coder);
}
public abstract FnDataReceiver getConsumer();
public abstract String getPTransformId();
public abstract SimpleExecutionState getExecutionState();
+
+ public abstract @Nullable Coder getCoder();
Review comment:
Why could the coder be null here?
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -210,8 +232,10 @@ public void accept(WindowedValue<T> input) throws Exception {
try (Closeable close =
MetricsEnvironment.scopedMetricsContainer(this.unboundMetricContainer)) {
// Increment the counter for each window the element occurs in.
- this.counter.inc(input.getWindows().size());
-
+ this.elementCountCounter.inc(input.getWindows().size());
+ if (this.coder != null) {
Review comment:
Same question as above: is there any reason `coder` would be `null` here?
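If it turns out that every consumer is always registered with a coder, both the `@Nullable` on `getCoder()` and the `if (this.coder != null)` branch could go away by rejecting null once, at construction. A minimal stdlib-only sketch of that shape, assuming that invariant holds (the hunks don't confirm it); `ConsumerMetadataSketch` is a hypothetical name, and `ToLongFunction<T>` only stands in for Beam's `Coder<T>`:

```java
import java.util.Objects;
import java.util.function.ToLongFunction;

/**
 * Hypothetical stand-in for ConsumerAndMetadata. ToLongFunction<T> plays the role of a
 * value coder that can report a byte size; it is NOT Beam's Coder API.
 */
final class ConsumerMetadataSketch<T> {
  private final String pTransformId;
  private final ToLongFunction<T> valueSizer;
  private long totalBytes = 0;

  private ConsumerMetadataSketch(String pTransformId, ToLongFunction<T> valueSizer) {
    this.pTransformId = pTransformId;
    this.valueSizer = valueSizer;
  }

  /** Fails fast on a missing sizer, so accept() never needs a null check. */
  static <T> ConsumerMetadataSketch<T> forConsumer(
      String pTransformId, ToLongFunction<T> valueSizer) {
    return new ConsumerMetadataSketch<>(
        pTransformId, Objects.requireNonNull(valueSizer, "valueSizer"));
  }

  String getPTransformId() {
    return pTransformId;
  }

  void accept(T value) {
    // No null guard: forConsumer() already guaranteed valueSizer is present.
    totalBytes += valueSizer.applyAsLong(value);
  }

  long totalBytes() {
    return totalBytes;
  }
}
```

The trade-off is that a missing coder surfaces as a `NullPointerException` at registration time instead of silently skipping the size metric at processing time.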
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -305,4 +338,59 @@ public double getProgress() {
return delegate.getProgress();
}
}
+
+ private static class SampleByteSizeEstimateDistribution<T> {
+ /** Basic implementation of {@link ElementByteSizeObserver} for use in size estimation. */
+ private static class ByteSizeObserver extends ElementByteSizeObserver {
+ private long observedSize = 0;
+
+ @Override
+ protected void reportElementSize(long elementSize) {
+ observedSize += elementSize;
+ }
+ }
+
+ final Distribution distribution;
+
+ public SampleByteSizeEstimateDistribution(Distribution distribution) {
+ this.distribution = distribution;
+ }
+
+ public void tryUpdate(T value, Coder<T> coder) throws Exception {
+ if (shouldSampleElement() || coder.isRegisterByteSizeObserverCheap(value)) {
+ // First try using byte size observer
+ ByteSizeObserver observer = new ByteSizeObserver();
+ coder.registerByteSizeObserver(value, observer);
+
+ if (!observer.getIsLazy()) {
+ observer.advance();
+ this.distribution.update(observer.observedSize);
+ } else {
+ // Coder byte size observation is lazy (requires iteration for observation) so fall back
+ // to counting output stream
+ CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream());
+ coder.encode(value, os);
+ this.distribution.update(os.getCount());
+ }
+ }
+ }
+
+ // Lowest sampling probability: 0.001%.
+ private static final int SAMPLING_TOKEN_UPPER_BOUND = 1000000;
+ private static final int SAMPLING_CUTOFF = 10;
+ private int samplingToken = 0;
+ private Random randomGenerator = new Random();
+
+ private boolean shouldSampleElement() {
Review comment:
It's OK with me to keep the same logic here. It would be really helpful to have a benchmark for this, though.
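The body of `shouldSampleElement()` is cut off in this hunk. Assuming it follows a decaying scheme consistent with the constants shown (always sample the first `SAMPLING_CUTOFF` elements, then the N-th element with probability `SAMPLING_CUTOFF / N`, floored at `SAMPLING_CUTOFF / SAMPLING_TOKEN_UPPER_BOUND` = 10 / 1,000,000 = 0.001%), a stdlib-only sketch could look like this. `DecayingSampler` is a hypothetical name, not the PR's code:

```java
import java.util.Random;

/** Hypothetical sketch of a decaying element sampler; not the code under review. */
final class DecayingSampler {
  // Constants copied from the hunk: 10 / 1_000_000 gives the 0.001% floor.
  static final int SAMPLING_TOKEN_UPPER_BOUND = 1000000;
  static final int SAMPLING_CUTOFF = 10;

  private int samplingToken = 0;
  private final Random randomGenerator;

  DecayingSampler(Random randomGenerator) {
    this.randomGenerator = randomGenerator;
  }

  /**
   * Returns true for the N-th call with probability min(1, CUTOFF / N); once N reaches
   * the upper bound, the probability stays at CUTOFF / UPPER_BOUND.
   */
  boolean shouldSampleElement() {
    samplingToken = Math.min(samplingToken + 1, SAMPLING_TOKEN_UPPER_BOUND);
    // nextInt(k) is uniform over [0, k), so this holds with probability min(1, CUTOFF / k).
    return randomGenerator.nextInt(samplingToken) < SAMPLING_CUTOFF;
  }
}
```

Under this scheme the expected number of sampled elements after N calls grows roughly like `SAMPLING_CUTOFF * ln(N)` (about 125 in expectation for N = 1,000,000), which is what keeps the per-element encoding cost low. A JMH run of `tryUpdate` with and without sampling would show how much that actually saves in practice.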