scwhittle commented on code in PR #29517:
URL: https://github.com/apache/beam/pull/29517#discussion_r1402677691


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java:
##########
@@ -81,11 +88,78 @@ public StateBackedIterable(
     this.suffix =
         StateFetchingIterators.readAllAndDecodeStartingFrom(
             Caches.subCache(cache, stateKey), beamFnStateClient, request, 
elemCoder);
+    this.elemCoder = elemCoder;
+  }
+
+  @SuppressWarnings("nullness")
+  private static class WrappedObservingIterator<T> extends 
ElementByteSizeObservableIterator<T> {
+    private final Iterator<T> wrappedIterator;
+    private final org.apache.beam.sdk.coders.Coder<T> elementCoder;
+
+    // Logically final and non-null but initialized after construction by factory method for
+    // initialization ordering.
+    private ElementByteSizeObserver observerProxy = null;
+
+    private boolean observerNeedsAdvance = false;
+
+    static <T> WrappedObservingIterator<T> create(
+        Iterator<T> iterator, org.apache.beam.sdk.coders.Coder<T> 
elementCoder) {
+      WrappedObservingIterator<T> result = new 
WrappedObservingIterator<>(iterator, elementCoder);
+      result.observerProxy =
+          new ElementByteSizeObserver() {
+            @Override
+            protected void reportElementSize(long elementByteSize) {
+              result.notifyValueReturned(elementByteSize);
+            }
+          };
+      return result;
+    }
+
+    private WrappedObservingIterator(
+        Iterator<T> iterator, org.apache.beam.sdk.coders.Coder<T> 
elementCoder) {
+      this.wrappedIterator = iterator;
+      this.elementCoder = elementCoder;
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (observerNeedsAdvance) {
+        observerProxy.advance();
+        observerNeedsAdvance = false;
+      }
+      return wrappedIterator.hasNext();
+    }
+
+    @Override
+    public T next() {
+      T value = wrappedIterator.next();
+      try {
+        elementCoder.registerByteSizeObserver(value, observerProxy);
+        if (observerProxy.getIsLazy()) {
+          // The observer will only be notified of bytes as the result
+          // is used. We defer advancing the observer until hasNext in an
+          // attempt to capture those bytes.
+          observerNeedsAdvance = true;
+        } else {
+          observerNeedsAdvance = false;
+          observerProxy.advance();
+        }
+      } catch (Exception e) {
+        // Don't notify of the byte size.

Review Comment:
   Added a log message, throttled to once per iterator.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to