This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 89aeb187f2c Correct per-entry HashMap overhead in WindmillStateCache (#30672)
89aeb187f2c is described below
commit 89aeb187f2c9350fa51fc2b2f690c93a57e523b9
Author: dmitryor <[email protected]>
AuthorDate: Wed Apr 10 01:28:23 2024 -0700
Correct per-entry HashMap overhead in WindmillStateCache (#30672)
---
.../worker/windmill/state/WindmillStateCache.java | 3 ++-
.../worker/windmill/state/WindmillStateCacheTest.java | 12 ++++++------
.../windmill/state/WindmillStateInternalsTest.java | 18 +++++++++---------
3 files changed, 17 insertions(+), 16 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
index 85c74fe8591..c6c49134bcb 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
@@ -64,7 +64,8 @@ public class WindmillStateCache implements StatusDataProvider
{
// Initial size of hash tables per entry.
private static final int INITIAL_HASH_MAP_CAPACITY = 4;
// Overhead of each hash map entry.
- private static final int HASH_MAP_ENTRY_OVERHEAD = 16;
+ // https://appsintheopen.com/posts/52-the-memory-overhead-of-java-ojects
+ private static final int HASH_MAP_ENTRY_OVERHEAD = 32;
// Overhead of each StateCacheEntry. One long, plus a hash table.
private static final int PER_CACHE_ENTRY_OVERHEAD =
8 + HASH_MAP_ENTRY_OVERHEAD * INITIAL_HASH_MAP_CAPACITY;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
index 1f4355b156b..446a34f73de 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
@@ -168,15 +168,15 @@ public class WindmillStateCacheTest {
assertEquals(0, cache.getWeight());
keyCache.persist();
- assertEquals(254, cache.getWeight());
+ assertEquals(414, cache.getWeight());
keyCache.put(triggerNamespace(0, 0), new TestStateTag("tag3"), new
TestState("t3"), 2);
keyCache.put(triggerNamespace(0, 0), new TestStateTag("tag2"), new
TestState("t2"), 2);
// Observes updated weight in entries, though cache will not know about it.
- assertEquals(290, cache.getWeight());
+ assertEquals(482, cache.getWeight());
keyCache.persist();
- assertEquals(290, cache.getWeight());
+ assertEquals(482, cache.getWeight());
keyCache =
cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L,
2L).forFamily(STATE_FAMILY);
@@ -212,7 +212,7 @@ public class WindmillStateCacheTest {
keyCache =
cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L,
2L).forFamily(STATE_FAMILY);
- assertEquals(127, cache.getWeight());
+ assertEquals(207, cache.getWeight());
assertEquals(
Optional.of(new TestState("g1")),
keyCache.get(StateNamespaces.global(), new TestStateTag("tag1")));
@@ -221,7 +221,7 @@ public class WindmillStateCacheTest {
cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 1L,
3L).forFamily(STATE_FAMILY);
assertEquals(
Optional.empty(), keyCache.get(StateNamespaces.global(), new
TestStateTag("tag1")));
- assertEquals(127, cache.getWeight());
+ assertEquals(207, cache.getWeight());
}
/** Verifies that the cache is invalidated when the cache token changes. */
@@ -254,7 +254,7 @@ public class WindmillStateCacheTest {
assertEquals(Optional.of(new TestState("w2")),
keyCache.get(windowNamespace(0), tag));
assertEquals(0, cache.getWeight());
keyCache.persist();
- assertEquals(127, cache.getWeight());
+ assertEquals(207, cache.getWeight());
assertEquals(Optional.of(new TestState("w2")),
keyCache.get(windowNamespace(0), tag));
// Previous work token.
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
index d53b1d8c3e8..a53240d6453 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
@@ -3043,7 +3043,7 @@ public class WindmillStateInternalsTest {
value.write("Hi");
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
- assertEquals(141, cache.getWeight());
+ assertEquals(221, cache.getWeight());
resetUnderTest();
value = underTest.state(NAMESPACE, addr);
@@ -3051,7 +3051,7 @@ public class WindmillStateInternalsTest {
value.clear();
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
- assertEquals(139, cache.getWeight());
+ assertEquals(219, cache.getWeight());
resetUnderTest();
value = underTest.state(NAMESPACE, addr);
@@ -3083,7 +3083,7 @@ public class WindmillStateInternalsTest {
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
- assertEquals(147, cache.getWeight());
+ assertEquals(227, cache.getWeight());
resetUnderTest();
bag = underTest.state(NAMESPACE, addr);
@@ -3103,7 +3103,7 @@ public class WindmillStateInternalsTest {
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
- assertEquals(140, cache.getWeight());
+ assertEquals(220, cache.getWeight());
resetUnderTest();
bag = underTest.state(NAMESPACE, addr);
@@ -3114,7 +3114,7 @@ public class WindmillStateInternalsTest {
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
- assertEquals(141, cache.getWeight());
+ assertEquals(221, cache.getWeight());
resetUnderTest();
bag = underTest.state(NAMESPACE, addr);
@@ -3145,7 +3145,7 @@ public class WindmillStateInternalsTest {
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
- assertEquals(151, cache.getWeight());
+ assertEquals(231, cache.getWeight());
resetUnderTest();
hold = underTest.state(NAMESPACE, addr);
@@ -3154,7 +3154,7 @@ public class WindmillStateInternalsTest {
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
- assertEquals(151, cache.getWeight());
+ assertEquals(231, cache.getWeight());
resetUnderTest();
hold = underTest.state(NAMESPACE, addr);
@@ -3185,7 +3185,7 @@ public class WindmillStateInternalsTest {
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
- assertEquals(144, cache.getWeight());
+ assertEquals(224, cache.getWeight());
resetUnderTest();
value = underTest.state(NAMESPACE, COMBINING_ADDR);
@@ -3196,7 +3196,7 @@ public class WindmillStateInternalsTest {
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
- assertEquals(143, cache.getWeight());
+ assertEquals(223, cache.getWeight());
resetUnderTest();
value = underTest.state(NAMESPACE, COMBINING_ADDR);