lukecwik commented on code in PR #17327:
URL: https://github.com/apache/beam/pull/17327#discussion_r872872640
##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/PrecombineGroupingTableTest.java:
##########
@@ -46,79 +60,207 @@
@RunWith(JUnit4.class)
public class PrecombineGroupingTableTest {
- private static class TestOutputReceiver implements Receiver {
- final List<Object> outputElems = new ArrayList<>();
+ @Rule
+ public TestExecutorService executorService =
TestExecutors.from(Executors.newCachedThreadPool());
+
+ private static class TestOutputReceiver<T> implements FnDataReceiver<T> {
+ final List<T> outputElems = new ArrayList<>();
@Override
- public void process(Object elem) {
+ public void accept(T elem) {
outputElems.add(elem);
}
}
- @Test
- public void testCombiningGroupingTable() throws Exception {
- Combiner<Object, Integer, Long, Long> summingCombineFn =
- new Combiner<Object, Integer, Long, Long>() {
+ private static final CombineFn<Integer, Long, Long> COMBINE_FN =
+ new CombineFn<Integer, Long, Long>() {
- @Override
- public Long createAccumulator(Object key) {
- return 0L;
- }
+ @Override
+ public Long createAccumulator() {
+ return 0L;
+ }
- @Override
- public Long add(Object key, Long accumulator, Integer value) {
- return accumulator + value;
- }
+ @Override
+ public Long addInput(Long accumulator, Integer value) {
+ return accumulator + value;
+ }
- @Override
- public Long merge(Object key, Iterable<Long> accumulators) {
- long sum = 0;
- for (Long part : accumulators) {
- sum += part;
- }
- return sum;
+ @Override
+ public Long mergeAccumulators(Iterable<Long> accumulators) {
+ long sum = 0;
+ for (Long part : accumulators) {
+ sum += part;
}
+ return sum;
+ }
- @Override
- public Long compact(Object key, Long accumulator) {
- return accumulator;
+ @Override
+ public Long compact(Long accumulator) {
+ if (accumulator % 2 == 0) {
+ return accumulator / 4;
}
+ return accumulator;
+ }
- @Override
- public Long extract(Object key, Long accumulator) {
- return accumulator;
- }
- };
+ @Override
+ public Long extractOutput(Long accumulator) {
+ return accumulator;
+ }
+ };
+ @Test
+ public void testCombiningGroupingTableEvictsAllOnLargeEntry() throws
Exception {
PrecombineGroupingTable<String, Integer, Long> table =
new PrecombineGroupingTable<>(
- 100_000_000L,
- new IdentityGroupingKeyCreator(),
- new KvPairInfo(),
- summingCombineFn,
+ PipelineOptionsFactory.create(),
+ Caches.forMaximumBytes(2500L),
+ StringUtf8Coder.of(),
+ GlobalCombineFnRunners.create(COMBINE_FN),
new StringPowerSizeEstimator(),
new IdentitySizeEstimator());
- table.setMaxSize(1000);
- TestOutputReceiver receiver = new TestOutputReceiver();
+ TestOutputReceiver<WindowedValue<KV<String, Long>>> receiver = new
TestOutputReceiver<>();
- table.put("A", 1, receiver);
- table.put("B", 2, receiver);
- table.put("B", 3, receiver);
- table.put("C", 4, receiver);
+ table.put(valueInGlobalWindow(KV.of("A", 1)), receiver);
+ table.put(valueInGlobalWindow(KV.of("B", 3)), receiver);
+ table.put(valueInGlobalWindow(KV.of("B", 6)), receiver);
+ table.put(valueInGlobalWindow(KV.of("C", 7)), receiver);
assertThat(receiver.outputElems, empty());
- table.put("C", 5000, receiver);
- assertThat(receiver.outputElems, hasItem((Object) KV.of("C", 5004L)));
+ // Add beyond the size which causes compaction which still leads to
evicting all since the
+ // largest is most recent.
+ table.put(valueInGlobalWindow(KV.of("C", 9999)), receiver);
+ assertThat(
+ receiver.outputElems,
+ containsInAnyOrder(
+ valueInGlobalWindow(KV.of("A", 1L)),
+ valueInGlobalWindow(KV.of("B", 9L)),
+ valueInGlobalWindow(KV.of("C", (9999L + 7) / 4))));
+
+ table.flush(receiver);
+ assertThat(
+ receiver.outputElems,
+ containsInAnyOrder(
+ valueInGlobalWindow(KV.of("A", 1L)),
+ valueInGlobalWindow(KV.of("B", 3L + 6)),
+ valueInGlobalWindow(KV.of("C", (9999L + 7) / 4))));
+ }
+
+ @Test
+ public void testCombiningGroupingTableCompactionSaves() throws Exception {
+ PrecombineGroupingTable<String, Integer, Long> table =
+ new PrecombineGroupingTable<>(
+ PipelineOptionsFactory.create(),
+ Caches.forMaximumBytes(2500L),
+ StringUtf8Coder.of(),
+ GlobalCombineFnRunners.create(COMBINE_FN),
+ new StringPowerSizeEstimator(),
+ new IdentitySizeEstimator());
+
+ TestOutputReceiver<WindowedValue<KV<String, Long>>> receiver = new
TestOutputReceiver<>();
+
+ // Insert three compactable values which shouldn't lead to eviction even
though we are over
+ // the maximum size.
+ table.put(valueInGlobalWindow(KV.of("A", 1004)), receiver);
+ table.put(valueInGlobalWindow(KV.of("B", 1004)), receiver);
+ table.put(valueInGlobalWindow(KV.of("C", 1004)), receiver);
+ assertThat(receiver.outputElems, empty());
+
+ table.flush(receiver);
+ assertThat(
+ receiver.outputElems,
+ containsInAnyOrder(
+ valueInGlobalWindow(KV.of("A", 1004L / 4)),
+ valueInGlobalWindow(KV.of("B", 1004L / 4)),
+ valueInGlobalWindow(KV.of("C", 1004L / 4))));
+ }
+
+ @Test
+ public void testCombiningGroupingTablePartialEviction() throws Exception {
+ PrecombineGroupingTable<String, Integer, Long> table =
+ new PrecombineGroupingTable<>(
+ PipelineOptionsFactory.create(),
+ Caches.forMaximumBytes(2500L),
+ StringUtf8Coder.of(),
+ GlobalCombineFnRunners.create(COMBINE_FN),
+ new StringPowerSizeEstimator(),
+ new IdentitySizeEstimator());
+
+ TestOutputReceiver<WindowedValue<KV<String, Long>>> receiver = new
TestOutputReceiver<>();
- table.put("DDDD", 6, receiver);
- assertThat(receiver.outputElems, hasItem((Object) KV.of("DDDD", 6L)));
+ // Insert three values which even with compaction isn't enough so we evict
D & E to get
Review Comment:
Done
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java:
##########
@@ -17,434 +17,283 @@
*/
package org.apache.beam.fn.harness;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.Map;
+import java.util.LinkedHashMap;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.fn.harness.Cache.Shrinkable;
import org.apache.beam.runners.core.GlobalCombineFnRunner;
import org.apache.beam.runners.core.GlobalCombineFnRunners;
import org.apache.beam.runners.core.NullSideInputReader;
-import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.SdkHarnessOptions;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.Weighted;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.sdk.values.KV;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
-import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CountingOutputStream;
import org.joda.time.Instant;
-/** Static utility methods that provide {@link GroupingTable} implementations.
*/
+/** Static utility methods that provide a grouping table implementation. */
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
+@NotThreadSafe
Review Comment:
Documented that `put` and `flush` must be called from the bundle processing
thread. `shrink` can be called from any thread.
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java:
##########
@@ -17,434 +17,283 @@
*/
package org.apache.beam.fn.harness;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.Map;
+import java.util.LinkedHashMap;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.fn.harness.Cache.Shrinkable;
import org.apache.beam.runners.core.GlobalCombineFnRunner;
import org.apache.beam.runners.core.GlobalCombineFnRunners;
import org.apache.beam.runners.core.NullSideInputReader;
-import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.SdkHarnessOptions;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.Weighted;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.sdk.values.KV;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
-import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CountingOutputStream;
import org.joda.time.Instant;
-/** Static utility methods that provide {@link GroupingTable} implementations.
*/
+/** Static utility methods that provide a grouping table implementation. */
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
+@NotThreadSafe
public class PrecombineGroupingTable<K, InputT, AccumT>
- implements GroupingTable<K, InputT, AccumT> {
- private static long getGroupingTableSizeBytes(PipelineOptions options) {
- return options.as(SdkHarnessOptions.class).getGroupingTableMaxSizeMb() *
1024L * 1024L;
- }
+ implements Shrinkable<PrecombineGroupingTable<K, InputT, AccumT>>,
Weighted {
+
+ private static final Instant IGNORED = BoundedWindow.TIMESTAMP_MIN_VALUE;
- /** Returns a {@link GroupingTable} that combines inputs into a accumulator.
*/
- public static <K, InputT, AccumT> GroupingTable<WindowedValue<K>, InputT,
AccumT> combining(
+ /**
+ * Returns a grouping table that combines inputs into an accumulator. The
grouping table uses the
+ * cache to defer flushing output until the cache evicts the table.
+ */
+ public static <K, InputT, AccumT> PrecombineGroupingTable<K, InputT, AccumT>
combining(
PipelineOptions options,
+ Cache<Object, Object> cache,
CombineFn<InputT, AccumT, ?> combineFn,
- Coder<K> keyCoder,
- Coder<? super AccumT> accumulatorCoder) {
- Combiner<WindowedValue<K>, InputT, AccumT, ?> valueCombiner =
- new ValueCombiner<>(
- GlobalCombineFnRunners.create(combineFn),
NullSideInputReader.empty(), options);
+ Coder<K> keyCoder) {
return new PrecombineGroupingTable<>(
- getGroupingTableSizeBytes(options),
- new WindowingCoderGroupingKeyCreator<>(keyCoder),
- WindowedPairInfo.create(),
- valueCombiner,
- new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
- new CoderSizeEstimator<>(accumulatorCoder));
+ options,
+ cache,
+ keyCoder,
+ GlobalCombineFnRunners.create(combineFn),
+ Caches::weigh,
+ Caches::weigh);
}
/**
- * Returns a {@link GroupingTable} that combines inputs into a accumulator
with sampling {@link
- * SizeEstimator SizeEstimators}.
+ * Returns a grouping table that combines inputs into an accumulator with
sampling {@link
+ * SizeEstimator SizeEstimators}. The grouping table uses the cache to defer
flushing output until
+ * the cache evicts the table.
*/
- public static <K, InputT, AccumT>
- GroupingTable<WindowedValue<K>, InputT, AccumT> combiningAndSampling(
- PipelineOptions options,
- CombineFn<InputT, AccumT, ?> combineFn,
- Coder<K> keyCoder,
- Coder<? super AccumT> accumulatorCoder,
- double sizeEstimatorSampleRate) {
- Combiner<WindowedValue<K>, InputT, AccumT, ?> valueCombiner =
- new ValueCombiner<>(
- GlobalCombineFnRunners.create(combineFn),
NullSideInputReader.empty(), options);
+ public static <K, InputT, AccumT> PrecombineGroupingTable<K, InputT, AccumT>
combiningAndSampling(
+ PipelineOptions options,
+ Cache<Object, Object> cache,
+ CombineFn<InputT, AccumT, ?> combineFn,
+ Coder<K> keyCoder,
+ double sizeEstimatorSampleRate) {
return new PrecombineGroupingTable<>(
- getGroupingTableSizeBytes(options),
- new WindowingCoderGroupingKeyCreator<>(keyCoder),
- WindowedPairInfo.create(),
- valueCombiner,
- new SamplingSizeEstimator<>(
- new
CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
- sizeEstimatorSampleRate,
- 1.0),
- new SamplingSizeEstimator<>(
- new CoderSizeEstimator<>(accumulatorCoder),
sizeEstimatorSampleRate, 1.0));
- }
-
- /** Provides client-specific operations for grouping keys. */
- public interface GroupingKeyCreator<K> {
- Object createGroupingKey(K key) throws Exception;
+ options,
+ cache,
+ keyCoder,
+ GlobalCombineFnRunners.create(combineFn),
+ new SamplingSizeEstimator<>(Caches::weigh, sizeEstimatorSampleRate,
1.0),
+ new SamplingSizeEstimator<>(Caches::weigh, sizeEstimatorSampleRate,
1.0));
}
- /** Implements Precombine GroupingKeyCreator via Coder. */
- public static class WindowingCoderGroupingKeyCreator<K>
- implements GroupingKeyCreator<WindowedValue<K>> {
-
- private static final Instant ignored = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
- private final Coder<K> coder;
-
- WindowingCoderGroupingKeyCreator(Coder<K> coder) {
- this.coder = coder;
+ @Nullable
+ @Override
+ public PrecombineGroupingTable<K, InputT, AccumT> shrink() {
+ long currentWeight = maxWeight.updateAndGet(operand -> operand >> 1);
+ // It is possible that we are shrunk multiple times until the requested
max weight is too small.
+ // In this case we want to effectively stop shrinking since we can't
effectively cache much
+ // at this time and the next insertion will likely evict all records.
+ if (currentWeight <= 100L) {
+ return null;
}
+ return this;
+ }
- @Override
- public Object createGroupingKey(WindowedValue<K> key) {
- // Ignore timestamp for grouping purposes.
- // The Precombine output will inherit the timestamp of one of its inputs.
- return WindowedValue.of(
- coder.structuralValue(key.getValue()), ignored, key.getWindows(),
key.getPane());
- }
+ @Override
+ public long getWeight() {
+ return maxWeight.get();
}
/** Provides client-specific operations for size estimates. */
+ @FunctionalInterface
public interface SizeEstimator<T> {
- long estimateSize(T element) throws Exception;
+ long estimateSize(T element);
}
- /** Implements SizeEstimator via Coder. */
- public static class CoderSizeEstimator<T> implements SizeEstimator<T> {
- /** Basic implementation of {@link ElementByteSizeObserver} for use in
size estimation. */
- private static class Observer extends ElementByteSizeObserver {
- private long observedSize = 0;
-
- @Override
- protected void reportElementSize(long elementSize) {
- observedSize += elementSize;
- }
- }
-
- final Coder<T> coder;
+ private final Coder<K> keyCoder;
+ private final GlobalCombineFnRunner<InputT, AccumT, ?> combineFn;
+ private final PipelineOptions options;
+ private final SizeEstimator<K> keySizer;
+ private final SizeEstimator<AccumT> accumulatorSizer;
+ private final Cache<Key, PrecombineGroupingTable<K, InputT, AccumT>> cache;
+ private final LinkedHashMap<WindowedValue<Object>, GroupingTableEntry>
lruMap;
+ private final AtomicLong maxWeight;
+ private long weight;
- CoderSizeEstimator(Coder<T> coder) {
- this.coder = coder;
- }
+ private static final class Key implements Weighted {
+ private static final Key INSTANCE = new Key();
@Override
- public long estimateSize(T value) throws Exception {
- // First try using byte size observer
- CoderSizeEstimator.Observer observer = new CoderSizeEstimator.Observer();
- coder.registerByteSizeObserver(value, observer);
-
- if (!observer.getIsLazy()) {
- observer.advance();
- return observer.observedSize;
- } else {
- // Coder byte size observation is lazy (requires iteration for
observation) so fall back to
- // counting output stream
- CountingOutputStream os = new
CountingOutputStream(ByteStreams.nullOutputStream());
- coder.encode(value, os);
- return os.getCount();
- }
+ public long getWeight() {
+ // Ignore the actual size of this singleton because it is trivial and
because
+ // the weight reported here will be counted many times as it is present
in
+ // many different state subcaches.
+ return 0;
}
}
- /**
- * Provides client-specific operations for working with elements that are
key/value or key/values
- * pairs.
- */
- public interface PairInfo {
- Object getKeyFromInputPair(Object pair);
-
- Object getValueFromInputPair(Object pair);
-
- Object makeOutputPair(Object key, Object value);
- }
-
- /** Implements Precombine PairInfo via KVs. */
- public static class WindowedPairInfo implements PairInfo {
- private static WindowedPairInfo theInstance = new WindowedPairInfo();
-
- public static WindowedPairInfo create() {
- return theInstance;
- }
-
- private WindowedPairInfo() {}
-
- @Override
- public Object getKeyFromInputPair(Object pair) {
- @SuppressWarnings("unchecked")
- WindowedValue<KV<?, ?>> windowedKv = (WindowedValue<KV<?, ?>>) pair;
- return windowedKv.withValue(windowedKv.getValue().getKey());
- }
-
- @Override
- public Object getValueFromInputPair(Object pair) {
- @SuppressWarnings("unchecked")
- WindowedValue<KV<?, ?>> windowedKv = (WindowedValue<KV<?, ?>>) pair;
- return windowedKv.getValue().getValue();
- }
-
- @Override
- public Object makeOutputPair(Object key, Object values) {
- WindowedValue<?> windowedKey = (WindowedValue<?>) key;
- return windowedKey.withValue(KV.of(windowedKey.getValue(), values));
- }
- }
-
- /** Provides client-specific operations for combining values. */
- public interface Combiner<K, InputT, AccumT, OutputT> {
- AccumT createAccumulator(K key);
-
- AccumT add(K key, AccumT accumulator, InputT value);
-
- AccumT merge(K key, Iterable<AccumT> accumulators);
-
- AccumT compact(K key, AccumT accumulator);
-
- OutputT extract(K key, AccumT accumulator);
+ PrecombineGroupingTable(
+ PipelineOptions options,
+ Cache<?, ?> cache,
+ Coder<K> keyCoder,
+ GlobalCombineFnRunner<InputT, AccumT, ?> combineFn,
+ SizeEstimator<K> keySizer,
+ SizeEstimator<AccumT> accumulatorSizer) {
+ this.options = options;
+ this.cache = (Cache<Key, PrecombineGroupingTable<K, InputT, AccumT>>)
cache;
+ this.keyCoder = keyCoder;
+ this.combineFn = combineFn;
+ this.keySizer = keySizer;
+ this.accumulatorSizer = accumulatorSizer;
+ this.lruMap = new LinkedHashMap<>(16, 0.75f, true);
+ this.maxWeight = new AtomicLong();
+ this.weight = 0L;
+ this.cache.put(Key.INSTANCE, this);
}
- /** Implements Precombine Combiner via Combine.KeyedCombineFn. */
- public static class ValueCombiner<K, InputT, AccumT, OutputT>
- implements Combiner<WindowedValue<K>, InputT, AccumT, OutputT> {
- private final GlobalCombineFnRunner<InputT, AccumT, OutputT> combineFn;
- private final SideInputReader sideInputReader;
- private final PipelineOptions options;
-
- private ValueCombiner(
- GlobalCombineFnRunner<InputT, AccumT, OutputT> combineFn,
- SideInputReader sideInputReader,
- PipelineOptions options) {
- this.combineFn = combineFn;
- this.sideInputReader = sideInputReader;
- this.options = options;
- }
-
- @Override
- public AccumT createAccumulator(WindowedValue<K> windowedKey) {
- return this.combineFn.createAccumulator(options, sideInputReader,
windowedKey.getWindows());
+ private class GroupingTableEntry implements Weighted {
+ private final WindowedValue<Object> groupingKey;
+ private final K userKey;
+ private final long keySize;
+ private long accumulatorSize;
+ private AccumT accumulator;
+ private boolean dirty;
+
+ private GroupingTableEntry(
+ WindowedValue<Object> groupingKey, K userKey, InputT
initialInputValue) {
+ this.groupingKey = groupingKey;
+ this.userKey = userKey;
+ if (groupingKey.getValue() == userKey) {
+ // This object is only storing references to the same objects that are
being stored
+ // by the cache so the accounting of the size of the key is occurring
already.
+ this.keySize = Caches.REFERENCE_SIZE * 2;
+ } else {
+ this.keySize = Caches.REFERENCE_SIZE + keySizer.estimateSize(userKey);
+ }
+ this.accumulator =
+ combineFn.createAccumulator(
+ options, NullSideInputReader.empty(), groupingKey.getWindows());
+ add(initialInputValue);
+ this.accumulatorSize = accumulatorSizer.estimateSize(accumulator);
}
- @Override
- public AccumT add(WindowedValue<K> windowedKey, AccumT accumulator, InputT
value) {
- return this.combineFn.addInput(
- accumulator, value, options, sideInputReader,
windowedKey.getWindows());
+ public WindowedValue<Object> getGroupingKey() {
+ return groupingKey;
}
- @Override
- public AccumT merge(WindowedValue<K> windowedKey, Iterable<AccumT>
accumulators) {
- return this.combineFn.mergeAccumulators(
- accumulators, options, sideInputReader, windowedKey.getWindows());
+ public K getKey() {
+ return userKey;
}
- @Override
- public AccumT compact(WindowedValue<K> windowedKey, AccumT accumulator) {
- return this.combineFn.compact(
- accumulator, options, sideInputReader, windowedKey.getWindows());
+ public AccumT getValue() {
+ return accumulator;
}
@Override
- public OutputT extract(WindowedValue<K> windowedKey, AccumT accumulator) {
- return this.combineFn.extractOutput(
- accumulator, options, sideInputReader, windowedKey.getWindows());
+ public long getWeight() {
+ return keySize + accumulatorSize;
}
- }
-
- // How many bytes a word in the JVM has.
- private static final int BYTES_PER_JVM_WORD = getBytesPerJvmWord();
- /**
- * The number of bytes of overhead to store an entry in the grouping table
(a {@code
- * HashMap<StructuralByteArray, KeyAndValues>}), ignoring the actual number
of bytes in the keys
- * and values:
- *
- * <ul>
- * <li>an array element (1 word),
- * <li>a HashMap.Entry (4 words),
- * <li>a StructuralByteArray (1 words),
- * <li>a backing array (guessed at 1 word for the length),
- * <li>a KeyAndValues (2 words),
- * <li>an ArrayList (2 words),
- * <li>a backing array (1 word),
- * <li>per-object overhead (JVM-specific, guessed at 2 words * 6 objects).
- * </ul>
- */
- private static final int PER_KEY_OVERHEAD = 24 * BYTES_PER_JVM_WORD;
-
- /** A {@link GroupingTable} that uses the given combiner to combine values
in place. */
- // Keep the table relatively full to increase the chance of collisions.
- private static final double TARGET_LOAD = 0.9;
-
- private long maxSize;
- private final GroupingKeyCreator<? super K> groupingKeyCreator;
- private final PairInfo pairInfo;
- private final Combiner<? super K, InputT, AccumT, ?> combiner;
- private final SizeEstimator<? super K> keySizer;
- private final SizeEstimator<? super AccumT> accumulatorSizer;
-
- private long size = 0;
- private Map<Object, GroupingTableEntry<K, InputT, AccumT>> table;
-
- PrecombineGroupingTable(
- long maxSize,
- GroupingKeyCreator<? super K> groupingKeyCreator,
- PairInfo pairInfo,
- Combiner<? super K, InputT, AccumT, ?> combineFn,
- SizeEstimator<? super K> keySizer,
- SizeEstimator<? super AccumT> accumulatorSizer) {
- this.maxSize = maxSize;
- this.groupingKeyCreator = groupingKeyCreator;
- this.pairInfo = pairInfo;
- this.combiner = combineFn;
- this.keySizer = keySizer;
- this.accumulatorSizer = accumulatorSizer;
- this.table = new HashMap<>();
- }
-
- interface GroupingTableEntry<K, InputT, AccumT> {
- K getKey();
-
- AccumT getValue();
-
- void add(InputT value) throws Exception;
-
- long getSize();
-
- void compact() throws Exception;
- }
-
- private GroupingTableEntry<K, InputT, AccumT> createTableEntry(final K key)
throws Exception {
- return new GroupingTableEntry<K, InputT, AccumT>() {
- final long keySize = keySizer.estimateSize(key);
- AccumT accumulator = combiner.createAccumulator(key);
- long accumulatorSize = 0; // never used before a value is added...
-
- @Override
- public K getKey() {
- return key;
- }
-
- @Override
- public AccumT getValue() {
- return accumulator;
- }
-
- @Override
- public long getSize() {
- return keySize + accumulatorSize;
- }
-
- @Override
- public void compact() throws Exception {
- AccumT newAccumulator = combiner.compact(key, accumulator);
- if (newAccumulator != accumulator) {
- accumulator = newAccumulator;
- accumulatorSize = accumulatorSizer.estimateSize(newAccumulator);
- }
- }
- @Override
- public void add(InputT value) throws Exception {
- accumulator = combiner.add(key, accumulator, value);
+ public void compact() {
+ if (dirty) {
+ accumulator =
+ combineFn.compact(
+ accumulator, options, NullSideInputReader.empty(),
groupingKey.getWindows());
accumulatorSize = accumulatorSizer.estimateSize(accumulator);
+ dirty = false;
}
- };
- }
+ }
- /** Adds a pair to this table, possibly flushing some entries to output if
the table is full. */
- @SuppressWarnings("unchecked")
- @Override
- public void put(Object pair, Receiver receiver) throws Exception {
- put(
- (K) pairInfo.getKeyFromInputPair(pair),
- (InputT) pairInfo.getValueFromInputPair(pair),
- receiver);
+ public void add(InputT value) {
+ dirty = true;
+ accumulator =
+ combineFn.addInput(
+ accumulator, value, options, NullSideInputReader.empty(),
groupingKey.getWindows());
+ accumulatorSize = accumulatorSizer.estimateSize(accumulator);
+ }
}
/**
* Adds the key and value to this table, possibly flushing some entries to
output if the table is
* full.
*/
- public void put(K key, InputT value, Receiver receiver) throws Exception {
- Object groupingKey = groupingKeyCreator.createGroupingKey(key);
- GroupingTableEntry<K, InputT, AccumT> entry = table.get(groupingKey);
- if (entry == null) {
- entry = createTableEntry(key);
- table.put(groupingKey, entry);
- size += PER_KEY_OVERHEAD;
- } else {
- size -= entry.getSize();
- }
- entry.add(value);
- size += entry.getSize();
-
- if (size >= maxSize) {
- long targetSize = (long) (TARGET_LOAD * maxSize);
- Iterator<GroupingTableEntry<K, InputT, AccumT>> entries =
table.values().iterator();
- while (size >= targetSize) {
- if (!entries.hasNext()) {
- // Should never happen, but sizes may be estimates...
- size = 0;
- break;
+ @VisibleForTesting
+ public void put(
+ WindowedValue<KV<K, InputT>> value, FnDataReceiver<WindowedValue<KV<K,
AccumT>>> receiver)
+ throws Exception {
+ // Ignore timestamp for grouping purposes.
+ // The Pre-combine output will inherit the timestamp of one of its inputs.
+ WindowedValue<Object> groupingKey =
+ WindowedValue.of(
+ keyCoder.structuralValue(value.getValue().getKey()),
+ IGNORED,
+ value.getWindows(),
+ value.getPane());
+
+ GroupingTableEntry entry =
+ lruMap.compute(
+ groupingKey,
+ (key, tableEntry) -> {
+ if (tableEntry == null) {
+ tableEntry =
+ new GroupingTableEntry(
+ groupingKey, value.getValue().getKey(),
value.getValue().getValue());
+ } else {
+ tableEntry.add(value.getValue().getValue());
+ }
+ return tableEntry;
+ });
+ weight += entry.getWeight();
+ // Increase the maximum only if we require it
+ maxWeight.accumulateAndGet(weight, (current, update) -> current < update ?
update : current);
+
+ // Update the cache to ensure that LRU is handled appropriately and for
the cache to have an
+ // opportunity to shrink the maxWeight if necessary.
+ cache.put(Key.INSTANCE, this);
+
+ // Get the updated weight now that the cache may have been shrunk and
respect it
+ long currentMax = maxWeight.get();
+ if (weight > currentMax) {
Review Comment:
Because we want to make sure that we only produce output from the bundle
processing thread, and not from an arbitrary thread that caused the shrinking to
happen. Added a comment to reflect this.
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java:
##########
@@ -17,434 +17,283 @@
*/
package org.apache.beam.fn.harness;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.Map;
+import java.util.LinkedHashMap;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.fn.harness.Cache.Shrinkable;
import org.apache.beam.runners.core.GlobalCombineFnRunner;
import org.apache.beam.runners.core.GlobalCombineFnRunners;
import org.apache.beam.runners.core.NullSideInputReader;
-import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.SdkHarnessOptions;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.Weighted;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.sdk.values.KV;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
-import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CountingOutputStream;
import org.joda.time.Instant;
-/** Static utility methods that provide {@link GroupingTable} implementations.
*/
+/** Static utility methods that provide a grouping table implementation. */
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
+@NotThreadSafe
public class PrecombineGroupingTable<K, InputT, AccumT>
- implements GroupingTable<K, InputT, AccumT> {
- private static long getGroupingTableSizeBytes(PipelineOptions options) {
- return options.as(SdkHarnessOptions.class).getGroupingTableMaxSizeMb() *
1024L * 1024L;
- }
+ implements Shrinkable<PrecombineGroupingTable<K, InputT, AccumT>>,
Weighted {
+
+ private static final Instant IGNORED = BoundedWindow.TIMESTAMP_MIN_VALUE;
- /** Returns a {@link GroupingTable} that combines inputs into a accumulator.
*/
- public static <K, InputT, AccumT> GroupingTable<WindowedValue<K>, InputT,
AccumT> combining(
+ /**
+ * Returns a grouping table that combines inputs into an accumulator. The
grouping table uses the
+ * cache to defer flushing output until the cache evicts the table.
+ */
+ public static <K, InputT, AccumT> PrecombineGroupingTable<K, InputT, AccumT>
combining(
PipelineOptions options,
+ Cache<Object, Object> cache,
CombineFn<InputT, AccumT, ?> combineFn,
- Coder<K> keyCoder,
- Coder<? super AccumT> accumulatorCoder) {
- Combiner<WindowedValue<K>, InputT, AccumT, ?> valueCombiner =
- new ValueCombiner<>(
- GlobalCombineFnRunners.create(combineFn),
NullSideInputReader.empty(), options);
+ Coder<K> keyCoder) {
return new PrecombineGroupingTable<>(
- getGroupingTableSizeBytes(options),
- new WindowingCoderGroupingKeyCreator<>(keyCoder),
- WindowedPairInfo.create(),
- valueCombiner,
- new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
- new CoderSizeEstimator<>(accumulatorCoder));
+ options,
+ cache,
+ keyCoder,
+ GlobalCombineFnRunners.create(combineFn),
+ Caches::weigh,
+ Caches::weigh);
}
/**
- * Returns a {@link GroupingTable} that combines inputs into a accumulator
with sampling {@link
- * SizeEstimator SizeEstimators}.
+ * Returns a grouping table that combines inputs into an accumulator with
sampling {@link
+ * SizeEstimator SizeEstimators}. The grouping table uses the cache to defer
flushing output until
+ * the cache evicts the table.
*/
- public static <K, InputT, AccumT>
- GroupingTable<WindowedValue<K>, InputT, AccumT> combiningAndSampling(
- PipelineOptions options,
- CombineFn<InputT, AccumT, ?> combineFn,
- Coder<K> keyCoder,
- Coder<? super AccumT> accumulatorCoder,
- double sizeEstimatorSampleRate) {
- Combiner<WindowedValue<K>, InputT, AccumT, ?> valueCombiner =
- new ValueCombiner<>(
- GlobalCombineFnRunners.create(combineFn),
NullSideInputReader.empty(), options);
+ public static <K, InputT, AccumT> PrecombineGroupingTable<K, InputT, AccumT>
combiningAndSampling(
+ PipelineOptions options,
+ Cache<Object, Object> cache,
+ CombineFn<InputT, AccumT, ?> combineFn,
+ Coder<K> keyCoder,
+ double sizeEstimatorSampleRate) {
return new PrecombineGroupingTable<>(
- getGroupingTableSizeBytes(options),
- new WindowingCoderGroupingKeyCreator<>(keyCoder),
- WindowedPairInfo.create(),
- valueCombiner,
- new SamplingSizeEstimator<>(
- new
CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
- sizeEstimatorSampleRate,
- 1.0),
- new SamplingSizeEstimator<>(
- new CoderSizeEstimator<>(accumulatorCoder),
sizeEstimatorSampleRate, 1.0));
- }
-
- /** Provides client-specific operations for grouping keys. */
- public interface GroupingKeyCreator<K> {
- Object createGroupingKey(K key) throws Exception;
+ options,
+ cache,
+ keyCoder,
+ GlobalCombineFnRunners.create(combineFn),
+ new SamplingSizeEstimator<>(Caches::weigh, sizeEstimatorSampleRate,
1.0),
+ new SamplingSizeEstimator<>(Caches::weigh, sizeEstimatorSampleRate,
1.0));
}
- /** Implements Precombine GroupingKeyCreator via Coder. */
- public static class WindowingCoderGroupingKeyCreator<K>
- implements GroupingKeyCreator<WindowedValue<K>> {
-
- private static final Instant ignored = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
- private final Coder<K> coder;
-
- WindowingCoderGroupingKeyCreator(Coder<K> coder) {
- this.coder = coder;
+ @Nullable
+ @Override
+ public PrecombineGroupingTable<K, InputT, AccumT> shrink() {
+ long currentWeight = maxWeight.updateAndGet(operand -> operand >> 1);
+ // It is possible that we are shrunk multiple times until the requested
max weight is too small.
+ // In this case we want to effectively stop shrinking since we can't
effectively cache much
+ // at this time and the next insertion will likely evict all records.
+ if (currentWeight <= 100L) {
+ return null;
}
+ return this;
+ }
- @Override
- public Object createGroupingKey(WindowedValue<K> key) {
- // Ignore timestamp for grouping purposes.
- // The Precombine output will inherit the timestamp of one of its inputs.
- return WindowedValue.of(
- coder.structuralValue(key.getValue()), ignored, key.getWindows(),
key.getPane());
- }
+ @Override
+ public long getWeight() {
+ return maxWeight.get();
}
/** Provides client-specific operations for size estimates. */
+ @FunctionalInterface
public interface SizeEstimator<T> {
- long estimateSize(T element) throws Exception;
+ long estimateSize(T element);
}
- /** Implements SizeEstimator via Coder. */
- public static class CoderSizeEstimator<T> implements SizeEstimator<T> {
- /** Basic implementation of {@link ElementByteSizeObserver} for use in
size estimation. */
- private static class Observer extends ElementByteSizeObserver {
- private long observedSize = 0;
-
- @Override
- protected void reportElementSize(long elementSize) {
- observedSize += elementSize;
- }
- }
-
- final Coder<T> coder;
+ private final Coder<K> keyCoder;
+ private final GlobalCombineFnRunner<InputT, AccumT, ?> combineFn;
+ private final PipelineOptions options;
+ private final SizeEstimator<K> keySizer;
+ private final SizeEstimator<AccumT> accumulatorSizer;
+ private final Cache<Key, PrecombineGroupingTable<K, InputT, AccumT>> cache;
+ private final LinkedHashMap<WindowedValue<Object>, GroupingTableEntry>
lruMap;
+ private final AtomicLong maxWeight;
+ private long weight;
- CoderSizeEstimator(Coder<T> coder) {
- this.coder = coder;
- }
+ private static final class Key implements Weighted {
+ private static final Key INSTANCE = new Key();
@Override
- public long estimateSize(T value) throws Exception {
- // First try using byte size observer
- CoderSizeEstimator.Observer observer = new CoderSizeEstimator.Observer();
- coder.registerByteSizeObserver(value, observer);
-
- if (!observer.getIsLazy()) {
- observer.advance();
- return observer.observedSize;
- } else {
- // Coder byte size observation is lazy (requires iteration for
observation) so fall back to
- // counting output stream
- CountingOutputStream os = new
CountingOutputStream(ByteStreams.nullOutputStream());
- coder.encode(value, os);
- return os.getCount();
- }
+ public long getWeight() {
+ // Ignore the actual size of this singleton because it is trivial and
because
+ // the weight reported here will be counted many times as it is present
in
+ // many different state subcaches.
+ return 0;
}
}
- /**
- * Provides client-specific operations for working with elements that are
key/value or key/values
- * pairs.
- */
- public interface PairInfo {
- Object getKeyFromInputPair(Object pair);
-
- Object getValueFromInputPair(Object pair);
-
- Object makeOutputPair(Object key, Object value);
- }
-
- /** Implements Precombine PairInfo via KVs. */
- public static class WindowedPairInfo implements PairInfo {
- private static WindowedPairInfo theInstance = new WindowedPairInfo();
-
- public static WindowedPairInfo create() {
- return theInstance;
- }
-
- private WindowedPairInfo() {}
-
- @Override
- public Object getKeyFromInputPair(Object pair) {
- @SuppressWarnings("unchecked")
- WindowedValue<KV<?, ?>> windowedKv = (WindowedValue<KV<?, ?>>) pair;
- return windowedKv.withValue(windowedKv.getValue().getKey());
- }
-
- @Override
- public Object getValueFromInputPair(Object pair) {
- @SuppressWarnings("unchecked")
- WindowedValue<KV<?, ?>> windowedKv = (WindowedValue<KV<?, ?>>) pair;
- return windowedKv.getValue().getValue();
- }
-
- @Override
- public Object makeOutputPair(Object key, Object values) {
- WindowedValue<?> windowedKey = (WindowedValue<?>) key;
- return windowedKey.withValue(KV.of(windowedKey.getValue(), values));
- }
- }
-
- /** Provides client-specific operations for combining values. */
- public interface Combiner<K, InputT, AccumT, OutputT> {
- AccumT createAccumulator(K key);
-
- AccumT add(K key, AccumT accumulator, InputT value);
-
- AccumT merge(K key, Iterable<AccumT> accumulators);
-
- AccumT compact(K key, AccumT accumulator);
-
- OutputT extract(K key, AccumT accumulator);
+ PrecombineGroupingTable(
+ PipelineOptions options,
+ Cache<?, ?> cache,
+ Coder<K> keyCoder,
+ GlobalCombineFnRunner<InputT, AccumT, ?> combineFn,
+ SizeEstimator<K> keySizer,
+ SizeEstimator<AccumT> accumulatorSizer) {
+ this.options = options;
+ this.cache = (Cache<Key, PrecombineGroupingTable<K, InputT, AccumT>>)
cache;
+ this.keyCoder = keyCoder;
+ this.combineFn = combineFn;
+ this.keySizer = keySizer;
+ this.accumulatorSizer = accumulatorSizer;
+ this.lruMap = new LinkedHashMap<>(16, 0.75f, true);
+ this.maxWeight = new AtomicLong();
+ this.weight = 0L;
+ this.cache.put(Key.INSTANCE, this);
}
- /** Implements Precombine Combiner via Combine.KeyedCombineFn. */
- public static class ValueCombiner<K, InputT, AccumT, OutputT>
- implements Combiner<WindowedValue<K>, InputT, AccumT, OutputT> {
- private final GlobalCombineFnRunner<InputT, AccumT, OutputT> combineFn;
- private final SideInputReader sideInputReader;
- private final PipelineOptions options;
-
- private ValueCombiner(
- GlobalCombineFnRunner<InputT, AccumT, OutputT> combineFn,
- SideInputReader sideInputReader,
- PipelineOptions options) {
- this.combineFn = combineFn;
- this.sideInputReader = sideInputReader;
- this.options = options;
- }
-
- @Override
- public AccumT createAccumulator(WindowedValue<K> windowedKey) {
- return this.combineFn.createAccumulator(options, sideInputReader,
windowedKey.getWindows());
+ private class GroupingTableEntry implements Weighted {
+ private final WindowedValue<Object> groupingKey;
+ private final K userKey;
+ private final long keySize;
+ private long accumulatorSize;
+ private AccumT accumulator;
+ private boolean dirty;
+
+ private GroupingTableEntry(
+ WindowedValue<Object> groupingKey, K userKey, InputT
initialInputValue) {
+ this.groupingKey = groupingKey;
+ this.userKey = userKey;
+ if (groupingKey.getValue() == userKey) {
+ // This object is only storing references to the same objects that are
being stored
+ // by the cache so the accounting of the size of the key is occurring
already.
+ this.keySize = Caches.REFERENCE_SIZE * 2;
+ } else {
+ this.keySize = Caches.REFERENCE_SIZE + keySizer.estimateSize(userKey);
+ }
+ this.accumulator =
+ combineFn.createAccumulator(
+ options, NullSideInputReader.empty(), groupingKey.getWindows());
+ add(initialInputValue);
+ this.accumulatorSize = accumulatorSizer.estimateSize(accumulator);
}
- @Override
- public AccumT add(WindowedValue<K> windowedKey, AccumT accumulator, InputT
value) {
- return this.combineFn.addInput(
- accumulator, value, options, sideInputReader,
windowedKey.getWindows());
+ public WindowedValue<Object> getGroupingKey() {
+ return groupingKey;
}
- @Override
- public AccumT merge(WindowedValue<K> windowedKey, Iterable<AccumT>
accumulators) {
- return this.combineFn.mergeAccumulators(
- accumulators, options, sideInputReader, windowedKey.getWindows());
+ public K getKey() {
+ return userKey;
}
- @Override
- public AccumT compact(WindowedValue<K> windowedKey, AccumT accumulator) {
- return this.combineFn.compact(
- accumulator, options, sideInputReader, windowedKey.getWindows());
+ public AccumT getValue() {
+ return accumulator;
}
@Override
- public OutputT extract(WindowedValue<K> windowedKey, AccumT accumulator) {
- return this.combineFn.extractOutput(
- accumulator, options, sideInputReader, windowedKey.getWindows());
+ public long getWeight() {
+ return keySize + accumulatorSize;
}
- }
-
- // How many bytes a word in the JVM has.
- private static final int BYTES_PER_JVM_WORD = getBytesPerJvmWord();
- /**
- * The number of bytes of overhead to store an entry in the grouping table
(a {@code
- * HashMap<StructuralByteArray, KeyAndValues>}), ignoring the actual number
of bytes in the keys
- * and values:
- *
- * <ul>
- * <li>an array element (1 word),
- * <li>a HashMap.Entry (4 words),
- * <li>a StructuralByteArray (1 words),
- * <li>a backing array (guessed at 1 word for the length),
- * <li>a KeyAndValues (2 words),
- * <li>an ArrayList (2 words),
- * <li>a backing array (1 word),
- * <li>per-object overhead (JVM-specific, guessed at 2 words * 6 objects).
- * </ul>
- */
- private static final int PER_KEY_OVERHEAD = 24 * BYTES_PER_JVM_WORD;
-
- /** A {@link GroupingTable} that uses the given combiner to combine values
in place. */
- // Keep the table relatively full to increase the chance of collisions.
- private static final double TARGET_LOAD = 0.9;
-
- private long maxSize;
- private final GroupingKeyCreator<? super K> groupingKeyCreator;
- private final PairInfo pairInfo;
- private final Combiner<? super K, InputT, AccumT, ?> combiner;
- private final SizeEstimator<? super K> keySizer;
- private final SizeEstimator<? super AccumT> accumulatorSizer;
-
- private long size = 0;
- private Map<Object, GroupingTableEntry<K, InputT, AccumT>> table;
-
- PrecombineGroupingTable(
- long maxSize,
- GroupingKeyCreator<? super K> groupingKeyCreator,
- PairInfo pairInfo,
- Combiner<? super K, InputT, AccumT, ?> combineFn,
- SizeEstimator<? super K> keySizer,
- SizeEstimator<? super AccumT> accumulatorSizer) {
- this.maxSize = maxSize;
- this.groupingKeyCreator = groupingKeyCreator;
- this.pairInfo = pairInfo;
- this.combiner = combineFn;
- this.keySizer = keySizer;
- this.accumulatorSizer = accumulatorSizer;
- this.table = new HashMap<>();
- }
-
- interface GroupingTableEntry<K, InputT, AccumT> {
- K getKey();
-
- AccumT getValue();
-
- void add(InputT value) throws Exception;
-
- long getSize();
-
- void compact() throws Exception;
- }
-
- private GroupingTableEntry<K, InputT, AccumT> createTableEntry(final K key)
throws Exception {
- return new GroupingTableEntry<K, InputT, AccumT>() {
- final long keySize = keySizer.estimateSize(key);
- AccumT accumulator = combiner.createAccumulator(key);
- long accumulatorSize = 0; // never used before a value is added...
-
- @Override
- public K getKey() {
- return key;
- }
-
- @Override
- public AccumT getValue() {
- return accumulator;
- }
-
- @Override
- public long getSize() {
- return keySize + accumulatorSize;
- }
-
- @Override
- public void compact() throws Exception {
- AccumT newAccumulator = combiner.compact(key, accumulator);
- if (newAccumulator != accumulator) {
- accumulator = newAccumulator;
- accumulatorSize = accumulatorSizer.estimateSize(newAccumulator);
- }
- }
- @Override
- public void add(InputT value) throws Exception {
- accumulator = combiner.add(key, accumulator, value);
+ public void compact() {
+ if (dirty) {
+ accumulator =
+ combineFn.compact(
+ accumulator, options, NullSideInputReader.empty(),
groupingKey.getWindows());
accumulatorSize = accumulatorSizer.estimateSize(accumulator);
+ dirty = false;
}
- };
- }
+ }
- /** Adds a pair to this table, possibly flushing some entries to output if
the table is full. */
- @SuppressWarnings("unchecked")
- @Override
- public void put(Object pair, Receiver receiver) throws Exception {
- put(
- (K) pairInfo.getKeyFromInputPair(pair),
- (InputT) pairInfo.getValueFromInputPair(pair),
- receiver);
+ public void add(InputT value) {
+ dirty = true;
+ accumulator =
+ combineFn.addInput(
+ accumulator, value, options, NullSideInputReader.empty(),
groupingKey.getWindows());
+ accumulatorSize = accumulatorSizer.estimateSize(accumulator);
+ }
}
/**
* Adds the key and value to this table, possibly flushing some entries to
output if the table is
* full.
*/
- public void put(K key, InputT value, Receiver receiver) throws Exception {
- Object groupingKey = groupingKeyCreator.createGroupingKey(key);
- GroupingTableEntry<K, InputT, AccumT> entry = table.get(groupingKey);
- if (entry == null) {
- entry = createTableEntry(key);
- table.put(groupingKey, entry);
- size += PER_KEY_OVERHEAD;
- } else {
- size -= entry.getSize();
- }
- entry.add(value);
- size += entry.getSize();
-
- if (size >= maxSize) {
- long targetSize = (long) (TARGET_LOAD * maxSize);
- Iterator<GroupingTableEntry<K, InputT, AccumT>> entries =
table.values().iterator();
- while (size >= targetSize) {
- if (!entries.hasNext()) {
- // Should never happen, but sizes may be estimates...
- size = 0;
- break;
+ @VisibleForTesting
+ public void put(
+ WindowedValue<KV<K, InputT>> value, FnDataReceiver<WindowedValue<KV<K,
AccumT>>> receiver)
+ throws Exception {
+ // Ignore timestamp for grouping purposes.
+ // The Pre-combine output will inherit the timestamp of one of its inputs.
+ WindowedValue<Object> groupingKey =
+ WindowedValue.of(
+ keyCoder.structuralValue(value.getValue().getKey()),
+ IGNORED,
+ value.getWindows(),
+ value.getPane());
+
+ GroupingTableEntry entry =
+ lruMap.compute(
+ groupingKey,
+ (key, tableEntry) -> {
+ if (tableEntry == null) {
+ tableEntry =
+ new GroupingTableEntry(
+ groupingKey, value.getValue().getKey(),
value.getValue().getValue());
+ } else {
+ tableEntry.add(value.getValue().getValue());
+ }
+ return tableEntry;
+ });
+ weight += entry.getWeight();
Review Comment:
Fixed, and updated the tests, since it turned out we weren't accounting for
the grouping table key.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]