kileys commented on a change in pull request #13924:
URL: https://github.com/apache/beam/pull/13924#discussion_r578589247
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -62,16 +70,18 @@
@SuppressWarnings({"rawtypes"})
abstract static class ConsumerAndMetadata {
public static ConsumerAndMetadata forConsumer(
- FnDataReceiver consumer, String pTransformId, SimpleExecutionState
state) {
+ FnDataReceiver consumer, String pTransformId, SimpleExecutionState
state, Coder coder) {
return new AutoValue_PCollectionConsumerRegistry_ConsumerAndMetadata(
- consumer, pTransformId, state);
+ consumer, pTransformId, state, coder);
}
public abstract FnDataReceiver getConsumer();
public abstract String getPTransformId();
public abstract SimpleExecutionState getExecutionState();
+
+ public abstract @Nullable Coder getCoder();
Review comment:
The `@Nullable` was there because of a test where they didn't pass in an empty
coders map. If that's not possible in real use cases, I can update the tests and
remove the `@Nullable` annotation.
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -305,4 +338,59 @@ public double getProgress() {
return delegate.getProgress();
}
}
+
+ private static class SampleByteSizeEstimateDistribution<T> {
+ /** Basic implementation of {@link ElementByteSizeObserver} for use in
size estimation. */
+ private static class ByteSizeObserver extends ElementByteSizeObserver {
+ private long observedSize = 0;
+
+ @Override
+ protected void reportElementSize(long elementSize) {
+ observedSize += elementSize;
+ }
+ }
+
+ final Distribution distribution;
+
+ public SampleByteSizeEstimateDistribution(Distribution distribution) {
+ this.distribution = distribution;
+ }
+
+ public void tryUpdate(T value, Coder<T> coder) throws Exception {
+ if (shouldSampleElement() ||
coder.isRegisterByteSizeObserverCheap(value)) {
+ // First try using byte size observer
+ ByteSizeObserver observer = new ByteSizeObserver();
+ coder.registerByteSizeObserver(value, observer);
+
+ if (!observer.getIsLazy()) {
+ observer.advance();
+ this.distribution.update(observer.observedSize);
+ } else {
+ // Coder byte size observation is lazy (requires iteration for
observation) so fall back
+ // to counting output stream
+ CountingOutputStream os = new
CountingOutputStream(ByteStreams.nullOutputStream());
+ coder.encode(value, os);
+ this.distribution.update(os.getCount());
+ }
+ }
+ }
+
+ // Lowest sampling probability: 0.001%.
+ private static final int SAMPLING_TOKEN_UPPER_BOUND = 1000000;
+ private static final int SAMPLING_CUTOFF = 10;
+ private int samplingToken = 0;
+ private Random randomGenerator = new Random();
+
+ private boolean shouldSampleElement() {
Review comment:
Ran this micro benchmark and got (average nanoseconds per run of 1000 `accept`
calls, averaged over 100 runs):
With coder 79309931
Without coder 76130704
```
PCollectionConsumerRegistry consumers =
new PCollectionConsumerRegistry(
metricsContainerRegistry, mock(ExecutionStateTracker.class));
FnDataReceiver<WindowedValue<String>> consumerA1 =
mock(FnDataReceiver.class);
consumers.register(pCollectionA, pTransformIdA, consumerA1,
StringUtf8Coder.of());
FnDataReceiver<WindowedValue<String>> wrapperConsumer =
(FnDataReceiver<WindowedValue<String>>)
(FnDataReceiver) consumers.getMultiplexingConsumer(pCollectionA);
String elementValue = "test pcollection size estimation benchmark";
WindowedValue<String> element = valueInGlobalWindow(elementValue);
long coderDur = 0;
int numElements = 1000;
int runs = 100;
for (int run = 0; run < runs; run++) {
long startTime = System.nanoTime();
for (int i = 0; i < numElements; i++) {
wrapperConsumer.accept(element);
}
long endTime = System.nanoTime();
coderDur += (endTime - startTime);
}
long nullCoderDur = 0;
consumers.register(pCollectionB, pTransformIdB, consumerA1, null);
wrapperConsumer =
(FnDataReceiver<WindowedValue<String>>)
(FnDataReceiver) consumers.getMultiplexingConsumer(pCollectionB);
for (long run = 0; run < runs; run++) {
long startTime = System.nanoTime();
for (int i = 0; i < numElements; i++) {
wrapperConsumer.accept(element);
}
long endTime = System.nanoTime();
nullCoderDur += (endTime - startTime);
}
System.out.println("With coder " + coderDur/runs);
System.out.println("Without coder " + nullCoderDur/runs);
```
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -62,16 +70,18 @@
@SuppressWarnings({"rawtypes"})
abstract static class ConsumerAndMetadata {
public static ConsumerAndMetadata forConsumer(
- FnDataReceiver consumer, String pTransformId, SimpleExecutionState
state) {
+ FnDataReceiver consumer, String pTransformId, SimpleExecutionState
state, Coder coder) {
Review comment:
Sounds good to me
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -305,4 +336,59 @@ public double getProgress() {
return delegate.getProgress();
}
}
+
+ private static class SampleByteSizeDistribution<T> {
+ /** Basic implementation of {@link ElementByteSizeObserver} for use in
size estimation. */
+ private static class ByteSizeObserver extends ElementByteSizeObserver {
+ private long observedSize = 0;
+
+ @Override
+ protected void reportElementSize(long elementSize) {
+ observedSize += elementSize;
+ }
+ }
+
+ final Distribution distribution;
+
+ public SampleByteSizeDistribution(Distribution distribution) {
+ this.distribution = distribution;
+ }
+
+ public void tryUpdate(T value, Coder<T> coder) throws Exception {
+ if (shouldSampleElement() ||
coder.isRegisterByteSizeObserverCheap(value)) {
+ // First try using byte size observer
+ ByteSizeObserver observer = new ByteSizeObserver();
+ coder.registerByteSizeObserver(value, observer);
+
+ if (!observer.getIsLazy()) {
+ observer.advance();
+ this.distribution.update(observer.observedSize);
+ } else {
+ // Coder byte size observation is lazy (requires iteration for
observation) so fall back
Review comment:
I looked into this a bit more: the IterableCoder sets isLazy to true only for
ElementByteSizeObservableIterable types, so this else branch shouldn't be used.
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
##########
@@ -305,4 +336,59 @@ public double getProgress() {
return delegate.getProgress();
}
}
+
+ private static class SampleByteSizeDistribution<T> {
+ /** Basic implementation of {@link ElementByteSizeObserver} for use in
size estimation. */
+ private static class ByteSizeObserver extends ElementByteSizeObserver {
+ private long observedSize = 0;
+
+ @Override
+ protected void reportElementSize(long elementSize) {
+ observedSize += elementSize;
+ }
+ }
+
+ final Distribution distribution;
+
+ public SampleByteSizeDistribution(Distribution distribution) {
+ this.distribution = distribution;
+ }
+
+ public void tryUpdate(T value, Coder<T> coder) throws Exception {
+ if (shouldSampleElement() ||
coder.isRegisterByteSizeObserverCheap(value)) {
+ // First try using byte size observer
+ ByteSizeObserver observer = new ByteSizeObserver();
+ coder.registerByteSizeObserver(value, observer);
+
+ if (!observer.getIsLazy()) {
+ observer.advance();
+ this.distribution.update(observer.observedSize);
+ } else {
+ // Coder byte size observation is lazy (requires iteration for
observation) so fall back
+ // to counting output stream
+ CountingOutputStream os = new
CountingOutputStream(ByteStreams.nullOutputStream());
+ coder.encode(value, os);
+ this.distribution.update(os.getCount());
+ }
+ }
+ }
+
+ // Lowest sampling probability: 0.001%.
+ private static final int SAMPLING_TOKEN_UPPER_BOUND = 1000000;
+ private static final int SAMPLING_CUTOFF = 10;
+ private int samplingToken = 0;
+ private Random randomGenerator = new Random();
+
+ private boolean shouldSampleElement() {
+ // Sampling probability decreases as the element count is increasing.
+ // We unconditionally sample the first samplingCutoff elements. For the
+ // next samplingCutoff elements, the sampling probability drops from 100%
+ // to 50%. The probability of sampling the Nth element is:
+ // min(1, samplingCutoff / N), with an additional lower bound of
+ // samplingCutoff / samplingTokenUpperBound. This algorithm may be
refined
+ // later.
+ samplingToken = Math.min(samplingToken + 1, SAMPLING_TOKEN_UPPER_BOUND);
+ return randomGenerator.nextInt(samplingToken) < SAMPLING_CUTOFF;
Review comment:
I think the section name's changed for the link. Is this the same one?
https://en.wikipedia.org/wiki/Reservoir_sampling#An_optimal_algorithm
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org