[ 
https://issues.apache.org/jira/browse/BEAM-13015?focusedWorklogId=770459&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-770459
 ]

ASF GitHub Bot logged work on BEAM-13015:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/May/22 02:12
            Start Date: 14/May/22 02:12
    Worklog Time Spent: 10m 
      Work Description: youngoli commented on code in PR #17327:
URL: https://github.com/apache/beam/pull/17327#discussion_r872913431


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java:
##########
@@ -17,434 +17,392 @@
  */
 package org.apache.beam.fn.harness;
 
-import java.util.HashMap;
+import java.util.Collection;
 import java.util.Iterator;
-import java.util.Map;
+import java.util.LinkedHashMap;
+import java.util.Objects;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.fn.harness.Cache.Shrinkable;
 import org.apache.beam.runners.core.GlobalCombineFnRunner;
 import org.apache.beam.runners.core.GlobalCombineFnRunners;
 import org.apache.beam.runners.core.NullSideInputReader;
-import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.SdkHarnessOptions;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.Weighted;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CountingOutputStream;
 import org.joda.time.Instant;
 
-/** Static utility methods that provide {@link GroupingTable} implementations. */
+/**
+ * Static utility methods that provide a grouping table implementation.
+ *
+ * <p>{@link NotThreadSafe} because the caller must use the bundle processing thread when invoking
+ * {@link #put} and {@link #flush}. {@link #shrink} may be called from any thread.
+ */
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
+@NotThreadSafe
 public class PrecombineGroupingTable<K, InputT, AccumT>
-    implements GroupingTable<K, InputT, AccumT> {
-  private static long getGroupingTableSizeBytes(PipelineOptions options) {
-    return options.as(SdkHarnessOptions.class).getGroupingTableMaxSizeMb() * 1024L * 1024L;
-  }
+    implements Shrinkable<PrecombineGroupingTable<K, InputT, AccumT>>, Weighted {
+
+  private static final Instant IGNORED = BoundedWindow.TIMESTAMP_MIN_VALUE;
 
-  /** Returns a {@link GroupingTable} that combines inputs into a accumulator. */
-  public static <K, InputT, AccumT> GroupingTable<WindowedValue<K>, InputT, AccumT> combining(
+  /**
+   * Returns a grouping table that combines inputs into an accumulator. The grouping table uses the
+   * cache to defer flushing output until the cache evicts the table.
+   */
+  public static <K, InputT, AccumT> PrecombineGroupingTable<K, InputT, AccumT> combining(
       PipelineOptions options,
+      Cache<Object, Object> cache,
       CombineFn<InputT, AccumT, ?> combineFn,
-      Coder<K> keyCoder,
-      Coder<? super AccumT> accumulatorCoder) {
-    Combiner<WindowedValue<K>, InputT, AccumT, ?> valueCombiner =
-        new ValueCombiner<>(
-            GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options);
+      Coder<K> keyCoder) {
     return new PrecombineGroupingTable<>(
-        getGroupingTableSizeBytes(options),
-        new WindowingCoderGroupingKeyCreator<>(keyCoder),
-        WindowedPairInfo.create(),
-        valueCombiner,
-        new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
-        new CoderSizeEstimator<>(accumulatorCoder));
+        options,
+        cache,
+        keyCoder,
+        GlobalCombineFnRunners.create(combineFn),
+        Caches::weigh,
+        Caches::weigh);
   }
 
   /**
-   * Returns a {@link GroupingTable} that combines inputs into a accumulator with sampling {@link
-   * SizeEstimator SizeEstimators}.
+   * Returns a grouping table that combines inputs into an accumulator with sampling {@link
+   * SizeEstimator SizeEstimators}. The grouping table uses the cache to defer flushing output until
+   * the cache evicts the table.
    */
-  public static <K, InputT, AccumT>
-      GroupingTable<WindowedValue<K>, InputT, AccumT> combiningAndSampling(
-          PipelineOptions options,
-          CombineFn<InputT, AccumT, ?> combineFn,
-          Coder<K> keyCoder,
-          Coder<? super AccumT> accumulatorCoder,
-          double sizeEstimatorSampleRate) {
-    Combiner<WindowedValue<K>, InputT, AccumT, ?> valueCombiner =
-        new ValueCombiner<>(
-            GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options);
+  public static <K, InputT, AccumT> PrecombineGroupingTable<K, InputT, AccumT> combiningAndSampling(
+      PipelineOptions options,
+      Cache<Object, Object> cache,
+      CombineFn<InputT, AccumT, ?> combineFn,
+      Coder<K> keyCoder,
+      double sizeEstimatorSampleRate) {
     return new PrecombineGroupingTable<>(
-        getGroupingTableSizeBytes(options),
-        new WindowingCoderGroupingKeyCreator<>(keyCoder),
-        WindowedPairInfo.create(),
-        valueCombiner,
-        new SamplingSizeEstimator<>(
-            new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
-            sizeEstimatorSampleRate,
-            1.0),
-        new SamplingSizeEstimator<>(
-            new CoderSizeEstimator<>(accumulatorCoder), sizeEstimatorSampleRate, 1.0));
+        options,
+        cache,
+        keyCoder,
+        GlobalCombineFnRunners.create(combineFn),
+        new SamplingSizeEstimator<>(Caches::weigh, sizeEstimatorSampleRate, 1.0),
+        new SamplingSizeEstimator<>(Caches::weigh, sizeEstimatorSampleRate, 1.0));
   }
 
-  /** Provides client-specific operations for grouping keys. */
-  public interface GroupingKeyCreator<K> {
-    Object createGroupingKey(K key) throws Exception;
+  @Nullable
+  @Override
+  public PrecombineGroupingTable<K, InputT, AccumT> shrink() {
+    long currentWeight = maxWeight.updateAndGet(operand -> operand >> 1);
+    // It is possible that we are shrunk multiple times until the requested max weight is too small.
+    // In this case we want to effectively stop shrinking since we can't effectively cache much
+    // at this time and the next insertion will likely evict all records.
+    if (currentWeight <= 100L) {
+      return null;
+    }
+    return this;
   }
 
-  /** Implements Precombine GroupingKeyCreator via Coder. */
-  public static class WindowingCoderGroupingKeyCreator<K>
-      implements GroupingKeyCreator<WindowedValue<K>> {
+  @Override
+  public long getWeight() {
+    return maxWeight.get();
+  }
 
-    private static final Instant ignored = BoundedWindow.TIMESTAMP_MIN_VALUE;
+  /** Provides client-specific operations for size estimates. */
+  @FunctionalInterface
+  public interface SizeEstimator<T> {
+    long estimateSize(T element);
+  }
 
-    private final Coder<K> coder;
+  private final Coder<K> keyCoder;
+  private final GlobalCombineFnRunner<InputT, AccumT, ?> combineFn;
+  private final PipelineOptions options;
+  private final SizeEstimator<K> keySizer;
+  private final SizeEstimator<AccumT> accumulatorSizer;
+  private final Cache<Key, PrecombineGroupingTable<K, InputT, AccumT>> cache;
+  private final LinkedHashMap<GroupingTableKey, GroupingTableEntry> lruMap;
+  private final AtomicLong maxWeight;
+  private long weight;
 
-    WindowingCoderGroupingKeyCreator(Coder<K> coder) {
-      this.coder = coder;
-    }
+  private static final class Key implements Weighted {
+    private static final Key INSTANCE = new Key();
 
     @Override
-    public Object createGroupingKey(WindowedValue<K> key) {
-      // Ignore timestamp for grouping purposes.
-      // The Precombine output will inherit the timestamp of one of its inputs.
-      return WindowedValue.of(
-          coder.structuralValue(key.getValue()), ignored, key.getWindows(), key.getPane());
+    public long getWeight() {
+      // Ignore the actual size of this singleton because it is trivial and because
+      // the weight reported here will be counted many times as it is present in
+      // many different state subcaches.
+      return 0;
     }
   }
 
-  /** Provides client-specific operations for size estimates. */
-  public interface SizeEstimator<T> {
-    long estimateSize(T element) throws Exception;
+  PrecombineGroupingTable(
+      PipelineOptions options,
+      Cache<?, ?> cache,
+      Coder<K> keyCoder,
+      GlobalCombineFnRunner<InputT, AccumT, ?> combineFn,
+      SizeEstimator<K> keySizer,
+      SizeEstimator<AccumT> accumulatorSizer) {
+    this.options = options;
+    this.cache = (Cache<Key, PrecombineGroupingTable<K, InputT, AccumT>>) cache;
+    this.keyCoder = keyCoder;
+    this.combineFn = combineFn;
+    this.keySizer = keySizer;
+    this.accumulatorSizer = accumulatorSizer;
+    this.lruMap = new LinkedHashMap<>(16, 0.75f, true);
+    this.maxWeight = new AtomicLong();
+    this.weight = 0L;
+    this.cache.put(Key.INSTANCE, this);
   }
 
-  /** Implements SizeEstimator via Coder. */
-  public static class CoderSizeEstimator<T> implements SizeEstimator<T> {
-    /** Basic implementation of {@link ElementByteSizeObserver} for use in size estimation. */
-    private static class Observer extends ElementByteSizeObserver {
-      private long observedSize = 0;
-
-      @Override
-      protected void reportElementSize(long elementSize) {
-        observedSize += elementSize;
+  private static class GroupingTableKey implements Weighted {
+    private final Object structuralKey;
+    private final Collection<? extends BoundedWindow> windows;
+    private final PaneInfo paneInfo;
+    private final long weight;
+
+    <K> GroupingTableKey(
+        K key,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo paneInfo,
+        Coder<K> keyCoder,
+        SizeEstimator<K> keySizer) {
+      this.structuralKey = keyCoder.structuralValue(key);
+      this.windows = windows;
+      this.paneInfo = paneInfo;
+      // We account for the weight of the key using the keySizer if the coder's structural value
+      // is the same as its value.
+      if (structuralKey == key) {
+        weight = keySizer.estimateSize(key) + Caches.weigh(windows) + Caches.weigh(paneInfo);
+      } else {
+        weight = Caches.weigh(this);
       }
     }
 
-    final Coder<T> coder;
-
-    CoderSizeEstimator(Coder<T> coder) {
-      this.coder = coder;
+    public Object getStructuralKey() {
+      return structuralKey;
     }
 
-    @Override
-    public long estimateSize(T value) throws Exception {
-      // First try using byte size observer
-      CoderSizeEstimator.Observer observer = new CoderSizeEstimator.Observer();
-      coder.registerByteSizeObserver(value, observer);
-
-      if (!observer.getIsLazy()) {
-        observer.advance();
-        return observer.observedSize;
-      } else {
-        // Coder byte size observation is lazy (requires iteration for observation) so fall back to
-        // counting output stream
-        CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream());
-        coder.encode(value, os);
-        return os.getCount();
-      }
+    public Collection<? extends BoundedWindow> getWindows() {
+      return windows;
     }
-  }
 
-  /**
-   * Provides client-specific operations for working with elements that are key/value or key/values
-   * pairs.
-   */
-  public interface PairInfo {
-    Object getKeyFromInputPair(Object pair);
-
-    Object getValueFromInputPair(Object pair);
-
-    Object makeOutputPair(Object key, Object value);
-  }
-
-  /** Implements Precombine PairInfo via KVs. */
-  public static class WindowedPairInfo implements PairInfo {
-    private static WindowedPairInfo theInstance = new WindowedPairInfo();
-
-    public static WindowedPairInfo create() {
-      return theInstance;
+    public PaneInfo getPaneInfo() {
+      return paneInfo;
     }
 
-    private WindowedPairInfo() {}
+    @Override
+    public long getWeight() {
+      return weight;
+    }
 
     @Override
-    public Object getKeyFromInputPair(Object pair) {
-      @SuppressWarnings("unchecked")
-      WindowedValue<KV<?, ?>> windowedKv = (WindowedValue<KV<?, ?>>) pair;
-      return windowedKv.withValue(windowedKv.getValue().getKey());
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof GroupingTableKey)) {
+        return false;
+      }
+      GroupingTableKey that = (GroupingTableKey) o;
+      return Objects.equals(structuralKey, that.structuralKey)
+          && windows.equals(that.windows)
+          && paneInfo.equals(that.paneInfo);
     }
 
     @Override
-    public Object getValueFromInputPair(Object pair) {
-      @SuppressWarnings("unchecked")
-      WindowedValue<KV<?, ?>> windowedKv = (WindowedValue<KV<?, ?>>) pair;
-      return windowedKv.getValue().getValue();
+    public int hashCode() {
+      return Objects.hash(structuralKey, windows, paneInfo);
     }
 
     @Override
-    public Object makeOutputPair(Object key, Object values) {
-      WindowedValue<?> windowedKey = (WindowedValue<?>) key;
-      return windowedKey.withValue(KV.of(windowedKey.getValue(), values));
+    public String toString() {
+      return "GroupingTableKey{"
+          + "structuralKey="
+          + structuralKey
+          + ", windows="
+          + windows
+          + ", paneInfo="
+          + paneInfo
+          + ", weight="
+          + weight
+          + '}';
     }
   }
 
-  /** Provides client-specific operations for combining values. */
-  public interface Combiner<K, InputT, AccumT, OutputT> {
-    AccumT createAccumulator(K key);
-
-    AccumT add(K key, AccumT accumulator, InputT value);
-
-    AccumT merge(K key, Iterable<AccumT> accumulators);
-
-    AccumT compact(K key, AccumT accumulator);
+  private class GroupingTableEntry implements Weighted {
+    private final GroupingTableKey groupingKey;
+    private final K userKey;
+    private final long keySize;
+    private long accumulatorSize;
+    private AccumT accumulator;
+    private boolean dirty;
+
+    private GroupingTableEntry(GroupingTableKey groupingKey, K userKey, InputT initialInputValue) {
+      this.groupingKey = groupingKey;
+      this.userKey = userKey;
+      if (groupingKey.getStructuralKey() == userKey) {
+        // This object is only storing references to the same objects that are being stored
+        // by the cache so the accounting of the size of the key is occurring already.
+        this.keySize = Caches.REFERENCE_SIZE * 2;
+      } else {
+        this.keySize = Caches.REFERENCE_SIZE + keySizer.estimateSize(userKey);
+      }
+      this.accumulator =
+          combineFn.createAccumulator(
+              options, NullSideInputReader.empty(), groupingKey.getWindows());
+      add(initialInputValue);
+      this.accumulatorSize = accumulatorSizer.estimateSize(accumulator);
+    }
 
-    OutputT extract(K key, AccumT accumulator);
-  }
+    public GroupingTableKey getGroupingKey() {
+      return groupingKey;
+    }
 
-  /** Implements Precombine Combiner via Combine.KeyedCombineFn. */
-  public static class ValueCombiner<K, InputT, AccumT, OutputT>
-      implements Combiner<WindowedValue<K>, InputT, AccumT, OutputT> {
-    private final GlobalCombineFnRunner<InputT, AccumT, OutputT> combineFn;
-    private final SideInputReader sideInputReader;
-    private final PipelineOptions options;
-
-    private ValueCombiner(
-        GlobalCombineFnRunner<InputT, AccumT, OutputT> combineFn,
-        SideInputReader sideInputReader,
-        PipelineOptions options) {
-      this.combineFn = combineFn;
-      this.sideInputReader = sideInputReader;
-      this.options = options;
+    public K getKey() {
+      return userKey;
     }
 
-    @Override
-    public AccumT createAccumulator(WindowedValue<K> windowedKey) {
-      return this.combineFn.createAccumulator(options, sideInputReader, windowedKey.getWindows());
+    public AccumT getValue() {
+      return accumulator;
     }
 
     @Override
-    public AccumT add(WindowedValue<K> windowedKey, AccumT accumulator, InputT value) {
-      return this.combineFn.addInput(
-          accumulator, value, options, sideInputReader, windowedKey.getWindows());
+    public long getWeight() {
+      return keySize + accumulatorSize;
     }
 
-    @Override
-    public AccumT merge(WindowedValue<K> windowedKey, Iterable<AccumT> accumulators) {
-      return this.combineFn.mergeAccumulators(
-          accumulators, options, sideInputReader, windowedKey.getWindows());
+    public void compact() {
+      if (dirty) {
+        accumulator =
+            combineFn.compact(
+                accumulator, options, NullSideInputReader.empty(), groupingKey.getWindows());
+        accumulatorSize = accumulatorSizer.estimateSize(accumulator);
+        dirty = false;
+      }
     }
 
-    @Override
-    public AccumT compact(WindowedValue<K> windowedKey, AccumT accumulator) {
-      return this.combineFn.compact(
-          accumulator, options, sideInputReader, windowedKey.getWindows());
+    public void add(InputT value) {
+      dirty = true;
+      accumulator =
+          combineFn.addInput(
+              accumulator, value, options, NullSideInputReader.empty(), groupingKey.getWindows());
+      accumulatorSize = accumulatorSizer.estimateSize(accumulator);
     }
 
     @Override
-    public OutputT extract(WindowedValue<K> windowedKey, AccumT accumulator) {
-      return this.combineFn.extractOutput(
-          accumulator, options, sideInputReader, windowedKey.getWindows());
+    public String toString() {
+      return "GroupingTableEntry{"
+          + "groupingKey="
+          + groupingKey
+          + ", userKey="
+          + userKey
+          + ", keySize="
+          + keySize
+          + ", accumulatorSize="
+          + accumulatorSize
+          + ", accumulator="
+          + accumulator
+          + ", dirty="
+          + dirty
+          + '}';
     }
   }
 
-  // How many bytes a word in the JVM has.
-  private static final int BYTES_PER_JVM_WORD = getBytesPerJvmWord();
-  /**
-   * The number of bytes of overhead to store an entry in the grouping table (a {@code
-   * HashMap<StructuralByteArray, KeyAndValues>}), ignoring the actual number of bytes in the keys
-   * and values:
-   *
-   * <ul>
-   *   <li>an array element (1 word),
-   *   <li>a HashMap.Entry (4 words),
-   *   <li>a StructuralByteArray (1 words),
-   *   <li>a backing array (guessed at 1 word for the length),
-   *   <li>a KeyAndValues (2 words),
-   *   <li>an ArrayList (2 words),
-   *   <li>a backing array (1 word),
-   *   <li>per-object overhead (JVM-specific, guessed at 2 words * 6 objects).
-   * </ul>
-   */
-  private static final int PER_KEY_OVERHEAD = 24 * BYTES_PER_JVM_WORD;
-
-  /** A {@link GroupingTable} that uses the given combiner to combine values in place. */
-  // Keep the table relatively full to increase the chance of collisions.
-  private static final double TARGET_LOAD = 0.9;
-
-  private long maxSize;
-  private final GroupingKeyCreator<? super K> groupingKeyCreator;
-  private final PairInfo pairInfo;
-  private final Combiner<? super K, InputT, AccumT, ?> combiner;
-  private final SizeEstimator<? super K> keySizer;
-  private final SizeEstimator<? super AccumT> accumulatorSizer;
-
-  private long size = 0;
-  private Map<Object, GroupingTableEntry<K, InputT, AccumT>> table;
-
-  PrecombineGroupingTable(
-      long maxSize,
-      GroupingKeyCreator<? super K> groupingKeyCreator,
-      PairInfo pairInfo,
-      Combiner<? super K, InputT, AccumT, ?> combineFn,
-      SizeEstimator<? super K> keySizer,
-      SizeEstimator<? super AccumT> accumulatorSizer) {
-    this.maxSize = maxSize;
-    this.groupingKeyCreator = groupingKeyCreator;
-    this.pairInfo = pairInfo;
-    this.combiner = combineFn;
-    this.keySizer = keySizer;
-    this.accumulatorSizer = accumulatorSizer;
-    this.table = new HashMap<>();
-  }
-
-  interface GroupingTableEntry<K, InputT, AccumT> {
-    K getKey();
-
-    AccumT getValue();
-
-    void add(InputT value) throws Exception;
-
-    long getSize();
-
-    void compact() throws Exception;
-  }
-
-  private GroupingTableEntry<K, InputT, AccumT> createTableEntry(final K key) throws Exception {
-    return new GroupingTableEntry<K, InputT, AccumT>() {
-      final long keySize = keySizer.estimateSize(key);
-      AccumT accumulator = combiner.createAccumulator(key);
-      long accumulatorSize = 0; // never used before a value is added...
-
-      @Override
-      public K getKey() {
-        return key;
-      }
-
-      @Override
-      public AccumT getValue() {
-        return accumulator;
-      }
-
-      @Override
-      public long getSize() {
-        return keySize + accumulatorSize;
-      }
-
-      @Override
-      public void compact() throws Exception {
-        AccumT newAccumulator = combiner.compact(key, accumulator);
-        if (newAccumulator != accumulator) {
-          accumulator = newAccumulator;
-          accumulatorSize = accumulatorSizer.estimateSize(newAccumulator);
-        }
-      }
-
-      @Override
-      public void add(InputT value) throws Exception {
-        accumulator = combiner.add(key, accumulator, value);
-        accumulatorSize = accumulatorSizer.estimateSize(accumulator);
-      }
-    };
-  }
-
-  /** Adds a pair to this table, possibly flushing some entries to output if the table is full. */
-  @SuppressWarnings("unchecked")
-  @Override
-  public void put(Object pair, Receiver receiver) throws Exception {
-    put(
-        (K) pairInfo.getKeyFromInputPair(pair),
-        (InputT) pairInfo.getValueFromInputPair(pair),
-        receiver);
-  }
-
   /**
   * Adds the key and value to this table, possibly flushing some entries to output if the table is
    * full.
    */
-  public void put(K key, InputT value, Receiver receiver) throws Exception {
-    Object groupingKey = groupingKeyCreator.createGroupingKey(key);
-    GroupingTableEntry<K, InputT, AccumT> entry = table.get(groupingKey);
-    if (entry == null) {
-      entry = createTableEntry(key);
-      table.put(groupingKey, entry);
-      size += PER_KEY_OVERHEAD;
-    } else {
-      size -= entry.getSize();
-    }
-    entry.add(value);
-    size += entry.getSize();
-
-    if (size >= maxSize) {
-      long targetSize = (long) (TARGET_LOAD * maxSize);
-      Iterator<GroupingTableEntry<K, InputT, AccumT>> entries = table.values().iterator();
-      while (size >= targetSize) {
-        if (!entries.hasNext()) {
-          // Should never happen, but sizes may be estimates...
-          size = 0;
-          break;
+  @VisibleForTesting
+  public void put(
+      WindowedValue<KV<K, InputT>> value, FnDataReceiver<WindowedValue<KV<K, AccumT>>> receiver)
+      throws Exception {
+    // Ignore timestamp for grouping purposes.
+    // The Pre-combine output will inherit the timestamp of one of its inputs.
+    GroupingTableKey groupingKey =
+        new GroupingTableKey(
+            value.getValue().getKey(), value.getWindows(), value.getPane(), keyCoder, keySizer);
+
+    lruMap.compute(
+        groupingKey,
+        (key, tableEntry) -> {
+          if (tableEntry == null) {
+            weight += groupingKey.getWeight();
+            tableEntry =
+                new GroupingTableEntry(
+                    groupingKey, value.getValue().getKey(), value.getValue().getValue());
+          } else {
+            weight -= tableEntry.getWeight();
+            tableEntry.add(value.getValue().getValue());
+          }
+          weight += tableEntry.getWeight();
+          return tableEntry;
+        });
+
+    // Increase the maximum only if we require it
+    maxWeight.accumulateAndGet(weight, (current, update) -> current < update ? update : current);
+
+    // Update the cache to ensure that LRU is handled appropriately and for the cache to have an
+    // opportunity to shrink the maxWeight if necessary.
+    cache.put(Key.INSTANCE, this);
+
+    // Get the updated weight now that the cache may have been shrunk and respect it
+    long currentMax = maxWeight.get();
+
+    // Only compact and output from the bundle processing thread that is inserting elements into the
+    // grouping table. This ensures that we honor the guarantee that transforms for a single bundle
+    // execute using the same thread.
+    if (weight > currentMax) {
+      // Try to compact as many of the values as possible and only flush values if compaction wasn't
+      // enough.
+      for (GroupingTableEntry valueToCompact : lruMap.values()) {
+        long currentWeight = valueToCompact.getWeight();
+        valueToCompact.compact();
+        weight += valueToCompact.getWeight() - currentWeight;
+      }
+
+      if (weight > currentMax) {
+        Iterator<GroupingTableEntry> iterator = lruMap.values().iterator();
+        while (iterator.hasNext()) {
+          GroupingTableEntry valueToFlush = iterator.next();
+          weight -= valueToFlush.getWeight() + valueToFlush.getGroupingKey().getWeight();

Review Comment:
   I'm having some trouble following all the different weights, and my first instinct is that since valueToFlush contains the GroupingKey, this would count the weight of the grouping key twice (and presumably this would be bad because it wasn't counted twice when it was originally added to the max weight).
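
   To make the concern concrete, here is a minimal, self-contained sketch of the bookkeeping being questioned. The Key and Entry classes and the weights below are invented stand-ins for GroupingTableKey and GroupingTableEntry (not Beam code), and the sketch assumes an entry's getWeight() does not already include the grouping key's weight; if it did, the subtraction in the flush path would remove that weight twice relative to how it was added in put().

// Hypothetical stand-ins; the classes and numbers are assumptions for illustration only.
public class WeightAccountingSketch {
  static final class Key {
    long getWeight() { return 40L; } // weight attributed to the grouping key
  }

  static final class Entry {
    final Key key = new Key();
    Key getGroupingKey() { return key; }
    // Assumed to be keySize + accumulatorSize, excluding the grouping key's weight.
    long getWeight() { return 8L + 100L; }
  }

  public static void main(String[] args) {
    long weight = 0L;
    Entry entry = new Entry();

    // put(): the first insertion for a key adds the grouping key's weight
    // and the entry's weight once each.
    weight += entry.getGroupingKey().getWeight(); // +40
    weight += entry.getWeight();                  // +108

    // flush(): subtracts both contributions, mirroring the additions above.
    weight -= entry.getWeight() + entry.getGroupingKey().getWeight();

    // Prints 0 if the bookkeeping is symmetric; it would go negative if
    // Entry.getWeight() already folded in the grouping key's weight.
    System.out.println("residual weight = " + weight);
  }
}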





Issue Time Tracking
-------------------

    Worklog Id:     (was: 770459)
    Time Spent: 81h  (was: 80h 50m)

> Optimize Java SDK harness
> -------------------------
>
>                 Key: BEAM-13015
>                 URL: https://issues.apache.org/jira/browse/BEAM-13015
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-harness
>            Reporter: Luke Cwik
>            Assignee: Luke Cwik
>            Priority: P2
>          Time Spent: 81h
>  Remaining Estimate: 0h
>
> Use profiling tools to remove bundle processing overhead in the SDK harness.


