[
https://issues.apache.org/jira/browse/BEAM-13015?focusedWorklogId=770459&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-770459
]
ASF GitHub Bot logged work on BEAM-13015:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 14/May/22 02:12
Start Date: 14/May/22 02:12
Worklog Time Spent: 10m
Work Description: youngoli commented on code in PR #17327:
URL: https://github.com/apache/beam/pull/17327#discussion_r872913431
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java:
##########
@@ -17,434 +17,392 @@
*/
package org.apache.beam.fn.harness;
-import java.util.HashMap;
+import java.util.Collection;
import java.util.Iterator;
-import java.util.Map;
+import java.util.LinkedHashMap;
+import java.util.Objects;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.fn.harness.Cache.Shrinkable;
import org.apache.beam.runners.core.GlobalCombineFnRunner;
import org.apache.beam.runners.core.GlobalCombineFnRunners;
import org.apache.beam.runners.core.NullSideInputReader;
-import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.SdkHarnessOptions;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.Weighted;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.sdk.values.KV;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
-import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CountingOutputStream;
import org.joda.time.Instant;
-/** Static utility methods that provide {@link GroupingTable} implementations.
*/
+/**
+ * Static utility methods that provide a grouping table implementation.
+ *
+ * <p>{@link NotThreadSafe} because the caller must use the bundle processing
thread when invoking
+ * {@link #put} and {@link #flush}. {@link #shrink} may be called from any
thread.
+ */
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
+@NotThreadSafe
public class PrecombineGroupingTable<K, InputT, AccumT>
- implements GroupingTable<K, InputT, AccumT> {
- private static long getGroupingTableSizeBytes(PipelineOptions options) {
- return options.as(SdkHarnessOptions.class).getGroupingTableMaxSizeMb() *
1024L * 1024L;
- }
+ implements Shrinkable<PrecombineGroupingTable<K, InputT, AccumT>>,
Weighted {
+
+ private static final Instant IGNORED = BoundedWindow.TIMESTAMP_MIN_VALUE;
- /** Returns a {@link GroupingTable} that combines inputs into a accumulator.
*/
- public static <K, InputT, AccumT> GroupingTable<WindowedValue<K>, InputT,
AccumT> combining(
+ /**
+ * Returns a grouping table that combines inputs into an accumulator. The
grouping table uses the
+ * cache to defer flushing output until the cache evicts the table.
+ */
+ public static <K, InputT, AccumT> PrecombineGroupingTable<K, InputT, AccumT>
combining(
PipelineOptions options,
+ Cache<Object, Object> cache,
CombineFn<InputT, AccumT, ?> combineFn,
- Coder<K> keyCoder,
- Coder<? super AccumT> accumulatorCoder) {
- Combiner<WindowedValue<K>, InputT, AccumT, ?> valueCombiner =
- new ValueCombiner<>(
- GlobalCombineFnRunners.create(combineFn),
NullSideInputReader.empty(), options);
+ Coder<K> keyCoder) {
return new PrecombineGroupingTable<>(
- getGroupingTableSizeBytes(options),
- new WindowingCoderGroupingKeyCreator<>(keyCoder),
- WindowedPairInfo.create(),
- valueCombiner,
- new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
- new CoderSizeEstimator<>(accumulatorCoder));
+ options,
+ cache,
+ keyCoder,
+ GlobalCombineFnRunners.create(combineFn),
+ Caches::weigh,
+ Caches::weigh);
}
/**
- * Returns a {@link GroupingTable} that combines inputs into a accumulator
with sampling {@link
- * SizeEstimator SizeEstimators}.
+ * Returns a grouping table that combines inputs into an accumulator with
sampling {@link
+ * SizeEstimator SizeEstimators}. The grouping table uses the cache to defer
flushing output until
+ * the cache evicts the table.
*/
- public static <K, InputT, AccumT>
- GroupingTable<WindowedValue<K>, InputT, AccumT> combiningAndSampling(
- PipelineOptions options,
- CombineFn<InputT, AccumT, ?> combineFn,
- Coder<K> keyCoder,
- Coder<? super AccumT> accumulatorCoder,
- double sizeEstimatorSampleRate) {
- Combiner<WindowedValue<K>, InputT, AccumT, ?> valueCombiner =
- new ValueCombiner<>(
- GlobalCombineFnRunners.create(combineFn),
NullSideInputReader.empty(), options);
+ public static <K, InputT, AccumT> PrecombineGroupingTable<K, InputT, AccumT>
combiningAndSampling(
+ PipelineOptions options,
+ Cache<Object, Object> cache,
+ CombineFn<InputT, AccumT, ?> combineFn,
+ Coder<K> keyCoder,
+ double sizeEstimatorSampleRate) {
return new PrecombineGroupingTable<>(
- getGroupingTableSizeBytes(options),
- new WindowingCoderGroupingKeyCreator<>(keyCoder),
- WindowedPairInfo.create(),
- valueCombiner,
- new SamplingSizeEstimator<>(
- new
CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
- sizeEstimatorSampleRate,
- 1.0),
- new SamplingSizeEstimator<>(
- new CoderSizeEstimator<>(accumulatorCoder),
sizeEstimatorSampleRate, 1.0));
+ options,
+ cache,
+ keyCoder,
+ GlobalCombineFnRunners.create(combineFn),
+ new SamplingSizeEstimator<>(Caches::weigh, sizeEstimatorSampleRate,
1.0),
+ new SamplingSizeEstimator<>(Caches::weigh, sizeEstimatorSampleRate,
1.0));
}
- /** Provides client-specific operations for grouping keys. */
- public interface GroupingKeyCreator<K> {
- Object createGroupingKey(K key) throws Exception;
+ @Nullable
+ @Override
+ public PrecombineGroupingTable<K, InputT, AccumT> shrink() {
+ long currentWeight = maxWeight.updateAndGet(operand -> operand >> 1);
+ // It is possible that we are shrunk multiple times until the requested
max weight is too small.
+ // In this case we want to effectively stop shrinking since we can't
effectively cache much
+ // at this time and the next insertion will likely evict all records.
+ if (currentWeight <= 100L) {
+ return null;
+ }
+ return this;
}
- /** Implements Precombine GroupingKeyCreator via Coder. */
- public static class WindowingCoderGroupingKeyCreator<K>
- implements GroupingKeyCreator<WindowedValue<K>> {
+ @Override
+ public long getWeight() {
+ return maxWeight.get();
+ }
- private static final Instant ignored = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ /** Provides client-specific operations for size estimates. */
+ @FunctionalInterface
+ public interface SizeEstimator<T> {
+ long estimateSize(T element);
+ }
- private final Coder<K> coder;
+ private final Coder<K> keyCoder;
+ private final GlobalCombineFnRunner<InputT, AccumT, ?> combineFn;
+ private final PipelineOptions options;
+ private final SizeEstimator<K> keySizer;
+ private final SizeEstimator<AccumT> accumulatorSizer;
+ private final Cache<Key, PrecombineGroupingTable<K, InputT, AccumT>> cache;
+ private final LinkedHashMap<GroupingTableKey, GroupingTableEntry> lruMap;
+ private final AtomicLong maxWeight;
+ private long weight;
- WindowingCoderGroupingKeyCreator(Coder<K> coder) {
- this.coder = coder;
- }
+ private static final class Key implements Weighted {
+ private static final Key INSTANCE = new Key();
@Override
- public Object createGroupingKey(WindowedValue<K> key) {
- // Ignore timestamp for grouping purposes.
- // The Precombine output will inherit the timestamp of one of its inputs.
- return WindowedValue.of(
- coder.structuralValue(key.getValue()), ignored, key.getWindows(),
key.getPane());
+ public long getWeight() {
+ // Ignore the actual size of this singleton because it is trivial and
because
+ // the weight reported here will be counted many times as it is present
in
+ // many different state subcaches.
+ return 0;
}
}
- /** Provides client-specific operations for size estimates. */
- public interface SizeEstimator<T> {
- long estimateSize(T element) throws Exception;
+ PrecombineGroupingTable(
+ PipelineOptions options,
+ Cache<?, ?> cache,
+ Coder<K> keyCoder,
+ GlobalCombineFnRunner<InputT, AccumT, ?> combineFn,
+ SizeEstimator<K> keySizer,
+ SizeEstimator<AccumT> accumulatorSizer) {
+ this.options = options;
+ this.cache = (Cache<Key, PrecombineGroupingTable<K, InputT, AccumT>>)
cache;
+ this.keyCoder = keyCoder;
+ this.combineFn = combineFn;
+ this.keySizer = keySizer;
+ this.accumulatorSizer = accumulatorSizer;
+ this.lruMap = new LinkedHashMap<>(16, 0.75f, true);
+ this.maxWeight = new AtomicLong();
+ this.weight = 0L;
+ this.cache.put(Key.INSTANCE, this);
}
- /** Implements SizeEstimator via Coder. */
- public static class CoderSizeEstimator<T> implements SizeEstimator<T> {
- /** Basic implementation of {@link ElementByteSizeObserver} for use in
size estimation. */
- private static class Observer extends ElementByteSizeObserver {
- private long observedSize = 0;
-
- @Override
- protected void reportElementSize(long elementSize) {
- observedSize += elementSize;
+ private static class GroupingTableKey implements Weighted {
+ private final Object structuralKey;
+ private final Collection<? extends BoundedWindow> windows;
+ private final PaneInfo paneInfo;
+ private final long weight;
+
+ <K> GroupingTableKey(
+ K key,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo paneInfo,
+ Coder<K> keyCoder,
+ SizeEstimator<K> keySizer) {
+ this.structuralKey = keyCoder.structuralValue(key);
+ this.windows = windows;
+ this.paneInfo = paneInfo;
+ // We account for the weight of the key using the keySizer if the
coder's structural value
+ // is the same as its value.
+ if (structuralKey == key) {
+ weight = keySizer.estimateSize(key) + Caches.weigh(windows) +
Caches.weigh(paneInfo);
+ } else {
+ weight = Caches.weigh(this);
}
}
- final Coder<T> coder;
-
- CoderSizeEstimator(Coder<T> coder) {
- this.coder = coder;
+ public Object getStructuralKey() {
+ return structuralKey;
}
- @Override
- public long estimateSize(T value) throws Exception {
- // First try using byte size observer
- CoderSizeEstimator.Observer observer = new CoderSizeEstimator.Observer();
- coder.registerByteSizeObserver(value, observer);
-
- if (!observer.getIsLazy()) {
- observer.advance();
- return observer.observedSize;
- } else {
- // Coder byte size observation is lazy (requires iteration for
observation) so fall back to
- // counting output stream
- CountingOutputStream os = new
CountingOutputStream(ByteStreams.nullOutputStream());
- coder.encode(value, os);
- return os.getCount();
- }
+ public Collection<? extends BoundedWindow> getWindows() {
+ return windows;
}
- }
- /**
- * Provides client-specific operations for working with elements that are
key/value or key/values
- * pairs.
- */
- public interface PairInfo {
- Object getKeyFromInputPair(Object pair);
-
- Object getValueFromInputPair(Object pair);
-
- Object makeOutputPair(Object key, Object value);
- }
-
- /** Implements Precombine PairInfo via KVs. */
- public static class WindowedPairInfo implements PairInfo {
- private static WindowedPairInfo theInstance = new WindowedPairInfo();
-
- public static WindowedPairInfo create() {
- return theInstance;
+ public PaneInfo getPaneInfo() {
+ return paneInfo;
}
- private WindowedPairInfo() {}
+ @Override
+ public long getWeight() {
+ return weight;
+ }
@Override
- public Object getKeyFromInputPair(Object pair) {
- @SuppressWarnings("unchecked")
- WindowedValue<KV<?, ?>> windowedKv = (WindowedValue<KV<?, ?>>) pair;
- return windowedKv.withValue(windowedKv.getValue().getKey());
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof GroupingTableKey)) {
+ return false;
+ }
+ GroupingTableKey that = (GroupingTableKey) o;
+ return Objects.equals(structuralKey, that.structuralKey)
+ && windows.equals(that.windows)
+ && paneInfo.equals(that.paneInfo);
}
@Override
- public Object getValueFromInputPair(Object pair) {
- @SuppressWarnings("unchecked")
- WindowedValue<KV<?, ?>> windowedKv = (WindowedValue<KV<?, ?>>) pair;
- return windowedKv.getValue().getValue();
+ public int hashCode() {
+ return Objects.hash(structuralKey, windows, paneInfo);
}
@Override
- public Object makeOutputPair(Object key, Object values) {
- WindowedValue<?> windowedKey = (WindowedValue<?>) key;
- return windowedKey.withValue(KV.of(windowedKey.getValue(), values));
+ public String toString() {
+ return "GroupingTableKey{"
+ + "structuralKey="
+ + structuralKey
+ + ", windows="
+ + windows
+ + ", paneInfo="
+ + paneInfo
+ + ", weight="
+ + weight
+ + '}';
}
}
- /** Provides client-specific operations for combining values. */
- public interface Combiner<K, InputT, AccumT, OutputT> {
- AccumT createAccumulator(K key);
-
- AccumT add(K key, AccumT accumulator, InputT value);
-
- AccumT merge(K key, Iterable<AccumT> accumulators);
-
- AccumT compact(K key, AccumT accumulator);
+ private class GroupingTableEntry implements Weighted {
+ private final GroupingTableKey groupingKey;
+ private final K userKey;
+ private final long keySize;
+ private long accumulatorSize;
+ private AccumT accumulator;
+ private boolean dirty;
+
+ private GroupingTableEntry(GroupingTableKey groupingKey, K userKey, InputT
initialInputValue) {
+ this.groupingKey = groupingKey;
+ this.userKey = userKey;
+ if (groupingKey.getStructuralKey() == userKey) {
+ // This object is only storing references to the same objects that are
being stored
+ // by the cache so the accounting of the size of the key is occurring
already.
+ this.keySize = Caches.REFERENCE_SIZE * 2;
+ } else {
+ this.keySize = Caches.REFERENCE_SIZE + keySizer.estimateSize(userKey);
+ }
+ this.accumulator =
+ combineFn.createAccumulator(
+ options, NullSideInputReader.empty(), groupingKey.getWindows());
+ add(initialInputValue);
+ this.accumulatorSize = accumulatorSizer.estimateSize(accumulator);
+ }
- OutputT extract(K key, AccumT accumulator);
- }
+ public GroupingTableKey getGroupingKey() {
+ return groupingKey;
+ }
- /** Implements Precombine Combiner via Combine.KeyedCombineFn. */
- public static class ValueCombiner<K, InputT, AccumT, OutputT>
- implements Combiner<WindowedValue<K>, InputT, AccumT, OutputT> {
- private final GlobalCombineFnRunner<InputT, AccumT, OutputT> combineFn;
- private final SideInputReader sideInputReader;
- private final PipelineOptions options;
-
- private ValueCombiner(
- GlobalCombineFnRunner<InputT, AccumT, OutputT> combineFn,
- SideInputReader sideInputReader,
- PipelineOptions options) {
- this.combineFn = combineFn;
- this.sideInputReader = sideInputReader;
- this.options = options;
+ public K getKey() {
+ return userKey;
}
- @Override
- public AccumT createAccumulator(WindowedValue<K> windowedKey) {
- return this.combineFn.createAccumulator(options, sideInputReader,
windowedKey.getWindows());
+ public AccumT getValue() {
+ return accumulator;
}
@Override
- public AccumT add(WindowedValue<K> windowedKey, AccumT accumulator, InputT
value) {
- return this.combineFn.addInput(
- accumulator, value, options, sideInputReader,
windowedKey.getWindows());
+ public long getWeight() {
+ return keySize + accumulatorSize;
}
- @Override
- public AccumT merge(WindowedValue<K> windowedKey, Iterable<AccumT>
accumulators) {
- return this.combineFn.mergeAccumulators(
- accumulators, options, sideInputReader, windowedKey.getWindows());
+ public void compact() {
+ if (dirty) {
+ accumulator =
+ combineFn.compact(
+ accumulator, options, NullSideInputReader.empty(),
groupingKey.getWindows());
+ accumulatorSize = accumulatorSizer.estimateSize(accumulator);
+ dirty = false;
+ }
}
- @Override
- public AccumT compact(WindowedValue<K> windowedKey, AccumT accumulator) {
- return this.combineFn.compact(
- accumulator, options, sideInputReader, windowedKey.getWindows());
+ public void add(InputT value) {
+ dirty = true;
+ accumulator =
+ combineFn.addInput(
+ accumulator, value, options, NullSideInputReader.empty(),
groupingKey.getWindows());
+ accumulatorSize = accumulatorSizer.estimateSize(accumulator);
}
@Override
- public OutputT extract(WindowedValue<K> windowedKey, AccumT accumulator) {
- return this.combineFn.extractOutput(
- accumulator, options, sideInputReader, windowedKey.getWindows());
+ public String toString() {
+ return "GroupingTableEntry{"
+ + "groupingKey="
+ + groupingKey
+ + ", userKey="
+ + userKey
+ + ", keySize="
+ + keySize
+ + ", accumulatorSize="
+ + accumulatorSize
+ + ", accumulator="
+ + accumulator
+ + ", dirty="
+ + dirty
+ + '}';
}
}
- // How many bytes a word in the JVM has.
- private static final int BYTES_PER_JVM_WORD = getBytesPerJvmWord();
- /**
- * The number of bytes of overhead to store an entry in the grouping table
(a {@code
- * HashMap<StructuralByteArray, KeyAndValues>}), ignoring the actual number
of bytes in the keys
- * and values:
- *
- * <ul>
- * <li>an array element (1 word),
- * <li>a HashMap.Entry (4 words),
- * <li>a StructuralByteArray (1 words),
- * <li>a backing array (guessed at 1 word for the length),
- * <li>a KeyAndValues (2 words),
- * <li>an ArrayList (2 words),
- * <li>a backing array (1 word),
- * <li>per-object overhead (JVM-specific, guessed at 2 words * 6 objects).
- * </ul>
- */
- private static final int PER_KEY_OVERHEAD = 24 * BYTES_PER_JVM_WORD;
-
- /** A {@link GroupingTable} that uses the given combiner to combine values
in place. */
- // Keep the table relatively full to increase the chance of collisions.
- private static final double TARGET_LOAD = 0.9;
-
- private long maxSize;
- private final GroupingKeyCreator<? super K> groupingKeyCreator;
- private final PairInfo pairInfo;
- private final Combiner<? super K, InputT, AccumT, ?> combiner;
- private final SizeEstimator<? super K> keySizer;
- private final SizeEstimator<? super AccumT> accumulatorSizer;
-
- private long size = 0;
- private Map<Object, GroupingTableEntry<K, InputT, AccumT>> table;
-
- PrecombineGroupingTable(
- long maxSize,
- GroupingKeyCreator<? super K> groupingKeyCreator,
- PairInfo pairInfo,
- Combiner<? super K, InputT, AccumT, ?> combineFn,
- SizeEstimator<? super K> keySizer,
- SizeEstimator<? super AccumT> accumulatorSizer) {
- this.maxSize = maxSize;
- this.groupingKeyCreator = groupingKeyCreator;
- this.pairInfo = pairInfo;
- this.combiner = combineFn;
- this.keySizer = keySizer;
- this.accumulatorSizer = accumulatorSizer;
- this.table = new HashMap<>();
- }
-
- interface GroupingTableEntry<K, InputT, AccumT> {
- K getKey();
-
- AccumT getValue();
-
- void add(InputT value) throws Exception;
-
- long getSize();
-
- void compact() throws Exception;
- }
-
- private GroupingTableEntry<K, InputT, AccumT> createTableEntry(final K key)
throws Exception {
- return new GroupingTableEntry<K, InputT, AccumT>() {
- final long keySize = keySizer.estimateSize(key);
- AccumT accumulator = combiner.createAccumulator(key);
- long accumulatorSize = 0; // never used before a value is added...
-
- @Override
- public K getKey() {
- return key;
- }
-
- @Override
- public AccumT getValue() {
- return accumulator;
- }
-
- @Override
- public long getSize() {
- return keySize + accumulatorSize;
- }
-
- @Override
- public void compact() throws Exception {
- AccumT newAccumulator = combiner.compact(key, accumulator);
- if (newAccumulator != accumulator) {
- accumulator = newAccumulator;
- accumulatorSize = accumulatorSizer.estimateSize(newAccumulator);
- }
- }
-
- @Override
- public void add(InputT value) throws Exception {
- accumulator = combiner.add(key, accumulator, value);
- accumulatorSize = accumulatorSizer.estimateSize(accumulator);
- }
- };
- }
-
- /** Adds a pair to this table, possibly flushing some entries to output if
the table is full. */
- @SuppressWarnings("unchecked")
- @Override
- public void put(Object pair, Receiver receiver) throws Exception {
- put(
- (K) pairInfo.getKeyFromInputPair(pair),
- (InputT) pairInfo.getValueFromInputPair(pair),
- receiver);
- }
-
/**
* Adds the key and value to this table, possibly flushing some entries to
output if the table is
* full.
*/
- public void put(K key, InputT value, Receiver receiver) throws Exception {
- Object groupingKey = groupingKeyCreator.createGroupingKey(key);
- GroupingTableEntry<K, InputT, AccumT> entry = table.get(groupingKey);
- if (entry == null) {
- entry = createTableEntry(key);
- table.put(groupingKey, entry);
- size += PER_KEY_OVERHEAD;
- } else {
- size -= entry.getSize();
- }
- entry.add(value);
- size += entry.getSize();
-
- if (size >= maxSize) {
- long targetSize = (long) (TARGET_LOAD * maxSize);
- Iterator<GroupingTableEntry<K, InputT, AccumT>> entries =
table.values().iterator();
- while (size >= targetSize) {
- if (!entries.hasNext()) {
- // Should never happen, but sizes may be estimates...
- size = 0;
- break;
+ @VisibleForTesting
+ public void put(
+ WindowedValue<KV<K, InputT>> value, FnDataReceiver<WindowedValue<KV<K,
AccumT>>> receiver)
+ throws Exception {
+ // Ignore timestamp for grouping purposes.
+ // The Pre-combine output will inherit the timestamp of one of its inputs.
+ GroupingTableKey groupingKey =
+ new GroupingTableKey(
+ value.getValue().getKey(), value.getWindows(), value.getPane(),
keyCoder, keySizer);
+
+ lruMap.compute(
+ groupingKey,
+ (key, tableEntry) -> {
+ if (tableEntry == null) {
+ weight += groupingKey.getWeight();
+ tableEntry =
+ new GroupingTableEntry(
+ groupingKey, value.getValue().getKey(),
value.getValue().getValue());
+ } else {
+ weight -= tableEntry.getWeight();
+ tableEntry.add(value.getValue().getValue());
+ }
+ weight += tableEntry.getWeight();
+ return tableEntry;
+ });
+
+ // Increase the maximum only if we require it
+ maxWeight.accumulateAndGet(weight, (current, update) -> current < update ?
update : current);
+
+ // Update the cache to ensure that LRU is handled appropriately and for
the cache to have an
+ // opportunity to shrink the maxWeight if necessary.
+ cache.put(Key.INSTANCE, this);
+
+ // Get the updated weight now that the cache may have been shrunk and
respect it
+ long currentMax = maxWeight.get();
+
+ // Only compact and output from the bundle processing thread that is
inserting elements into the
+ // grouping table. This ensures that we honor the guarantee that
transforms for a single bundle
+ // execute using the same thread.
+ if (weight > currentMax) {
+ // Try to compact as many the values as possible and only flush values
if compaction wasn't
+ // enough.
+ for (GroupingTableEntry valueToCompact : lruMap.values()) {
+ long currentWeight = valueToCompact.getWeight();
+ valueToCompact.compact();
+ weight += valueToCompact.getWeight() - currentWeight;
+ }
+
+ if (weight > currentMax) {
+ Iterator<GroupingTableEntry> iterator = lruMap.values().iterator();
+ while (iterator.hasNext()) {
+ GroupingTableEntry valueToFlush = iterator.next();
+ weight -= valueToFlush.getWeight() +
valueToFlush.getGroupingKey().getWeight();
Review Comment:
I'm having some trouble following all the different weights, and my first
instinct is that, since valueToFlush contains the GroupingKey, this would
count the weight of the grouping key twice (and presumably this would be bad
because it wasn't counted twice when it was originally added to the max weight).
Issue Time Tracking
-------------------
Worklog Id: (was: 770459)
Time Spent: 81h (was: 80h 50m)
> Optimize Java SDK harness
> -------------------------
>
> Key: BEAM-13015
> URL: https://issues.apache.org/jira/browse/BEAM-13015
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-harness
> Reporter: Luke Cwik
> Assignee: Luke Cwik
> Priority: P2
> Time Spent: 81h
> Remaining Estimate: 0h
>
> Use profiling tools to remove bundle processing overhead in the SDK harness.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)