[
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118319&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118319
]
ASF GitHub Bot logged work on BEAM-3708:
----------------------------------------
Author: ASF GitHub Bot
Created on: 02/Jul/18 18:06
Start Date: 02/Jul/18 18:06
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #5795:
[BEAM-3708] Adding grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795#discussion_r199576986
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java
##########
@@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+import org.apache.beam.runners.core.GlobalCombineFnRunner;
+import org.apache.beam.runners.core.GlobalCombineFnRunners;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+
+/** Static utility methods that provide {@link GroupingTable} implementations.
*/
+public class PrecombineGroupingTable<K, InputT, AccumT>
+ implements GroupingTable<K, InputT, AccumT> {
+ /** Returns a {@link GroupingTable} that combines inputs into a accumulator.
*/
+ public static <K, InputT, AccumT> GroupingTable<WindowedValue<K>, InputT,
AccumT> combining(
+ PipelineOptions options,
+ CombineFn<InputT, AccumT, ?> combineFn,
+ Coder<K> keyCoder,
+ Coder<? super AccumT> accumulatorCoder) {
+ Combiner<WindowedValue<K>, InputT, AccumT, ?> valueCombiner =
+ new ValueCombiner<>(
+ GlobalCombineFnRunners.create(combineFn),
NullSideInputReader.empty(), options);
+ return new PrecombineGroupingTable<>(
+ DEFAULT_MAX_GROUPING_TABLE_BYTES,
+ new WindowingCoderGroupingKeyCreator<>(keyCoder),
+ WindowedPairInfo.create(),
+ valueCombiner,
+ new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
+ new CoderSizeEstimator<>(accumulatorCoder));
+ }
+
+ /**
+ * Returns a {@link GroupingTable} that combines inputs into a accumulator
with sampling {@link
+ * SizeEstimator SizeEstimators}.
+ */
+ public static <K, InputT, AccumT>
+ GroupingTable<WindowedValue<K>, InputT, AccumT> combiningAndSampling(
+ PipelineOptions options,
+ CombineFn<InputT, AccumT, ?> combineFn,
+ Coder<K> keyCoder,
+ Coder<? super AccumT> accumulatorCoder,
+ double sizeEstimatorSampleRate) {
+ Combiner<WindowedValue<K>, InputT, AccumT, ?> valueCombiner =
+ new ValueCombiner<>(
+ GlobalCombineFnRunners.create(combineFn),
NullSideInputReader.empty(), options);
+ return new PrecombineGroupingTable<>(
+ DEFAULT_MAX_GROUPING_TABLE_BYTES,
+ new WindowingCoderGroupingKeyCreator<>(keyCoder),
+ WindowedPairInfo.create(),
+ valueCombiner,
+ new SamplingSizeEstimator<>(
+ new
CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
+ sizeEstimatorSampleRate,
+ 1.0),
+ new SamplingSizeEstimator<>(
+ new CoderSizeEstimator<>(accumulatorCoder),
sizeEstimatorSampleRate, 1.0));
+ }
+
/** Provides client-specific operations for grouping keys. */
public interface GroupingKeyCreator<K> {
  /** Returns an object whose equals/hashCode define grouping equality for {@code key}. */
  Object createGroupingKey(K key) throws Exception;
}
+
+ /** Implements Precombine GroupingKeyCreator via Coder. */
+ public static class WindowingCoderGroupingKeyCreator<K>
+ implements GroupingKeyCreator<WindowedValue<K>> {
+
+ private static final Instant ignored = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+ private final Coder<K> coder;
+
+ WindowingCoderGroupingKeyCreator(Coder<K> coder) {
+ this.coder = coder;
+ }
+
+ @Override
+ public Object createGroupingKey(WindowedValue<K> key) {
+ // Ignore timestamp for grouping purposes.
+ // The Precombine output will inherit the timestamp of one of its inputs.
+ return WindowedValue.of(
+ coder.structuralValue(key.getValue()), ignored, key.getWindows(),
key.getPane());
+ }
+ }
+
/** Provides client-specific operations for size estimates. */
public interface SizeEstimator<T> {
  /** Returns the estimated size of {@code element} in bytes. */
  long estimateSize(T element) throws Exception;
}
+
+ /** Implements SizeEstimator via Coder. */
+ public static class CoderSizeEstimator<T> implements SizeEstimator<T> {
+ /** Basic implementation of {@link ElementByteSizeObserver} for use in
size estimation. */
+ private static class Observer extends ElementByteSizeObserver {
+ private long observedSize = 0;
+
+ @Override
+ protected void reportElementSize(long elementSize) {
+ observedSize += elementSize;
+ }
+ }
+
+ final Coder<T> coder;
+
+ CoderSizeEstimator(Coder<T> coder) {
+ this.coder = coder;
+ }
+
+ @Override
+ public long estimateSize(T value) throws Exception {
+ // First try using byte size observer
+ CoderSizeEstimator.Observer observer = new CoderSizeEstimator.Observer();
+ coder.registerByteSizeObserver(value, observer);
+
+ if (!observer.getIsLazy()) {
+ observer.advance();
+ return observer.observedSize;
+ } else {
+ // Coder byte size observation is lazy (requires iteration for
observation) so fall back to
+ // counting output stream
+ CountingOutputStream os = new
CountingOutputStream(ByteStreams.nullOutputStream());
+ coder.encode(value, os);
+ return os.getCount();
+ }
+ }
+ }
+
/**
 * Provides client-specific operations for working with elements that are key/value or key/values
 * pairs.
 */
public interface PairInfo {
  /** Extracts the key component from an input pair. */
  Object getKeyFromInputPair(Object pair);

  /** Extracts the value component from an input pair. */
  Object getValueFromInputPair(Object pair);

  /** Builds an output pair from a key and a (combined) value. */
  Object makeOutputPair(Object key, Object value);
}
+
+ /** Implements Precombine PairInfo via KVs. */
+ public static class WindowedPairInfo implements PairInfo {
+ private static WindowedPairInfo theInstance = new WindowedPairInfo();
+
+ public static WindowedPairInfo create() {
+ return theInstance;
+ }
+
+ private WindowedPairInfo() {}
+
+ @Override
+ public Object getKeyFromInputPair(Object pair) {
+ @SuppressWarnings("unchecked")
+ WindowedValue<KV<?, ?>> windowedKv = (WindowedValue<KV<?, ?>>) pair;
+ return windowedKv.withValue(windowedKv.getValue().getKey());
+ }
+
+ @Override
+ public Object getValueFromInputPair(Object pair) {
+ @SuppressWarnings("unchecked")
+ WindowedValue<KV<?, ?>> windowedKv = (WindowedValue<KV<?, ?>>) pair;
+ return windowedKv.getValue().getValue();
+ }
+
+ @Override
+ public Object makeOutputPair(Object key, Object values) {
+ WindowedValue<?> windowedKey = (WindowedValue<?>) key;
+ return windowedKey.withValue(KV.of(windowedKey.getValue(), values));
+ }
+ }
+
/** Provides client-specific operations for combining values. */
public interface Combiner<K, InputT, AccumT, OutputT> {
  /** Returns a fresh accumulator for {@code key}. */
  AccumT createAccumulator(K key);

  /** Folds {@code value} into {@code accumulator}, returning the updated accumulator. */
  AccumT add(K key, AccumT accumulator, InputT value);

  /** Merges several accumulators for the same key into one. */
  AccumT merge(K key, Iterable<AccumT> accumulators);

  /** Returns a possibly more compact representation of {@code accumulator}. */
  AccumT compact(K key, AccumT accumulator);

  /** Produces the final output value from an accumulator. */
  OutputT extract(K key, AccumT accumulator);
}
+
+ /** Implements Precombine Combiner via Combine.KeyedCombineFn. */
+ public static class ValueCombiner<K, InputT, AccumT, OutputT>
+ implements Combiner<WindowedValue<K>, InputT, AccumT, OutputT> {
+ private final GlobalCombineFnRunner<InputT, AccumT, OutputT> combineFn;
+ private final SideInputReader sideInputReader;
+ private final PipelineOptions options;
+
+ private ValueCombiner(
+ GlobalCombineFnRunner<InputT, AccumT, OutputT> combineFn,
+ SideInputReader sideInputReader,
+ PipelineOptions options) {
+ this.combineFn = combineFn;
+ this.sideInputReader = sideInputReader;
+ this.options = options;
+ }
+
+ @Override
+ public AccumT createAccumulator(WindowedValue<K> windowedKey) {
+ return this.combineFn.createAccumulator(options, sideInputReader,
windowedKey.getWindows());
+ }
+
+ @Override
+ public AccumT add(WindowedValue<K> windowedKey, AccumT accumulator, InputT
value) {
+ return this.combineFn.addInput(
+ accumulator, value, options, sideInputReader,
windowedKey.getWindows());
+ }
+
+ @Override
+ public AccumT merge(WindowedValue<K> windowedKey, Iterable<AccumT>
accumulators) {
+ return this.combineFn.mergeAccumulators(
+ accumulators, options, sideInputReader, windowedKey.getWindows());
+ }
+
+ @Override
+ public AccumT compact(WindowedValue<K> windowedKey, AccumT accumulator) {
+ return this.combineFn.compact(
+ accumulator, options, sideInputReader, windowedKey.getWindows());
+ }
+
+ @Override
+ public OutputT extract(WindowedValue<K> windowedKey, AccumT accumulator) {
+ return this.combineFn.extractOutput(
+ accumulator, options, sideInputReader, windowedKey.getWindows());
+ }
+ }
+
// By default, how many bytes we allow the grouping table to consume before
// it has to be flushed.
private static final long DEFAULT_MAX_GROUPING_TABLE_BYTES = 100_000_000L;

// How many bytes a word in the JVM has.
private static final int BYTES_PER_JVM_WORD = getBytesPerJvmWord();

/**
 * The number of bytes of overhead to store an entry in the grouping table (a {@code
 * HashMap<StructuralByteArray, KeyAndValues>}), ignoring the actual number of bytes in the keys
 * and values:
 *
 * <ul>
 *   <li>an array element (1 word),
 *   <li>a HashMap.Entry (4 words),
 *   <li>a StructuralByteArray (1 words),
 *   <li>a backing array (guessed at 1 word for the length),
 *   <li>a KeyAndValues (2 words),
 *   <li>an ArrayList (2 words),
 *   <li>a backing array (1 word),
 *   <li>per-object overhead (JVM-specific, guessed at 2 words * 6 objects).
 * </ul>
 */
private static final int PER_KEY_OVERHEAD = 24 * BYTES_PER_JVM_WORD;

// Fraction of maxSize the table is shrunk to when it overflows (see put()).
// Keep the table relatively full to increase the chance of collisions.
private static final double TARGET_LOAD = 0.9;

// Flush threshold in bytes. NOTE(review): not final because a setter exists below;
// the reviewer suggested making this final and configurable via construction only.
private long maxSize;
private final GroupingKeyCreator<? super K> groupingKeyCreator;
private final PairInfo pairInfo;
private final Combiner<? super K, InputT, AccumT, ?> combiner;
private final SizeEstimator<? super K> keySizer;
private final SizeEstimator<? super AccumT> accumulatorSizer;

// Estimated total size in bytes of all entries currently in the table,
// including PER_KEY_OVERHEAD per entry.
private long size = 0;
private Map<Object, GroupingTableEntry<K, InputT, AccumT>> table;
+
/**
 * Creates a grouping table.
 *
 * @param maxSize maximum estimated size in bytes before entries are flushed
 * @param groupingKeyCreator produces the structural keys used for table lookups
 * @param pairInfo splits input pairs into key/value and builds output pairs
 * @param combineFn combines values into per-key accumulators
 * @param keySizer estimates key sizes in bytes
 * @param accumulatorSizer estimates accumulator sizes in bytes
 */
PrecombineGroupingTable(
    long maxSize,
    GroupingKeyCreator<? super K> groupingKeyCreator,
    PairInfo pairInfo,
    Combiner<? super K, InputT, AccumT, ?> combineFn,
    SizeEstimator<? super K> keySizer,
    SizeEstimator<? super AccumT> accumulatorSizer) {
  this.maxSize = maxSize;
  this.groupingKeyCreator = groupingKeyCreator;
  this.pairInfo = pairInfo;
  this.combiner = combineFn;
  this.keySizer = keySizer;
  this.accumulatorSizer = accumulatorSizer;
  this.table = new HashMap<>();
}
+
/** A single key's entry in the grouping table: the key plus its current accumulator. */
interface GroupingTableEntry<K, InputT, AccumT> {
  K getKey();

  /** Returns the current accumulator. */
  AccumT getValue();

  /** Combines {@code value} into this entry's accumulator. */
  void add(InputT value) throws Exception;

  /** Returns the estimated size in bytes of the key plus the accumulator. */
  long getSize();

  /** Asks the combiner to compact the accumulator, refreshing the size estimate. */
  void compact() throws Exception;
}
+
/** Creates a table entry that keeps an up-to-date size estimate for key + accumulator. */
private GroupingTableEntry<K, InputT, AccumT> createTableEntry(final K key) throws Exception {
  return new GroupingTableEntry<K, InputT, AccumT>() {
    // The key never changes, so its size is estimated once.
    final long keySize = keySizer.estimateSize(key);
    AccumT accumulator = combiner.createAccumulator(key);
    long accumulatorSize = 0; // never used before a value is added...

    @Override
    public K getKey() {
      return key;
    }

    @Override
    public AccumT getValue() {
      return accumulator;
    }

    @Override
    public long getSize() {
      return keySize + accumulatorSize;
    }

    @Override
    public void compact() throws Exception {
      AccumT newAccumulator = combiner.compact(key, accumulator);
      // Reference inequality signals that compaction produced a new accumulator,
      // so the size estimate must be refreshed.
      if (newAccumulator != accumulator) {
        accumulator = newAccumulator;
        accumulatorSize = accumulatorSizer.estimateSize(newAccumulator);
      }
    }

    @Override
    public void add(InputT value) throws Exception {
      accumulator = combiner.add(key, accumulator, value);
      // Re-estimate after every add; put() relies on this for its size accounting.
      accumulatorSize = accumulatorSizer.estimateSize(accumulator);
    }
  };
}
+
+ /** Adds a pair to this table, possibly flushing some entries to output if
the table is full. */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void put(Object pair, Receiver receiver) throws Exception {
+ put(
+ (K) pairInfo.getKeyFromInputPair(pair),
+ (InputT) pairInfo.getValueFromInputPair(pair),
+ receiver);
+ }
+
/**
 * Adds the key and value to this table, possibly flushing some entries to output if the table is
 * full.
 */
public void put(K key, InputT value, Receiver receiver) throws Exception {
  // Lookup uses a structural grouping key so equality is coder-defined, not identity.
  Object groupingKey = groupingKeyCreator.createGroupingKey(key);
  GroupingTableEntry<K, InputT, AccumT> entry = table.get(groupingKey);
  if (entry == null) {
    entry = createTableEntry(key);
    table.put(groupingKey, entry);
    size += PER_KEY_OVERHEAD;
  } else {
    // Subtract the entry's old size; the updated size is re-added after add() below.
    size -= entry.getSize();
  }
  entry.add(value);
  size += entry.getSize();

  if (size >= maxSize) {
    // Evict entries (in the map's iteration order) until back under the target load.
    long targetSize = (long) (TARGET_LOAD * maxSize);
    Iterator<GroupingTableEntry<K, InputT, AccumT>> entries = table.values().iterator();
    while (size >= targetSize) {
      if (!entries.hasNext()) {
        // Should never happen, but sizes may be estimates...
        size = 0;
        break;
      }
      GroupingTableEntry<K, InputT, AccumT> toFlush = entries.next();
      entries.remove();
      size -= toFlush.getSize() + PER_KEY_OVERHEAD;
      output(toFlush, receiver);
    }
  }
}
+
+ /**
+ * Output the given entry. Does not actually remove it from the table or
update this table's size.
+ */
+ private void output(GroupingTableEntry<K, InputT, AccumT> entry, Receiver
receiver)
+ throws Exception {
+ entry.compact();
+ receiver.process(pairInfo.makeOutputPair(entry.getKey(),
entry.getValue()));
+ }
+
+ /** Flushes all entries in this table to output. */
+ @Override
+ public void flush(Receiver output) throws Exception {
+ for (GroupingTableEntry<K, InputT, AccumT> entry : table.values()) {
+ output(entry, output);
+ }
+ table.clear();
+ size = 0;
+ }
+
+ @VisibleForTesting
+ public void setMaxSize(long maxSize) {
Review comment:
It would be much better if this was a final value and configurable via
construction only.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 118319)
> Implement the portable lifted Combiner transforms in Java SDK
> -------------------------------------------------------------
>
> Key: BEAM-3708
> URL: https://issues.apache.org/jira/browse/BEAM-3708
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-java-core, sdk-java-harness
> Reporter: Daniel Oliveira
> Assignee: Daniel Oliveira
> Priority: Major
> Labels: portability
> Time Spent: 5h 20m
> Remaining Estimate: 0h
>
> Lifted combines are split into separate parts with different URNs. These
> parts need to be implemented in the Java SDK harness so that the SDK can
> actually execute them when receiving Combine transforms with the
> corresponding URNs.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)