This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c3b3fa62c3a Include byte size of stateKey in estimated weight of
WindmillBag, WindmillValue, and WindmillWatermarkHold (#30654)
c3b3fa62c3a is described below
commit c3b3fa62c3a323e8da15a18aeba4a43b628efd24
Author: dmitryor <[email protected]>
AuthorDate: Mon Mar 18 11:43:27 2024 -0700
Include byte size of stateKey in estimated weight of WindmillBag,
WindmillValue, and WindmillWatermarkHold (#30654)
* Update WindmillBag.java
Include the byte size of the stateKey in the BagState weight used to estimate
and limit the total state cache size
* Update WindmillValue.java
Include stateKey size in the byte size of a WindmillValue
* Update WindmillWatermarkHold.java
Include stateKey size in the WatermarkHold estimated byte size
* Fix formatting issue
* Fix expected cache item weights in WindmillStateInternalsTest
---
.../dataflow/worker/windmill/state/WindmillBag.java | 2 +-
.../dataflow/worker/windmill/state/WindmillValue.java | 2 +-
.../worker/windmill/state/WindmillWatermarkHold.java | 3 ++-
.../windmill/state/WindmillStateInternalsTest.java | 18 +++++++++---------
4 files changed, 13 insertions(+), 12 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillBag.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillBag.java
index b3719cc666d..702be70f411 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillBag.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillBag.java
@@ -193,7 +193,7 @@ public class WindmillBag<T> extends SimpleWindmillState
implements BagState<T> {
}
// We now know the complete bag contents, and any read on it will yield a
// cached value, so cache it for future reads.
- cache.put(namespace, address, this, encodedSize);
+ cache.put(namespace, address, this, encodedSize + stateKey.size());
}
// Don't reuse the localAdditions object; we don't want future changes to
it to
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillValue.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillValue.java
index 923d166c823..bc3e0906f99 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillValue.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillValue.java
@@ -124,7 +124,7 @@ public class WindmillValue<T> extends SimpleWindmillState
implements ValueState<
coder.encode(value, stream, Coder.Context.OUTER);
}
encoded = stream.toByteString();
- cachedSize = encoded.size();
+ cachedSize = (long) encoded.size() + stateKey.size();
}
// Place in cache to avoid a future read.
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java
index 9d939c759d2..e8b2290c3c7 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java
@@ -175,13 +175,14 @@ public class WindmillWatermarkHold extends WindmillState
implements WatermarkHol
throw new IllegalStateException("Unreachable condition");
}
+ final int estimatedByteSize = ENCODED_SIZE + stateKey.size();
return Futures.lazyTransform(
result,
result1 -> {
cleared = false;
localAdditions = null;
if (cachedValue != null) {
- cache.put(namespace, address, WindmillWatermarkHold.this,
ENCODED_SIZE);
+ cache.put(namespace, address, WindmillWatermarkHold.this,
estimatedByteSize);
}
return result1;
});
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
index d55a20e5517..d53b1d8c3e8 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
@@ -3043,7 +3043,7 @@ public class WindmillStateInternalsTest {
value.write("Hi");
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
- assertEquals(132, cache.getWeight());
+ assertEquals(141, cache.getWeight());
resetUnderTest();
value = underTest.state(NAMESPACE, addr);
@@ -3051,7 +3051,7 @@ public class WindmillStateInternalsTest {
value.clear();
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
- assertEquals(130, cache.getWeight());
+ assertEquals(139, cache.getWeight());
resetUnderTest();
value = underTest.state(NAMESPACE, addr);
@@ -3083,7 +3083,7 @@ public class WindmillStateInternalsTest {
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
- assertEquals(140, cache.getWeight());
+ assertEquals(147, cache.getWeight());
resetUnderTest();
bag = underTest.state(NAMESPACE, addr);
@@ -3103,7 +3103,7 @@ public class WindmillStateInternalsTest {
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
- assertEquals(133, cache.getWeight());
+ assertEquals(140, cache.getWeight());
resetUnderTest();
bag = underTest.state(NAMESPACE, addr);
@@ -3114,7 +3114,7 @@ public class WindmillStateInternalsTest {
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
- assertEquals(134, cache.getWeight());
+ assertEquals(141, cache.getWeight());
resetUnderTest();
bag = underTest.state(NAMESPACE, addr);
@@ -3145,7 +3145,7 @@ public class WindmillStateInternalsTest {
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
- assertEquals(138, cache.getWeight());
+ assertEquals(151, cache.getWeight());
resetUnderTest();
hold = underTest.state(NAMESPACE, addr);
@@ -3154,7 +3154,7 @@ public class WindmillStateInternalsTest {
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
- assertEquals(138, cache.getWeight());
+ assertEquals(151, cache.getWeight());
resetUnderTest();
hold = underTest.state(NAMESPACE, addr);
@@ -3185,7 +3185,7 @@ public class WindmillStateInternalsTest {
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
- assertEquals(131, cache.getWeight());
+ assertEquals(144, cache.getWeight());
resetUnderTest();
value = underTest.state(NAMESPACE, COMBINING_ADDR);
@@ -3196,7 +3196,7 @@ public class WindmillStateInternalsTest {
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
- assertEquals(130, cache.getWeight());
+ assertEquals(143, cache.getWeight());
resetUnderTest();
value = underTest.state(NAMESPACE, COMBINING_ADDR);