This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d760383817d Changes to reduce memory pinned while iterating through state backed iterable: (#32961)
d760383817d is described below

commit d760383817dce7d5d0d6ec5b4a500d948b041204
Author: Sam Whittle <[email protected]>
AuthorDate: Mon Nov 11 22:03:07 2024 +0100

    Changes to reduce memory pinned while iterating through state backed iterable: (#32961)
    
    - remove reference to completed encoded input page from decoder once we have read it.
    - re-read from cache after loading the next page to give eviction a chance to remove blocks
---
 .../org/apache/beam/sdk/fn/stream/DataStreams.java |  6 ++
 .../fn/harness/state/StateFetchingIterators.java   | 83 ++++++++++++----------
 2 files changed, 52 insertions(+), 37 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
index b0d29e2295a..2c6b61e6212 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
@@ -202,6 +202,12 @@ public class DataStreams {
           T next = next();
           rvals.add(next);
         }
+        // We don't support seeking backwards so release the memory of the last
+        // page if it is completed.
+        if (inbound.currentStream.available() == 0) {
+          inbound.position = 0;
+          inbound.currentStream = EMPTY_STREAM;
+        }
 
         // Uses the size of the ByteString as an approximation for the heap 
size occupied by the
         // page, considering an overhead of {@link 
BYTES_LIST_ELEMENT_OVERHEAD} for each element.
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
index 3b9fccfa2a5..81a2aa6d1cc 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
@@ -105,7 +105,7 @@ public class StateFetchingIterators {
       // many different state subcaches.
       return 0;
     }
-  };
+  }
 
   /** A mutable iterable that supports prefetch and is backed by a cache. */
   static class CachingStateIterable<T> extends 
PrefetchableIterables.Default<T> {
@@ -138,8 +138,8 @@ public class StateFetchingIterators {
     private static <T> long sumWeight(List<Block<T>> blocks) {
       try {
         long sum = 0;
-        for (int i = 0; i < blocks.size(); ++i) {
-          sum = Math.addExact(sum, blocks.get(i).getWeight());
+        for (Block<T> block : blocks) {
+          sum = Math.addExact(sum, block.getWeight());
         }
         return sum;
       } catch (ArithmeticException e) {
@@ -437,50 +437,59 @@ public class StateFetchingIterators {
           if (currentBlock.getValues().size() > currentCachedBlockValueIndex) {
             return true;
           }
-          if (currentBlock.getNextToken() == null) {
+          final ByteString nextToken = currentBlock.getNextToken();
+          if (nextToken == null) {
             return false;
           }
-          Blocks<T> existing = cache.peek(IterableCacheKey.INSTANCE);
-          boolean isFirstBlock = 
ByteString.EMPTY.equals(currentBlock.getNextToken());
+          // Release the block while we are loading the next one.
+          currentBlock =
+              Block.fromValues(new WeightedList<>(Collections.emptyList(), 
0L), ByteString.EMPTY);
+
+          @Nullable Blocks<T> existing = cache.peek(IterableCacheKey.INSTANCE);
+          boolean isFirstBlock = ByteString.EMPTY.equals(nextToken);
           if (existing == null) {
-            currentBlock = loadNextBlock(currentBlock.getNextToken());
+            currentBlock = loadNextBlock(nextToken);
             if (isFirstBlock) {
               cache.put(
                   IterableCacheKey.INSTANCE,
                   new BlocksPrefix<>(Collections.singletonList(currentBlock)));
             }
+          } else if (isFirstBlock) {
+            currentBlock = existing.getBlocks().get(0);
           } else {
-            if (isFirstBlock) {
-              currentBlock = existing.getBlocks().get(0);
-            } else {
-              checkState(
-                  existing instanceof BlocksPrefix,
-                  "Unexpected blocks type %s, expected a %s.",
-                  existing.getClass(),
-                  BlocksPrefix.class);
-              List<Block<T>> blocks = existing.getBlocks();
-              int currentBlockIndex = 0;
-              for (; currentBlockIndex < blocks.size(); ++currentBlockIndex) {
-                if (currentBlock
-                    .getNextToken()
-                    .equals(blocks.get(currentBlockIndex).getNextToken())) {
-                  break;
-                }
+            checkState(
+                existing instanceof BlocksPrefix,
+                "Unexpected blocks type %s, expected a %s.",
+                existing.getClass(),
+                BlocksPrefix.class);
+            List<Block<T>> blocks = existing.getBlocks();
+            int currentBlockIndex = 0;
+            for (; currentBlockIndex < blocks.size(); ++currentBlockIndex) {
+              if 
(nextToken.equals(blocks.get(currentBlockIndex).getNextToken())) {
+                break;
               }
-              // Load the next block from cache if it was found.
-              if (currentBlockIndex + 1 < blocks.size()) {
-                currentBlock = blocks.get(currentBlockIndex + 1);
-              } else {
-                // Otherwise load the block from state API.
-                currentBlock = loadNextBlock(currentBlock.getNextToken());
-
-                // Append this block to the existing set of blocks if it is 
logically the next one.
-                if (currentBlockIndex == blocks.size() - 1) {
-                  List<Block<T>> newBlocks = new ArrayList<>(currentBlockIndex 
+ 1);
-                  newBlocks.addAll(blocks);
-                  newBlocks.add(currentBlock);
-                  cache.put(IterableCacheKey.INSTANCE, new 
BlocksPrefix<>(newBlocks));
-                }
+            }
+            // Take the next block from the cache if it was found.
+            if (currentBlockIndex + 1 < blocks.size()) {
+              currentBlock = blocks.get(currentBlockIndex + 1);
+            } else {
+              // Otherwise load the block from state API.
+              // Remove references on the cached values while we are loading 
the next block.
+              existing = null;
+              blocks = null;
+              currentBlock = loadNextBlock(nextToken);
+              existing = cache.peek(IterableCacheKey.INSTANCE);
+              // Append this block to the existing set of blocks if it is 
logically the next one
+              // according to the
+              // tokens.
+              if (existing != null
+                  && !existing.getBlocks().isEmpty()
+                  && nextToken.equals(
+                      existing.getBlocks().get(existing.getBlocks().size() - 
1).getNextToken())) {
+                List<Block<T>> newBlocks = new ArrayList<>(currentBlockIndex + 
1);
+                newBlocks.addAll(existing.getBlocks());
+                newBlocks.add(currentBlock);
+                cache.put(IterableCacheKey.INSTANCE, new 
BlocksPrefix<>(newBlocks));
               }
             }
           }

Reply via email to