bvolpato commented on code in PR #29517:
URL: https://github.com/apache/beam/pull/29517#discussion_r1402245782
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java:
##########
@@ -81,11 +88,78 @@ public StateBackedIterable(
this.suffix =
StateFetchingIterators.readAllAndDecodeStartingFrom(
Caches.subCache(cache, stateKey), beamFnStateClient, request, elemCoder);
+ this.elemCoder = elemCoder;
+ }
+
+ @SuppressWarnings("nullness")
+ private static class WrappedObservingIterator<T> extends ElementByteSizeObservableIterator<T> {
+ private final Iterator<T> wrappedIterator;
+ private final org.apache.beam.sdk.coders.Coder<T> elementCoder;
+
+ // Logically final and non-null but initialized after construction by factory method for
+ // initialization ordering.
+ private ElementByteSizeObserver observerProxy = null;
+
+ private boolean observerNeedsAdvance = false;
+
+ static <T> WrappedObservingIterator<T> create(
+ Iterator<T> iterator, org.apache.beam.sdk.coders.Coder<T> elementCoder) {
+ WrappedObservingIterator<T> result = new WrappedObservingIterator<>(iterator, elementCoder);
+ result.observerProxy =
+ new ElementByteSizeObserver() {
+ @Override
+ protected void reportElementSize(long elementByteSize) {
+ result.notifyValueReturned(elementByteSize);
+ }
+ };
+ return result;
+ }
+
+ private WrappedObservingIterator(
+ Iterator<T> iterator, org.apache.beam.sdk.coders.Coder<T> elementCoder) {
+ this.wrappedIterator = iterator;
+ this.elementCoder = elementCoder;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (observerNeedsAdvance) {
+ observerProxy.advance();
+ observerNeedsAdvance = false;
+ }
+ return wrappedIterator.hasNext();
+ }
+
+ @Override
+ public T next() {
+ T value = wrappedIterator.next();
+ try {
+ elementCoder.registerByteSizeObserver(value, observerProxy);
+ if (observerProxy.getIsLazy()) {
+ // The observer will only be notified of bytes as the result
+ // is used. We defer advancing the observer until hasNext in an
+ // attempt to capture those bytes.
+ observerNeedsAdvance = true;
+ } else {
+ observerNeedsAdvance = false;
+ observerProxy.advance();
+ }
+ } catch (Exception e) {
+ // Don't notify of the byte size.
Review Comment:
If I understand it correctly, this could happen due to a user error, such as a bad coder or
element, and the reported sizes will then definitely be wrong. The error might not surface in
any other way either (e.g., for fused steps).
Should we at least log a WARN that something is wrong?
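A minimal, self-contained sketch of the suggested behavior, under stated assumptions: this is not Beam code. `SizeObservingIteratorSketch` and `SizeEstimator` are hypothetical names, the estimator stands in for `Coder#registerByteSizeObserver` plus `ElementByteSizeObserver`, and `java.util.logging` is used only to keep the example dependency-free (the harness would presumably use whatever logger it already uses). On failure the element's size is still skipped, but the first failure per iterator is logged at WARNING with the exception attached.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Hypothetical sketch, not Beam code: wraps an iterator, sums an estimated byte size for each
 * element, and logs a WARNING (once per iterator) when estimation fails instead of silently
 * skipping the element's size.
 */
final class SizeObservingIteratorSketch<T> implements Iterator<T> {
  private static final Logger LOG =
      Logger.getLogger(SizeObservingIteratorSketch.class.getName());

  /** Hypothetical stand-in for Coder#registerByteSizeObserver, which may throw. */
  interface SizeEstimator<E> {
    long estimate(E value) throws Exception;
  }

  private final Iterator<T> delegate;
  private final SizeEstimator<T> estimator;
  private long observedBytes = 0;
  private long failedElements = 0;
  private boolean warned = false;

  SizeObservingIteratorSketch(Iterator<T> delegate, SizeEstimator<T> estimator) {
    this.delegate = delegate;
    this.estimator = estimator;
  }

  @Override
  public boolean hasNext() {
    return delegate.hasNext();
  }

  @Override
  public T next() {
    T value = delegate.next();
    try {
      observedBytes += estimator.estimate(value);
    } catch (Exception e) {
      // Still skip this element's size, but make the problem visible.
      failedElements++;
      if (!warned) {
        // Warn only once so a large iterable with a broken coder does not flood the logs.
        warned = true;
        LOG.log(
            Level.WARNING,
            "Failed to estimate the byte size of an element; reported sizes will be too low.",
            e);
      }
    }
    // The element itself is always returned; only size reporting is affected.
    return value;
  }

  long observedBytes() {
    return observedBytes;
  }

  long failedElements() {
    return failedElements;
  }

  public static void main(String[] args) {
    List<String> elements = Arrays.asList("a", "bb", null, "dddd");
    // s.length() throws NullPointerException on the null element, simulating a bad element.
    SizeObservingIteratorSketch<String> it =
        new SizeObservingIteratorSketch<>(elements.iterator(), s -> s.length());
    while (it.hasNext()) {
      it.next();
    }
    // Prints "observed=7 failed=1"; the WARNING with the stack trace goes to stderr.
    System.out.println("observed=" + it.observedBytes() + " failed=" + it.failedElements());
  }
}
```

Warning once per iterator is just one possible trade-off between visibility and log volume; a process-wide rate limit, or counting failures and exposing them as a metric, would be alternatives.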