scwhittle commented on code in PR #33086:
URL: https://github.com/apache/beam/pull/33086#discussion_r1841800141


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver.java:
##########
@@ -118,6 +121,35 @@ public boolean isConsumingReceivedData() {
     return consumingReceivedData.get();
   }
 
+  // Copies the elements of list to an array and removes references to elements that
+  // have been iterated past.
+  @SuppressWarnings({"initialization", "assignment"})
+  private static <T> Iterator<T> createDiscardingIterator(List<T> list) {
+    if (list.isEmpty()) {
+      // As optimization, don't create array etc for empty list.
+      return list.iterator();
+    }
+
+    @Nullable Object[] array = list.toArray();
+    return new Iterator<T>() {
+      int i = 0;
+
+      @Override
+      public boolean hasNext() {
+        return i < array.length;
+      }
+
+      @Override
+      public T next() {
+        @SuppressWarnings("unchecked")
+        T result = (T) Preconditions.checkNotNull(array[i]);

Review Comment:
   I don't think the original list would have nulls, since it is a proto repeated
field. But you're right that it's not needed; removed.
   



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver.java:
##########
@@ -118,6 +121,35 @@ public boolean isConsumingReceivedData() {
     return consumingReceivedData.get();
   }
 
+  // Copies the elements of list to an array and removes references to elements that
+  // have been iterated past.
+  @SuppressWarnings({"initialization", "assignment"})
+  private static <T> Iterator<T> createDiscardingIterator(List<T> list) {
+    if (list.isEmpty()) {
+      // As optimization, don't create array etc for empty list.
+      return list.iterator();
+    }
+
+    @Nullable Object[] array = list.toArray();

Review Comment:
   This copy is on the order of the number of elements. For the Dataflow runner
this is always one or two elements (since elements internally hold many
concatenated encoded values). For timers there is one element per timer.
   
   I think it could help reduce peak memory. Currently, if we have a
state-backed iterable, we hold the full data page plus a state page at the same
time. With this change, I think we will drop the data page once we have iterated
past it and moved on to the state-backed iterable, so a data page and a state
page will not both be in memory at once for the same key. If we load the state
page only after we finish the data, the peak would be lower too.
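   
   To make the memory argument concrete, here is a minimal standalone sketch of
the discarding-iterator idea. It is an illustration, not the code in this PR:
the class name `DiscardingIteratorSketch`, the `NoSuchElementException` check,
and the `main` method are assumptions added for the example. The copy is
shallow (one reference per element), and each slot is nulled once it has been
returned. An element only becomes collectible if the caller also drops the
original list and its own reference to the element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class DiscardingIteratorSketch {

  // Hypothetical stand-in for the helper discussed above; the actual PR code may differ.
  public static <T> Iterator<T> createDiscardingIterator(List<T> list) {
    if (list.isEmpty()) {
      // Nothing to discard, so skip the array allocation.
      return list.iterator();
    }
    // Shallow copy: O(n) references, no element payloads are copied.
    Object[] array = list.toArray();
    return new Iterator<T>() {
      private int i = 0;

      @Override
      public boolean hasNext() {
        return i < array.length;
      }

      @Override
      public T next() {
        if (i >= array.length) {
          throw new NoSuchElementException();
        }
        @SuppressWarnings("unchecked")
        T result = (T) array[i];
        // Drop the array's reference so the element can be garbage collected
        // once the consumer is done with it (assuming nothing else holds it).
        array[i] = null;
        i++;
        return result;
      }
    };
  }

  public static void main(String[] args) {
    List<String> elements = new ArrayList<>(Arrays.asList("a", "b", "c"));
    Iterator<String> it = createDiscardingIterator(elements);
    StringBuilder seen = new StringBuilder();
    while (it.hasNext()) {
      seen.append(it.next());
    }
    System.out.println(seen); // prints "abc"
  }
}
```

   With one or two elements per message the array copy is negligible, while
the nulling lets a large, already-consumed data element be released before the
next page is loaded.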



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
