This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d760383817d Changes to reduce memory pinned while iterating through
state backed iterable: (#32961)
d760383817d is described below
commit d760383817dce7d5d0d6ec5b4a500d948b041204
Author: Sam Whittle <[email protected]>
AuthorDate: Mon Nov 11 22:03:07 2024 +0100
Changes to reduce memory pinned while iterating through state backed
iterable: (#32961)
- Remove the decoder's reference to a completed encoded input page once we
have read it.
- Re-read from the cache after loading the next page to give eviction a chance
to remove blocks.
---
.../org/apache/beam/sdk/fn/stream/DataStreams.java | 6 ++
.../fn/harness/state/StateFetchingIterators.java | 83 ++++++++++++----------
2 files changed, 52 insertions(+), 37 deletions(-)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
index b0d29e2295a..2c6b61e6212 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
@@ -202,6 +202,12 @@ public class DataStreams {
T next = next();
rvals.add(next);
}
+ // We don't support seeking backwards so release the memory of the last
+ // page if it is completed.
+ if (inbound.currentStream.available() == 0) {
+ inbound.position = 0;
+ inbound.currentStream = EMPTY_STREAM;
+ }
// Uses the size of the ByteString as an approximation for the heap
size occupied by the
// page, considering an overhead of {@link
BYTES_LIST_ELEMENT_OVERHEAD} for each element.
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
index 3b9fccfa2a5..81a2aa6d1cc 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
@@ -105,7 +105,7 @@ public class StateFetchingIterators {
// many different state subcaches.
return 0;
}
- };
+ }
/** A mutable iterable that supports prefetch and is backed by a cache. */
static class CachingStateIterable<T> extends
PrefetchableIterables.Default<T> {
@@ -138,8 +138,8 @@ public class StateFetchingIterators {
private static <T> long sumWeight(List<Block<T>> blocks) {
try {
long sum = 0;
- for (int i = 0; i < blocks.size(); ++i) {
- sum = Math.addExact(sum, blocks.get(i).getWeight());
+ for (Block<T> block : blocks) {
+ sum = Math.addExact(sum, block.getWeight());
}
return sum;
} catch (ArithmeticException e) {
@@ -437,50 +437,59 @@ public class StateFetchingIterators {
if (currentBlock.getValues().size() > currentCachedBlockValueIndex) {
return true;
}
- if (currentBlock.getNextToken() == null) {
+ final ByteString nextToken = currentBlock.getNextToken();
+ if (nextToken == null) {
return false;
}
- Blocks<T> existing = cache.peek(IterableCacheKey.INSTANCE);
- boolean isFirstBlock =
ByteString.EMPTY.equals(currentBlock.getNextToken());
+ // Release the block while we are loading the next one.
+ currentBlock =
+ Block.fromValues(new WeightedList<>(Collections.emptyList(),
0L), ByteString.EMPTY);
+
+ @Nullable Blocks<T> existing = cache.peek(IterableCacheKey.INSTANCE);
+ boolean isFirstBlock = ByteString.EMPTY.equals(nextToken);
if (existing == null) {
- currentBlock = loadNextBlock(currentBlock.getNextToken());
+ currentBlock = loadNextBlock(nextToken);
if (isFirstBlock) {
cache.put(
IterableCacheKey.INSTANCE,
new BlocksPrefix<>(Collections.singletonList(currentBlock)));
}
+ } else if (isFirstBlock) {
+ currentBlock = existing.getBlocks().get(0);
} else {
- if (isFirstBlock) {
- currentBlock = existing.getBlocks().get(0);
- } else {
- checkState(
- existing instanceof BlocksPrefix,
- "Unexpected blocks type %s, expected a %s.",
- existing.getClass(),
- BlocksPrefix.class);
- List<Block<T>> blocks = existing.getBlocks();
- int currentBlockIndex = 0;
- for (; currentBlockIndex < blocks.size(); ++currentBlockIndex) {
- if (currentBlock
- .getNextToken()
- .equals(blocks.get(currentBlockIndex).getNextToken())) {
- break;
- }
+ checkState(
+ existing instanceof BlocksPrefix,
+ "Unexpected blocks type %s, expected a %s.",
+ existing.getClass(),
+ BlocksPrefix.class);
+ List<Block<T>> blocks = existing.getBlocks();
+ int currentBlockIndex = 0;
+ for (; currentBlockIndex < blocks.size(); ++currentBlockIndex) {
+ if
(nextToken.equals(blocks.get(currentBlockIndex).getNextToken())) {
+ break;
}
- // Load the next block from cache if it was found.
- if (currentBlockIndex + 1 < blocks.size()) {
- currentBlock = blocks.get(currentBlockIndex + 1);
- } else {
- // Otherwise load the block from state API.
- currentBlock = loadNextBlock(currentBlock.getNextToken());
-
- // Append this block to the existing set of blocks if it is
logically the next one.
- if (currentBlockIndex == blocks.size() - 1) {
- List<Block<T>> newBlocks = new ArrayList<>(currentBlockIndex
+ 1);
- newBlocks.addAll(blocks);
- newBlocks.add(currentBlock);
- cache.put(IterableCacheKey.INSTANCE, new
BlocksPrefix<>(newBlocks));
- }
+ }
+ // Take the next block from the cache if it was found.
+ if (currentBlockIndex + 1 < blocks.size()) {
+ currentBlock = blocks.get(currentBlockIndex + 1);
+ } else {
+ // Otherwise load the block from state API.
+ // Remove references on the cached values while we are loading
the next block.
+ existing = null;
+ blocks = null;
+ currentBlock = loadNextBlock(nextToken);
+ existing = cache.peek(IterableCacheKey.INSTANCE);
+ // Append this block to the existing set of blocks if it is
logically the next one
+ // according to the
+ // tokens.
+ if (existing != null
+ && !existing.getBlocks().isEmpty()
+ && nextToken.equals(
+ existing.getBlocks().get(existing.getBlocks().size() -
1).getNextToken())) {
+ List<Block<T>> newBlocks = new ArrayList<>(currentBlockIndex +
1);
+ newBlocks.addAll(existing.getBlocks());
+ newBlocks.add(currentBlock);
+ cache.put(IterableCacheKey.INSTANCE, new
BlocksPrefix<>(newBlocks));
}
}
}