This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d41b50bae94 [Dataflow Streaming] Replace HashBasedTable with HashMap
in CachingStateTable (#36743)
d41b50bae94 is described below
commit d41b50bae942d29f6ee06668a08866e132d7b58f
Author: Arun Pandian <[email protected]>
AuthorDate: Fri Nov 7 02:36:31 2025 -0800
[Dataflow Streaming] Replace HashBasedTable with HashMap in
CachingStateTable (#36743)
CachingStateTable is not using the additional functionality provided by
HashBasedTable, and HashMap has lower lookup overhead (1 lookup instead of 2)
and reduces allocations.
---
.../worker/windmill/state/CachingStateTable.java | 69 ++++++++++++++++++----
.../dataflow/worker/windmill/state/IdTracker.java | 3 +-
.../worker/windmill/state/WindmillOrderedList.java | 3 +-
.../windmill/state/WindmillStateInternals.java | 23 +++-----
4 files changed, 66 insertions(+), 32 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java
index f0ed566d237..3ea1fa87626 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java
@@ -17,11 +17,12 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.state;
+import com.google.auto.value.AutoValue;
import java.io.Closeable;
+import java.util.HashMap;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateTable;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
@@ -29,26 +30,28 @@ import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.*;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineFnUtil;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
-final class CachingStateTable extends StateTable {
+final class CachingStateTable {
+ private final HashMap<StateTableKey, WindmillState> stateTable;
private final String stateFamily;
private final WindmillStateReader reader;
private final WindmillStateCache.ForKeyAndFamily cache;
private final boolean isSystemTable;
private final Supplier<Closeable> scopedReadStateSupplier;
- private final @Nullable StateTable derivedStateTable;
+ private final @Nullable CachingStateTable derivedStateTable;
private final boolean isNewKey;
private final boolean mapStateViaMultimapState;
private final WindmillStateTagUtil windmillStateTagUtil;
private CachingStateTable(Builder builder) {
+ this.stateTable = new HashMap<>();
this.stateFamily = builder.stateFamily;
this.reader = builder.reader;
this.cache = builder.cache;
@@ -65,20 +68,45 @@ final class CachingStateTable extends StateTable {
}
}
- static CachingStateTable.Builder builder(
+ static Builder builder(
String stateFamily,
WindmillStateReader reader,
ForKeyAndFamily cache,
boolean isNewKey,
Supplier<Closeable> scopedReadStateSupplier,
WindmillStateTagUtil windmillStateTagUtil) {
- return new CachingStateTable.Builder(
+ return new Builder(
stateFamily, reader, cache, scopedReadStateSupplier, isNewKey,
windmillStateTagUtil);
}
- @Override
+ /**
+ * Gets the {@link State} in the specified {@link StateNamespace} with the
specified {@link
+ * StateTag}, binding it using the {@link #binderForNamespace} if it is not
already present in
+ * this {@link CachingStateTable}.
+ */
+ public <StateT extends State> StateT get(
+ StateNamespace namespace, StateTag<StateT> tag, StateContext<?> c) {
+
+ StateTableKey stateTableKey = StateTableKey.create(namespace, tag);
+ @SuppressWarnings("unchecked")
+ StateT storage =
+ (StateT)
+ stateTable.computeIfAbsent(
+ stateTableKey,
+ unusedKey -> (WindmillState)
tag.bind(binderForNamespace(namespace, c)));
+ return storage;
+ }
+
+ public void clear() {
+ stateTable.clear();
+ }
+
+ public Iterable<WindmillState> values() {
+ return stateTable.values();
+ }
+
@SuppressWarnings("deprecation")
- protected StateTag.StateBinder binderForNamespace(StateNamespace namespace,
StateContext<?> c) {
+ private StateTag.StateBinder binderForNamespace(StateNamespace namespace,
StateContext<?> c) {
// Look up state objects in the cache or create new ones if not found.
The state will
// be added to the cache in persist().
return new StateTag.StateBinder() {
@@ -190,7 +218,7 @@ final class CachingStateTable extends StateTable {
public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindCombiningValue(
StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
- Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+ CombineFn<InputT, AccumT, OutputT> combineFn) {
StateTag<CombiningState<InputT, AccumT, OutputT>> addressOrInternalTag
=
addressOrInternalTag(address);
@@ -214,7 +242,7 @@ final class CachingStateTable extends StateTable {
CombiningState<InputT, AccumT, OutputT>
bindCombiningValueWithContext(
StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
- CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT>
combineFn) {
+ CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
return bindCombiningValue(
addressOrInternalTag(address), accumCoder,
CombineFnUtil.bindContext(combineFn, c));
}
@@ -239,6 +267,21 @@ final class CachingStateTable extends StateTable {
};
}
+ @AutoValue
+ abstract static class StateTableKey {
+
+ public abstract StateNamespace getStateNamespace();
+
+ public abstract String getId();
+
+ public static StateTableKey create(StateNamespace namespace, StateTag<?>
stateTag) {
+ // TODO(https://github.com/apache/beam/issues/36753): stateTag.getId()
returns only the
+ // string tag without system/user prefix. This could cause a collision
between system and
+ // user tag with the same id. Consider adding the prefix to state table
key.
+ return new AutoValue_CachingStateTable_StateTableKey(namespace,
stateTag.getId());
+ }
+ }
+
static class Builder {
private final String stateFamily;
@@ -248,7 +291,7 @@ final class CachingStateTable extends StateTable {
private final boolean isNewKey;
private final WindmillStateTagUtil windmillStateTagUtil;
private boolean isSystemTable;
- private @Nullable StateTable derivedStateTable;
+ private @Nullable CachingStateTable derivedStateTable;
private boolean mapStateViaMultimapState = false;
private Builder(
@@ -268,7 +311,7 @@ final class CachingStateTable extends StateTable {
this.windmillStateTagUtil = windmillStateTagUtil;
}
- Builder withDerivedState(StateTable derivedStateTable) {
+ Builder withDerivedState(CachingStateTable derivedStateTable) {
this.isSystemTable = false;
this.derivedStateTable = derivedStateTable;
return this;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/IdTracker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/IdTracker.java
index 5090626ae8e..bbcf108b317 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/IdTracker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/IdTracker.java
@@ -24,7 +24,6 @@ import java.util.SortedSet;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateTable;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.sdk.coders.InstantCoder;
@@ -95,7 +94,7 @@ final class IdTracker {
// here.
private final ValueState<Map<Range<Instant>, RangeSet<Instant>>>
subRangeDeletionsValue;
- IdTracker(StateTable stateTable, StateNamespace namespace, StateTag<?> spec)
{
+ IdTracker(CachingStateTable stateTable, StateNamespace namespace,
StateTag<?> spec) {
StateTag<ValueState<Map<Range<Instant>, RangeSet<Long>>>> idsAvailableTag =
StateTags.makeSystemTagInternal(
StateTags.value(spec.getId() + IDS_AVAILABLE_STR,
IDS_AVAILABLE_CODER));
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillOrderedList.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillOrderedList.java
index 6bfef989e7f..03652471a04 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillOrderedList.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillOrderedList.java
@@ -25,7 +25,6 @@ import java.util.SortedSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateTable;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
@@ -68,7 +67,7 @@ public class WindmillOrderedList<T> extends
SimpleWindmillState implements Order
private boolean cleared = false;
WindmillOrderedList(
- StateTable derivedStateTable,
+ CachingStateTable derivedStateTable,
StateNamespace namespace,
InternedByteString encodeKey,
StateTag<OrderedListState<T>> spec,
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java
index 338141f7bd3..ecf64c1fc84 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateTable;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
@@ -52,8 +51,8 @@ public class WindmillStateInternals<K> implements
StateInternals {
private final @Nullable K key;
private final WindmillStateCache.ForKeyAndFamily cache;
- private final StateTable workItemState;
- private final StateTable workItemDerivedState;
+ private final CachingStateTable workItemState;
+ private final CachingStateTable workItemDerivedState;
private final Supplier<Closeable> scopedReadStateSupplier;
public WindmillStateInternals(
@@ -82,17 +81,11 @@ public class WindmillStateInternals<K> implements
StateInternals {
return key;
}
- private void persist(List<Future<WorkItemCommitRequest>> commitsToMerge,
StateTable stateTable) {
- for (State location : stateTable.values()) {
- if (!(location instanceof WindmillState)) {
- throw new IllegalStateException(
- String.format(
- "%s wasn't created by %s -- unable to persist it",
- location.getClass().getSimpleName(),
getClass().getSimpleName()));
- }
-
+ private void persist(
+ List<Future<WorkItemCommitRequest>> commitsToMerge, CachingStateTable
stateTable) {
+ for (WindmillState location : stateTable.values()) {
try {
- commitsToMerge.add(((WindmillState) location).persist(cache));
+ commitsToMerge.add(location.persist(cache));
} catch (IOException e) {
throw new RuntimeException("Unable to persist state", e);
}
@@ -102,8 +95,8 @@ public class WindmillStateInternals<K> implements
StateInternals {
// Clear any references to the underlying reader to prevent space leaks.
// The next work unit to use these cached State objects will reset the
// reader to a current reader in case those values are modified.
- for (State location : stateTable.values()) {
- ((WindmillState) location).cleanupAfterWorkItem();
+ for (WindmillState location : stateTable.values()) {
+ location.cleanupAfterWorkItem();
}
// Clear out the map of already retrieved state instances.