This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d41b50bae94 [Dataflow Streaming] Replace HashBasedTable with HashMap 
in CachingStateTable (#36743)
d41b50bae94 is described below

commit d41b50bae942d29f6ee06668a08866e132d7b58f
Author: Arun Pandian <[email protected]>
AuthorDate: Fri Nov 7 02:36:31 2025 -0800

    [Dataflow Streaming] Replace HashBasedTable with HashMap in 
CachingStateTable (#36743)
    
    CachingStateTable is not using the additional functionality provided by
    HashBasedTable, and HashMap has lower lookup overhead (one hash lookup
    instead of two) and fewer allocations.
---
 .../worker/windmill/state/CachingStateTable.java   | 69 ++++++++++++++++++----
 .../dataflow/worker/windmill/state/IdTracker.java  |  3 +-
 .../worker/windmill/state/WindmillOrderedList.java |  3 +-
 .../windmill/state/WindmillStateInternals.java     | 23 +++-----
 4 files changed, 66 insertions(+), 32 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java
index f0ed566d237..3ea1fa87626 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java
@@ -17,11 +17,12 @@
  */
 package org.apache.beam.runners.dataflow.worker.windmill.state;
 
+import com.google.auto.value.AutoValue;
 import java.io.Closeable;
+import java.util.HashMap;
 import java.util.Optional;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateTable;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
@@ -29,26 +30,28 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache
 import org.apache.beam.sdk.coders.BooleanCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.state.*;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.CombineFnUtil;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
 
-final class CachingStateTable extends StateTable {
+final class CachingStateTable {
 
+  private final HashMap<StateTableKey, WindmillState> stateTable;
   private final String stateFamily;
   private final WindmillStateReader reader;
   private final WindmillStateCache.ForKeyAndFamily cache;
   private final boolean isSystemTable;
   private final Supplier<Closeable> scopedReadStateSupplier;
-  private final @Nullable StateTable derivedStateTable;
+  private final @Nullable CachingStateTable derivedStateTable;
   private final boolean isNewKey;
   private final boolean mapStateViaMultimapState;
   private final WindmillStateTagUtil windmillStateTagUtil;
 
   private CachingStateTable(Builder builder) {
+    this.stateTable = new HashMap<>();
     this.stateFamily = builder.stateFamily;
     this.reader = builder.reader;
     this.cache = builder.cache;
@@ -65,20 +68,45 @@ final class CachingStateTable extends StateTable {
     }
   }
 
-  static CachingStateTable.Builder builder(
+  static Builder builder(
       String stateFamily,
       WindmillStateReader reader,
       ForKeyAndFamily cache,
       boolean isNewKey,
       Supplier<Closeable> scopedReadStateSupplier,
       WindmillStateTagUtil windmillStateTagUtil) {
-    return new CachingStateTable.Builder(
+    return new Builder(
         stateFamily, reader, cache, scopedReadStateSupplier, isNewKey, 
windmillStateTagUtil);
   }
 
-  @Override
+  /**
+   * Gets the {@link State} in the specified {@link StateNamespace} with the 
specified {@link
+   * StateTag}, binding it using the {@link #binderForNamespace} if it is not 
already present in
+   * this {@link CachingStateTable}.
+   */
+  public <StateT extends State> StateT get(
+      StateNamespace namespace, StateTag<StateT> tag, StateContext<?> c) {
+
+    StateTableKey stateTableKey = StateTableKey.create(namespace, tag);
+    @SuppressWarnings("unchecked")
+    StateT storage =
+        (StateT)
+            stateTable.computeIfAbsent(
+                stateTableKey,
+                unusedKey -> (WindmillState) 
tag.bind(binderForNamespace(namespace, c)));
+    return storage;
+  }
+
+  public void clear() {
+    stateTable.clear();
+  }
+
+  public Iterable<WindmillState> values() {
+    return stateTable.values();
+  }
+
   @SuppressWarnings("deprecation")
-  protected StateTag.StateBinder binderForNamespace(StateNamespace namespace, 
StateContext<?> c) {
+  private StateTag.StateBinder binderForNamespace(StateNamespace namespace, 
StateContext<?> c) {
     // Look up state objects in the cache or create new ones if not found.  
The state will
     // be added to the cache in persist().
     return new StateTag.StateBinder() {
@@ -190,7 +218,7 @@ final class CachingStateTable extends StateTable {
       public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> 
bindCombiningValue(
           StateTag<CombiningState<InputT, AccumT, OutputT>> address,
           Coder<AccumT> accumCoder,
-          Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+          CombineFn<InputT, AccumT, OutputT> combineFn) {
         StateTag<CombiningState<InputT, AccumT, OutputT>> addressOrInternalTag 
=
             addressOrInternalTag(address);
 
@@ -214,7 +242,7 @@ final class CachingStateTable extends StateTable {
           CombiningState<InputT, AccumT, OutputT> 
bindCombiningValueWithContext(
               StateTag<CombiningState<InputT, AccumT, OutputT>> address,
               Coder<AccumT> accumCoder,
-              CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> 
combineFn) {
+              CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
         return bindCombiningValue(
             addressOrInternalTag(address), accumCoder, 
CombineFnUtil.bindContext(combineFn, c));
       }
@@ -239,6 +267,21 @@ final class CachingStateTable extends StateTable {
     };
   }
 
+  @AutoValue
+  abstract static class StateTableKey {
+
+    public abstract StateNamespace getStateNamespace();
+
+    public abstract String getId();
+
+    public static StateTableKey create(StateNamespace namespace, StateTag<?> 
stateTag) {
+      // TODO(https://github.com/apache/beam/issues/36753): stateTag.getId() 
returns only the
+      // string tag without system/user prefix. This could cause a collision 
between system and
+      // user tag with the same id. Consider adding the prefix to state table 
key.
+      return new AutoValue_CachingStateTable_StateTableKey(namespace, 
stateTag.getId());
+    }
+  }
+
   static class Builder {
 
     private final String stateFamily;
@@ -248,7 +291,7 @@ final class CachingStateTable extends StateTable {
     private final boolean isNewKey;
     private final WindmillStateTagUtil windmillStateTagUtil;
     private boolean isSystemTable;
-    private @Nullable StateTable derivedStateTable;
+    private @Nullable CachingStateTable derivedStateTable;
     private boolean mapStateViaMultimapState = false;
 
     private Builder(
@@ -268,7 +311,7 @@ final class CachingStateTable extends StateTable {
       this.windmillStateTagUtil = windmillStateTagUtil;
     }
 
-    Builder withDerivedState(StateTable derivedStateTable) {
+    Builder withDerivedState(CachingStateTable derivedStateTable) {
       this.isSystemTable = false;
       this.derivedStateTable = derivedStateTable;
       return this;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/IdTracker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/IdTracker.java
index 5090626ae8e..bbcf108b317 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/IdTracker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/IdTracker.java
@@ -24,7 +24,6 @@ import java.util.SortedSet;
 import java.util.concurrent.ExecutionException;
 import java.util.function.BiConsumer;
 import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateTable;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.sdk.coders.InstantCoder;
@@ -95,7 +94,7 @@ final class IdTracker {
   // here.
   private final ValueState<Map<Range<Instant>, RangeSet<Instant>>> 
subRangeDeletionsValue;
 
-  IdTracker(StateTable stateTable, StateNamespace namespace, StateTag<?> spec) 
{
+  IdTracker(CachingStateTable stateTable, StateNamespace namespace, 
StateTag<?> spec) {
     StateTag<ValueState<Map<Range<Instant>, RangeSet<Long>>>> idsAvailableTag =
         StateTags.makeSystemTagInternal(
             StateTags.value(spec.getId() + IDS_AVAILABLE_STR, 
IDS_AVAILABLE_CODER));
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillOrderedList.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillOrderedList.java
index 6bfef989e7f..03652471a04 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillOrderedList.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillOrderedList.java
@@ -25,7 +25,6 @@ import java.util.SortedSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateTable;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
@@ -68,7 +67,7 @@ public class WindmillOrderedList<T> extends 
SimpleWindmillState implements Order
   private boolean cleared = false;
 
   WindmillOrderedList(
-      StateTable derivedStateTable,
+      CachingStateTable derivedStateTable,
       StateNamespace namespace,
       InternedByteString encodeKey,
       StateTag<OrderedListState<T>> spec,
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java
index 338141f7bd3..ecf64c1fc84 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateTable;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
@@ -52,8 +51,8 @@ public class WindmillStateInternals<K> implements 
StateInternals {
   private final @Nullable K key;
 
   private final WindmillStateCache.ForKeyAndFamily cache;
-  private final StateTable workItemState;
-  private final StateTable workItemDerivedState;
+  private final CachingStateTable workItemState;
+  private final CachingStateTable workItemDerivedState;
   private final Supplier<Closeable> scopedReadStateSupplier;
 
   public WindmillStateInternals(
@@ -82,17 +81,11 @@ public class WindmillStateInternals<K> implements 
StateInternals {
     return key;
   }
 
-  private void persist(List<Future<WorkItemCommitRequest>> commitsToMerge, 
StateTable stateTable) {
-    for (State location : stateTable.values()) {
-      if (!(location instanceof WindmillState)) {
-        throw new IllegalStateException(
-            String.format(
-                "%s wasn't created by %s -- unable to persist it",
-                location.getClass().getSimpleName(), 
getClass().getSimpleName()));
-      }
-
+  private void persist(
+      List<Future<WorkItemCommitRequest>> commitsToMerge, CachingStateTable 
stateTable) {
+    for (WindmillState location : stateTable.values()) {
       try {
-        commitsToMerge.add(((WindmillState) location).persist(cache));
+        commitsToMerge.add(location.persist(cache));
       } catch (IOException e) {
         throw new RuntimeException("Unable to persist state", e);
       }
@@ -102,8 +95,8 @@ public class WindmillStateInternals<K> implements 
StateInternals {
     // Clear any references to the underlying reader to prevent space leaks.
     // The next work unit to use these cached State objects will reset the
     // reader to a current reader in case those values are modified.
-    for (State location : stateTable.values()) {
-      ((WindmillState) location).cleanupAfterWorkItem();
+    for (WindmillState location : stateTable.values()) {
+      location.cleanupAfterWorkItem();
     }
 
     // Clear out the map of already retrieved state instances.

Reply via email to