[
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118339&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118339
]
ASF GitHub Bot logged work on BEAM-3708:
----------------------------------------
Author: ASF GitHub Bot
Created on: 02/Jul/18 18:33
Start Date: 02/Jul/18 18:33
Worklog Time Spent: 10m
Work Description: lukecwik closed pull request #5795: [BEAM-3708] Adding
grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
index 35399b92e4c..178c4c1c0d8 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
@@ -18,20 +18,40 @@
package org.apache.beam.fn.harness;
import com.google.auto.service.AutoService;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
import java.io.IOException;
+import java.util.Collection;
import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.beam.fn.harness.control.BundleSplitListener;
+import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.data.MultiplexingFnDataReceiver;
+import org.apache.beam.fn.harness.state.BeamFnStateClient;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms;
import org.apache.beam.runners.core.construction.BeamUrns;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.function.ThrowingFunction;
+import org.apache.beam.sdk.fn.function.ThrowingRunnable;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.values.KV;
/** Executes different components of Combine PTransforms. */
-public class CombineRunners<InputT, OutputT> {
+public class CombineRunners {
/** A registrar which provides a factory to handle combine component
PTransforms. */
@AutoService(PTransformRunnerFactory.Registrar.class)
@@ -41,7 +61,7 @@
public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories()
{
return ImmutableMap.of(
BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_PRECOMBINE),
-
MapFnRunners.forValueMapFnFactory(CombineRunners::createPrecombineMapFunction),
+ new PrecombineFactory(),
BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_MERGE_ACCUMULATORS),
MapFnRunners.forValueMapFnFactory(CombineRunners::createMergeAccumulatorsMapFunction),
BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_EXTRACT_OUTPUTS),
@@ -51,17 +71,120 @@
}
}
- static <KeyT, InputT, AccumT>
- ThrowingFunction<KV<KeyT, InputT>, KV<KeyT, AccumT>>
createPrecombineMapFunction(
- String pTransformId, PTransform pTransform) throws IOException {
- CombinePayload combinePayload =
CombinePayload.parseFrom(pTransform.getSpec().getPayload());
- CombineFn<InputT, AccumT, ?> combineFn =
- (CombineFn)
- SerializableUtils.deserializeFromByteArray(
-
combinePayload.getCombineFn().getSpec().getPayload().toByteArray(),
"CombineFn");
+ private static class PrecombineRunner<KeyT, InputT, AccumT> {
+ private PipelineOptions options;
+ private CombineFn<InputT, AccumT, ?> combineFn;
+ private FnDataReceiver<WindowedValue<KV<KeyT, AccumT>>> output;
+ private Coder<KeyT> keyCoder;
+ private GroupingTable<WindowedValue<KeyT>, InputT, AccumT> groupingTable;
+ private Coder<AccumT> accumCoder;
+
+ PrecombineRunner(
+ PipelineOptions options,
+ CombineFn<InputT, AccumT, ?> combineFn,
+ FnDataReceiver<WindowedValue<KV<KeyT, AccumT>>> output,
+ Coder<KeyT> keyCoder,
+ Coder<AccumT> accumCoder) {
+ this.options = options;
+ this.combineFn = combineFn;
+ this.output = output;
+ this.keyCoder = keyCoder;
+ this.accumCoder = accumCoder;
+ }
+
+ void startBundle() {
+ groupingTable =
+ PrecombineGroupingTable.combiningAndSampling(
+ options, combineFn, keyCoder, accumCoder, 0.001
/*sizeEstimatorSampleRate*/);
+ }
+
+ void processElement(WindowedValue<KV<KeyT, InputT>> elem) throws Exception
{
+ groupingTable.put(
+ elem, (Object outputElem) -> output.accept((WindowedValue<KV<KeyT,
AccumT>>) outputElem));
+ }
+
+ void finishBundle() throws Exception {
+ groupingTable.flush(
+ (Object outputElem) -> output.accept((WindowedValue<KV<KeyT,
AccumT>>) outputElem));
+ }
+ }
+
+ /** A factory for {@link PrecombineRunner}s. */
+ @VisibleForTesting
+ public static class PrecombineFactory<KeyT, InputT, AccumT>
+ implements PTransformRunnerFactory<PrecombineRunner<KeyT, InputT,
AccumT>> {
+
+ @Override
+ public PrecombineRunner<KeyT, InputT, AccumT> createRunnerForPTransform(
+ PipelineOptions pipelineOptions,
+ BeamFnDataClient beamFnDataClient,
+ BeamFnStateClient beamFnStateClient,
+ String pTransformId,
+ PTransform pTransform,
+ Supplier<String> processBundleInstructionId,
+ Map<String, PCollection> pCollections,
+ Map<String, RunnerApi.Coder> coders,
+ Map<String, RunnerApi.WindowingStrategy> windowingStrategies,
+ Multimap<String, FnDataReceiver<WindowedValue<?>>>
pCollectionIdsToConsumers,
+ Consumer<ThrowingRunnable> addStartFunction,
+ Consumer<ThrowingRunnable> addFinishFunction,
+ BundleSplitListener splitListener)
+ throws IOException {
+ // Get objects needed to create the runner.
+ RehydratedComponents rehydratedComponents =
+ RehydratedComponents.forComponents(
+ RunnerApi.Components.newBuilder()
+ .putAllCoders(coders)
+ .putAllWindowingStrategies(windowingStrategies)
+ .build());
+ String mainInputTag =
Iterables.getOnlyElement(pTransform.getInputsMap().keySet());
+ RunnerApi.PCollection mainInput =
pCollections.get(pTransform.getInputsOrThrow(mainInputTag));
+
+ // Input coder may sometimes be WindowedValueCoder depending on runner,
instead of the
+ // expected KvCoder.
+ Coder<?> uncastInputCoder =
rehydratedComponents.getCoder(mainInput.getCoderId());
+ KvCoder<KeyT, InputT> inputCoder;
+ if (uncastInputCoder instanceof WindowedValueCoder) {
+ inputCoder =
+ (KvCoder<KeyT, InputT>)
+ ((WindowedValueCoder<KV<KeyT, InputT>>)
uncastInputCoder).getValueCoder();
+ } else {
+ inputCoder = (KvCoder<KeyT, InputT>)
rehydratedComponents.getCoder(mainInput.getCoderId());
+ }
+ Coder<KeyT> keyCoder = inputCoder.getKeyCoder();
+
+ CombinePayload combinePayload =
CombinePayload.parseFrom(pTransform.getSpec().getPayload());
+ CombineFn<InputT, AccumT, ?> combineFn =
+ (CombineFn)
+ SerializableUtils.deserializeFromByteArray(
+
combinePayload.getCombineFn().getSpec().getPayload().toByteArray(),
"CombineFn");
+ Coder<AccumT> accumCoder =
+ (Coder<AccumT>)
rehydratedComponents.getCoder(combinePayload.getAccumulatorCoderId());
+
+ Collection<FnDataReceiver<WindowedValue<KV<KeyT, AccumT>>>> consumers =
+ (Collection)
+ pCollectionIdsToConsumers.get(
+
Iterables.getOnlyElement(pTransform.getOutputsMap().values()));
- return (KV<KeyT, InputT> input) ->
- KV.of(input.getKey(),
combineFn.addInput(combineFn.createAccumulator(), input.getValue()));
+ // Create the runner.
+ PrecombineRunner<KeyT, InputT, AccumT> runner =
+ new PrecombineRunner<>(
+ pipelineOptions,
+ combineFn,
+ MultiplexingFnDataReceiver.forConsumers(consumers),
+ keyCoder,
+ accumCoder);
+
+ // Register the appropriate handlers.
+ addStartFunction.accept(runner::startBundle);
+ pCollectionIdsToConsumers.put(
+ Iterables.getOnlyElement(pTransform.getInputsMap().values()),
+ (FnDataReceiver)
+ (FnDataReceiver<WindowedValue<KV<KeyT, InputT>>>)
runner::processElement);
+ addFinishFunction.accept(runner::finishBundle);
+
+ return runner;
+ }
}
static <KeyT, AccumT>
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/GroupingTable.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/GroupingTable.java
new file mode 100644
index 00000000000..5fdd46f2be0
--- /dev/null
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/GroupingTable.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness;
+
+/** An interface that groups inputs to an accumulator and flushes the output.
*/
+public interface GroupingTable<K, InputT, AccumT> {
+
+ /** Abstract interface of things that accept inputs one at a time via
process(). */
+ interface Receiver {
+ /** Processes the element. */
+ void process(Object outputElem) throws Exception;
+ }
+
+ /** Adds a pair to this table, possibly flushing some entries to output if
the table is full. */
+ void put(Object pair, Receiver receiver) throws Exception;
+
+ /** Flushes all entries in this table to output. */
+ void flush(Receiver output) throws Exception;
+}
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java
new file mode 100644
index 00000000000..060b0e5efd4
--- /dev/null
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java
@@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+import org.apache.beam.runners.core.GlobalCombineFnRunner;
+import org.apache.beam.runners.core.GlobalCombineFnRunners;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+
+/** Static utility methods that provide {@link GroupingTable} implementations.
*/
+public class PrecombineGroupingTable<K, InputT, AccumT>
+ implements GroupingTable<K, InputT, AccumT> {
+ /** Returns a {@link GroupingTable} that combines inputs into an accumulator.
*/
+ public static <K, InputT, AccumT> GroupingTable<WindowedValue<K>, InputT,
AccumT> combining(
+ PipelineOptions options,
+ CombineFn<InputT, AccumT, ?> combineFn,
+ Coder<K> keyCoder,
+ Coder<? super AccumT> accumulatorCoder) {
+ Combiner<WindowedValue<K>, InputT, AccumT, ?> valueCombiner =
+ new ValueCombiner<>(
+ GlobalCombineFnRunners.create(combineFn),
NullSideInputReader.empty(), options);
+ return new PrecombineGroupingTable<>(
+ DEFAULT_MAX_GROUPING_TABLE_BYTES,
+ new WindowingCoderGroupingKeyCreator<>(keyCoder),
+ WindowedPairInfo.create(),
+ valueCombiner,
+ new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
+ new CoderSizeEstimator<>(accumulatorCoder));
+ }
+
+ /**
+ * Returns a {@link GroupingTable} that combines inputs into an accumulator
with sampling {@link
+ * SizeEstimator SizeEstimators}.
+ */
+ public static <K, InputT, AccumT>
+ GroupingTable<WindowedValue<K>, InputT, AccumT> combiningAndSampling(
+ PipelineOptions options,
+ CombineFn<InputT, AccumT, ?> combineFn,
+ Coder<K> keyCoder,
+ Coder<? super AccumT> accumulatorCoder,
+ double sizeEstimatorSampleRate) {
+ Combiner<WindowedValue<K>, InputT, AccumT, ?> valueCombiner =
+ new ValueCombiner<>(
+ GlobalCombineFnRunners.create(combineFn),
NullSideInputReader.empty(), options);
+ return new PrecombineGroupingTable<>(
+ DEFAULT_MAX_GROUPING_TABLE_BYTES,
+ new WindowingCoderGroupingKeyCreator<>(keyCoder),
+ WindowedPairInfo.create(),
+ valueCombiner,
+ new SamplingSizeEstimator<>(
+ new
CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
+ sizeEstimatorSampleRate,
+ 1.0),
+ new SamplingSizeEstimator<>(
+ new CoderSizeEstimator<>(accumulatorCoder),
sizeEstimatorSampleRate, 1.0));
+ }
+
+ /** Provides client-specific operations for grouping keys. */
+ public interface GroupingKeyCreator<K> {
+ Object createGroupingKey(K key) throws Exception;
+ }
+
+ /** Implements Precombine GroupingKeyCreator via Coder. */
+ public static class WindowingCoderGroupingKeyCreator<K>
+ implements GroupingKeyCreator<WindowedValue<K>> {
+
+ private static final Instant ignored = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+ private final Coder<K> coder;
+
+ WindowingCoderGroupingKeyCreator(Coder<K> coder) {
+ this.coder = coder;
+ }
+
+ @Override
+ public Object createGroupingKey(WindowedValue<K> key) {
+ // Ignore timestamp for grouping purposes.
+ // The Precombine output will inherit the timestamp of one of its inputs.
+ return WindowedValue.of(
+ coder.structuralValue(key.getValue()), ignored, key.getWindows(),
key.getPane());
+ }
+ }
+
+ /** Provides client-specific operations for size estimates. */
+ public interface SizeEstimator<T> {
+ long estimateSize(T element) throws Exception;
+ }
+
+ /** Implements SizeEstimator via Coder. */
+ public static class CoderSizeEstimator<T> implements SizeEstimator<T> {
+ /** Basic implementation of {@link ElementByteSizeObserver} for use in
size estimation. */
+ private static class Observer extends ElementByteSizeObserver {
+ private long observedSize = 0;
+
+ @Override
+ protected void reportElementSize(long elementSize) {
+ observedSize += elementSize;
+ }
+ }
+
+ final Coder<T> coder;
+
+ CoderSizeEstimator(Coder<T> coder) {
+ this.coder = coder;
+ }
+
+ @Override
+ public long estimateSize(T value) throws Exception {
+ // First try using byte size observer
+ CoderSizeEstimator.Observer observer = new CoderSizeEstimator.Observer();
+ coder.registerByteSizeObserver(value, observer);
+
+ if (!observer.getIsLazy()) {
+ observer.advance();
+ return observer.observedSize;
+ } else {
+ // Coder byte size observation is lazy (requires iteration for
observation) so fall back to
+ // counting output stream
+ CountingOutputStream os = new
CountingOutputStream(ByteStreams.nullOutputStream());
+ coder.encode(value, os);
+ return os.getCount();
+ }
+ }
+ }
+
+ /**
+ * Provides client-specific operations for working with elements that are
key/value or key/values
+ * pairs.
+ */
+ public interface PairInfo {
+ Object getKeyFromInputPair(Object pair);
+
+ Object getValueFromInputPair(Object pair);
+
+ Object makeOutputPair(Object key, Object value);
+ }
+
+ /** Implements Precombine PairInfo via KVs. */
+ public static class WindowedPairInfo implements PairInfo {
+ private static WindowedPairInfo theInstance = new WindowedPairInfo();
+
+ public static WindowedPairInfo create() {
+ return theInstance;
+ }
+
+ private WindowedPairInfo() {}
+
+ @Override
+ public Object getKeyFromInputPair(Object pair) {
+ @SuppressWarnings("unchecked")
+ WindowedValue<KV<?, ?>> windowedKv = (WindowedValue<KV<?, ?>>) pair;
+ return windowedKv.withValue(windowedKv.getValue().getKey());
+ }
+
+ @Override
+ public Object getValueFromInputPair(Object pair) {
+ @SuppressWarnings("unchecked")
+ WindowedValue<KV<?, ?>> windowedKv = (WindowedValue<KV<?, ?>>) pair;
+ return windowedKv.getValue().getValue();
+ }
+
+ @Override
+ public Object makeOutputPair(Object key, Object values) {
+ WindowedValue<?> windowedKey = (WindowedValue<?>) key;
+ return windowedKey.withValue(KV.of(windowedKey.getValue(), values));
+ }
+ }
+
+ /** Provides client-specific operations for combining values. */
+ public interface Combiner<K, InputT, AccumT, OutputT> {
+ AccumT createAccumulator(K key);
+
+ AccumT add(K key, AccumT accumulator, InputT value);
+
+ AccumT merge(K key, Iterable<AccumT> accumulators);
+
+ AccumT compact(K key, AccumT accumulator);
+
+ OutputT extract(K key, AccumT accumulator);
+ }
+
+ /** Implements Precombine Combiner via Combine.KeyedCombineFn. */
+ public static class ValueCombiner<K, InputT, AccumT, OutputT>
+ implements Combiner<WindowedValue<K>, InputT, AccumT, OutputT> {
+ private final GlobalCombineFnRunner<InputT, AccumT, OutputT> combineFn;
+ private final SideInputReader sideInputReader;
+ private final PipelineOptions options;
+
+ private ValueCombiner(
+ GlobalCombineFnRunner<InputT, AccumT, OutputT> combineFn,
+ SideInputReader sideInputReader,
+ PipelineOptions options) {
+ this.combineFn = combineFn;
+ this.sideInputReader = sideInputReader;
+ this.options = options;
+ }
+
+ @Override
+ public AccumT createAccumulator(WindowedValue<K> windowedKey) {
+ return this.combineFn.createAccumulator(options, sideInputReader,
windowedKey.getWindows());
+ }
+
+ @Override
+ public AccumT add(WindowedValue<K> windowedKey, AccumT accumulator, InputT
value) {
+ return this.combineFn.addInput(
+ accumulator, value, options, sideInputReader,
windowedKey.getWindows());
+ }
+
+ @Override
+ public AccumT merge(WindowedValue<K> windowedKey, Iterable<AccumT>
accumulators) {
+ return this.combineFn.mergeAccumulators(
+ accumulators, options, sideInputReader, windowedKey.getWindows());
+ }
+
+ @Override
+ public AccumT compact(WindowedValue<K> windowedKey, AccumT accumulator) {
+ return this.combineFn.compact(
+ accumulator, options, sideInputReader, windowedKey.getWindows());
+ }
+
+ @Override
+ public OutputT extract(WindowedValue<K> windowedKey, AccumT accumulator) {
+ return this.combineFn.extractOutput(
+ accumulator, options, sideInputReader, windowedKey.getWindows());
+ }
+ }
+
+ // By default, how many bytes we allow the grouping table to consume before
+ // it has to be flushed.
+ private static final long DEFAULT_MAX_GROUPING_TABLE_BYTES = 100_000_000L;
+
+ // How many bytes a word in the JVM has.
+ private static final int BYTES_PER_JVM_WORD = getBytesPerJvmWord();
+ /**
+ * The number of bytes of overhead to store an entry in the grouping table
(a {@code
+ * HashMap<StructuralByteArray, KeyAndValues>}), ignoring the actual number
of bytes in the keys
+ * and values:
+ *
+ * <ul>
+ * <li>an array element (1 word),
+ * <li>a HashMap.Entry (4 words),
+ * <li>a StructuralByteArray (1 word),
+ * <li>a backing array (guessed at 1 word for the length),
+ * <li>a KeyAndValues (2 words),
+ * <li>an ArrayList (2 words),
+ * <li>a backing array (1 word),
+ * <li>per-object overhead (JVM-specific, guessed at 2 words * 6 objects).
+ * </ul>
+ */
+ private static final int PER_KEY_OVERHEAD = 24 * BYTES_PER_JVM_WORD;
+
+ /** A {@link GroupingTable} that uses the given combiner to combine values
in place. */
+ // Keep the table relatively full to increase the chance of collisions.
+ private static final double TARGET_LOAD = 0.9;
+
+ private long maxSize;
+ private final GroupingKeyCreator<? super K> groupingKeyCreator;
+ private final PairInfo pairInfo;
+ private final Combiner<? super K, InputT, AccumT, ?> combiner;
+ private final SizeEstimator<? super K> keySizer;
+ private final SizeEstimator<? super AccumT> accumulatorSizer;
+
+ private long size = 0;
+ private Map<Object, GroupingTableEntry<K, InputT, AccumT>> table;
+
+ PrecombineGroupingTable(
+ long maxSize,
+ GroupingKeyCreator<? super K> groupingKeyCreator,
+ PairInfo pairInfo,
+ Combiner<? super K, InputT, AccumT, ?> combineFn,
+ SizeEstimator<? super K> keySizer,
+ SizeEstimator<? super AccumT> accumulatorSizer) {
+ this.maxSize = maxSize;
+ this.groupingKeyCreator = groupingKeyCreator;
+ this.pairInfo = pairInfo;
+ this.combiner = combineFn;
+ this.keySizer = keySizer;
+ this.accumulatorSizer = accumulatorSizer;
+ this.table = new HashMap<>();
+ }
+
+ interface GroupingTableEntry<K, InputT, AccumT> {
+ K getKey();
+
+ AccumT getValue();
+
+ void add(InputT value) throws Exception;
+
+ long getSize();
+
+ void compact() throws Exception;
+ }
+
+ private GroupingTableEntry<K, InputT, AccumT> createTableEntry(final K key)
throws Exception {
+ return new GroupingTableEntry<K, InputT, AccumT>() {
+ final long keySize = keySizer.estimateSize(key);
+ AccumT accumulator = combiner.createAccumulator(key);
+ long accumulatorSize = 0; // never used before a value is added...
+
+ @Override
+ public K getKey() {
+ return key;
+ }
+
+ @Override
+ public AccumT getValue() {
+ return accumulator;
+ }
+
+ @Override
+ public long getSize() {
+ return keySize + accumulatorSize;
+ }
+
+ @Override
+ public void compact() throws Exception {
+ AccumT newAccumulator = combiner.compact(key, accumulator);
+ if (newAccumulator != accumulator) {
+ accumulator = newAccumulator;
+ accumulatorSize = accumulatorSizer.estimateSize(newAccumulator);
+ }
+ }
+
+ @Override
+ public void add(InputT value) throws Exception {
+ accumulator = combiner.add(key, accumulator, value);
+ accumulatorSize = accumulatorSizer.estimateSize(accumulator);
+ }
+ };
+ }
+
+ /** Adds a pair to this table, possibly flushing some entries to output if
the table is full. */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void put(Object pair, Receiver receiver) throws Exception {
+ put(
+ (K) pairInfo.getKeyFromInputPair(pair),
+ (InputT) pairInfo.getValueFromInputPair(pair),
+ receiver);
+ }
+
+ /**
+ * Adds the key and value to this table, possibly flushing some entries to
output if the table is
+ * full.
+ */
+ public void put(K key, InputT value, Receiver receiver) throws Exception {
+ Object groupingKey = groupingKeyCreator.createGroupingKey(key);
+ GroupingTableEntry<K, InputT, AccumT> entry = table.get(groupingKey);
+ if (entry == null) {
+ entry = createTableEntry(key);
+ table.put(groupingKey, entry);
+ size += PER_KEY_OVERHEAD;
+ } else {
+ size -= entry.getSize();
+ }
+ entry.add(value);
+ size += entry.getSize();
+
+ if (size >= maxSize) {
+ long targetSize = (long) (TARGET_LOAD * maxSize);
+ Iterator<GroupingTableEntry<K, InputT, AccumT>> entries =
table.values().iterator();
+ while (size >= targetSize) {
+ if (!entries.hasNext()) {
+ // Should never happen, but sizes may be estimates...
+ size = 0;
+ break;
+ }
+ GroupingTableEntry<K, InputT, AccumT> toFlush = entries.next();
+ entries.remove();
+ size -= toFlush.getSize() + PER_KEY_OVERHEAD;
+ output(toFlush, receiver);
+ }
+ }
+ }
+
+ /**
+ * Output the given entry. Does not actually remove it from the table or
update this table's size.
+ */
+ private void output(GroupingTableEntry<K, InputT, AccumT> entry, Receiver
receiver)
+ throws Exception {
+ entry.compact();
+ receiver.process(pairInfo.makeOutputPair(entry.getKey(),
entry.getValue()));
+ }
+
+ /** Flushes all entries in this table to output. */
+ @Override
+ public void flush(Receiver output) throws Exception {
+ for (GroupingTableEntry<K, InputT, AccumT> entry : table.values()) {
+ output(entry, output);
+ }
+ table.clear();
+ size = 0;
+ }
+
+ @VisibleForTesting
+ public void setMaxSize(long maxSize) {
+ this.maxSize = maxSize;
+ }
+
+ @VisibleForTesting
+ public long size() {
+ return size;
+ }
+
+ /** Returns the number of bytes in a JVM word. In case we failed to find the
answer, returns 8. */
+ private static int getBytesPerJvmWord() {
+ String wordSizeInBits = System.getProperty("sun.arch.data.model");
+ try {
+ return Integer.parseInt(wordSizeInBits) / 8;
+ } catch (NumberFormatException e) {
+ // The JVM word size is unknown. Assume 64-bit.
+ return 8;
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////
+ // Size sampling.
+
+ /**
+ * Implements size estimation by adaptively delegating to an underlying
(potentially more
+ * expensive) estimator for some elements and returning the average value
for others.
+ */
+ @VisibleForTesting
+ static class SamplingSizeEstimator<T> implements SizeEstimator<T> {
+
+ /**
+ * The degree of confidence required in our expected value predictions
before we allow
+ * under-sampling.
+ *
+ * <p>The value of 3.0 is a confidence interval of about 99.7% for a
high-degree-of-freedom
+ * t-distribution.
+ */
+ static final double CONFIDENCE_INTERVAL_SIGMA = 3;
+
+ /**
+ * The desired size of our confidence interval (relative to the measured
expected value).
+ *
+ * <p>The value of 0.25 is plus or minus 25%.
+ */
+ static final double CONFIDENCE_INTERVAL_SIZE = 0.25;
+
+ /** Default number of elements that must be measured before elements are
skipped. */
+ static final long DEFAULT_MIN_SAMPLED = 20;
+
+ private final SizeEstimator<T> underlying;
+ private final double minSampleRate;
+ private final double maxSampleRate;
+ private final long minSampled;
+ private final Random random;
+
+ private long totalElements = 0;
+ private long sampledElements = 0;
+ private long sampledSum = 0;
+ private double sampledSumSquares = 0;
+ private long estimate;
+
+ private long nextSample = 0;
+
+ private SamplingSizeEstimator(
+ SizeEstimator<T> underlying, double minSampleRate, double
maxSampleRate) {
+ this(underlying, minSampleRate, maxSampleRate, DEFAULT_MIN_SAMPLED, new
Random());
+ }
+
+ @VisibleForTesting
+ SamplingSizeEstimator(
+ SizeEstimator<T> underlying,
+ double minSampleRate,
+ double maxSampleRate,
+ long minSampled,
+ Random random) {
+ this.underlying = underlying;
+ this.minSampleRate = minSampleRate;
+ this.maxSampleRate = maxSampleRate;
+ this.minSampled = minSampled;
+ this.random = random;
+ }
+
+ @Override
+ public long estimateSize(T element) throws Exception {
+ if (sampleNow()) {
+ return recordSample(underlying.estimateSize(element));
+ } else {
+ return estimate;
+ }
+ }
+
+ private boolean sampleNow() {
+ totalElements++;
+ return --nextSample < 0;
+ }
+
+ private long recordSample(long value) {
+ sampledElements += 1;
+ sampledSum += value;
+ sampledSumSquares += value * value;
+ estimate = (long) Math.ceil((double) sampledSum / sampledElements);
+ long target = desiredSampleSize();
+ if (sampledElements < minSampled || sampledElements < target) {
+ // Sample immediately.
+ nextSample = 0;
+ } else {
+ double rate =
+ cap(
+ minSampleRate,
+ maxSampleRate,
+ Math.max(
+ 1.0 / (totalElements - minSampled + 1), // slowly ramp down
+ target / (double) totalElements)); // "future" target
+ // Uses the geometric distribution to return the likely distance
between
+ // successive independent trials of a fixed probability p. This gives
the
+ // same uniform distribution of branching on Math.random() < p, but
with
+ // one random number generation per success rather than one
+ // per test, which can be a significant savings if p is small.
+ nextSample =
+ rate == 1.0 ? 0 : (long) Math.floor(Math.log(random.nextDouble())
/ Math.log(1 - rate));
+ }
+ return value;
+ }
+
+ private static double cap(double min, double max, double value) {
+ return Math.min(max, Math.max(min, value));
+ }
+
+ private long desiredSampleSize() {
+ // We have no a-priori information on the actual distribution of data
+ // sizes, so compute our desired sample as if it were normal.
+ // Yes this formula is unstable for small stddev, but we only care about
large stddev.
+ double mean = sampledSum / (double) sampledElements;
+ double sumSquareDiff =
+ (sampledSumSquares - (2 * mean * sampledSum) + (sampledElements *
mean * mean));
+ double stddev = Math.sqrt(sumSquareDiff / (sampledElements - 1));
+ double sqrtDesiredSamples =
+ (CONFIDENCE_INTERVAL_SIGMA * stddev) / (CONFIDENCE_INTERVAL_SIZE *
mean);
+ return (long) Math.ceil(sqrtDesiredSamples * sqrtDesiredSamples);
+ }
+ }
+}
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java
index aac4476e09c..b55acdefaef 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java
@@ -93,6 +93,7 @@ public Integer extractOutput(Integer accum) {
private RunnerApi.PTransform pTransform;
private String inputPCollectionId;
private String outputPCollectionId;
+ private RunnerApi.Pipeline pProto;
@Before
public void createPipeline() throws Exception {
@@ -109,7 +110,7 @@ public void createPipeline() throws Exception {
// Create FnApi protos needed for the runner.
SdkComponents sdkComponents = SdkComponents.create();
- RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents);
+ pProto = PipelineTranslation.toProto(p, sdkComponents);
inputPCollectionId = sdkComponents.registerPCollection(inputPCollection);
outputPCollectionId = sdkComponents.registerPCollection(outputPCollection);
pTransform = pProto.getComponents().getTransformsOrThrow(TEST_COMBINE_ID);
@@ -133,7 +134,7 @@ public void testPrecombine() throws Exception {
List<ThrowingRunnable> finishFunctions = new ArrayList<>();
// Create runner.
-
MapFnRunners.forValueMapFnFactory(CombineRunners::createPrecombineMapFunction)
+ new CombineRunners.PrecombineFactory<>()
.createRunnerForPTransform(
PipelineOptionsFactory.create(),
null,
@@ -141,16 +142,15 @@ public void testPrecombine() throws Exception {
TEST_COMBINE_ID,
pTransform,
null,
- Collections.emptyMap(),
- Collections.emptyMap(),
- Collections.emptyMap(),
+ pProto.getComponents().getPcollectionsMap(),
+ pProto.getComponents().getCodersMap(),
+ pProto.getComponents().getWindowingStrategiesMap(),
consumers,
startFunctions::add,
finishFunctions::add,
null);
- assertThat(startFunctions, empty());
- assertThat(finishFunctions, empty());
+ Iterables.getOnlyElement(startFunctions).run();
// Send elements to runner and check outputs.
mainOutputValues.clear();
@@ -164,18 +164,23 @@ public void testPrecombine() throws Exception {
input.accept(valueInGlobalWindow(KV.of("B", "2")));
input.accept(valueInGlobalWindow(KV.of("C", "3")));
+ Iterables.getOnlyElement(finishFunctions).run();
+
// Check that all values for "A" were converted to accumulators regardless
// of how they were combined by the Precombine optimization.
Integer sum = 0;
- while ("A".equals(mainOutputValues.getFirst().getValue().getKey())) {
- sum += mainOutputValues.getFirst().getValue().getValue();
- mainOutputValues.removeFirst();
+ for (WindowedValue<KV<String, Integer>> outputValue : mainOutputValues) {
+ if ("A".equals(outputValue.getValue().getKey())) {
+ sum += outputValue.getValue().getValue();
+ }
}
assertThat(sum, equalTo(9));
+ // Check that elements for "B" and "C" are present as well.
+ mainOutputValues.removeIf(elem -> "A".equals(elem.getValue().getKey()));
assertThat(
mainOutputValues,
- contains(valueInGlobalWindow(KV.of("B", 2)), valueInGlobalWindow(KV.of("C", 3))));
+ containsInAnyOrder(valueInGlobalWindow(KV.of("B", 2)), valueInGlobalWindow(KV.of("C", 3))));
}
/**
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/PrecombineGroupingTableTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/PrecombineGroupingTableTest.java
new file mode 100644
index 00000000000..1fd26f071a0
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/PrecombineGroupingTableTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.isIn;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import org.apache.beam.fn.harness.GroupingTable.Receiver;
+import org.apache.beam.fn.harness.PrecombineGroupingTable.Combiner;
+import org.apache.beam.fn.harness.PrecombineGroupingTable.GroupingKeyCreator;
+import org.apache.beam.fn.harness.PrecombineGroupingTable.SamplingSizeEstimator;
+import org.apache.beam.fn.harness.PrecombineGroupingTable.SizeEstimator;
+import org.apache.beam.sdk.values.KV;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.hamcrest.collection.IsIterableContainingInAnyOrder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link PrecombineGroupingTable}. */
+@RunWith(JUnit4.class)
+public class PrecombineGroupingTableTest {
+
+ private static class TestOutputReceiver implements Receiver {
+ final List<Object> outputElems = new ArrayList<>();
+
+ @Override
+ public void process(Object elem) {
+ outputElems.add(elem);
+ }
+ }
+
+ @Test
+ public void testCombiningGroupingTable() throws Exception {
+ Combiner<Object, Integer, Long, Long> summingCombineFn =
+ new Combiner<Object, Integer, Long, Long>() {
+
+ @Override
+ public Long createAccumulator(Object key) {
+ return 0L;
+ }
+
+ @Override
+ public Long add(Object key, Long accumulator, Integer value) {
+ return accumulator + value;
+ }
+
+ @Override
+ public Long merge(Object key, Iterable<Long> accumulators) {
+ long sum = 0;
+ for (Long part : accumulators) {
+ sum += part;
+ }
+ return sum;
+ }
+
+ @Override
+ public Long compact(Object key, Long accumulator) {
+ return accumulator;
+ }
+
+ @Override
+ public Long extract(Object key, Long accumulator) {
+ return accumulator;
+ }
+ };
+
+ PrecombineGroupingTable<String, Integer, Long> table =
+ new PrecombineGroupingTable<>(
+ 100_000_000L,
+ new IdentityGroupingKeyCreator(),
+ new KvPairInfo(),
+ summingCombineFn,
+ new StringPowerSizeEstimator(),
+ new IdentitySizeEstimator());
+ table.setMaxSize(1000);
+
+ TestOutputReceiver receiver = new TestOutputReceiver();
+
+ table.put("A", 1, receiver);
+ table.put("B", 2, receiver);
+ table.put("B", 3, receiver);
+ table.put("C", 4, receiver);
+ assertThat(receiver.outputElems, empty());
+
+ table.put("C", 5000, receiver);
+ assertThat(receiver.outputElems, hasItem((Object) KV.of("C", 5004L)));
+
+ table.put("DDDD", 6, receiver);
+ assertThat(receiver.outputElems, hasItem((Object) KV.of("DDDD", 6L)));
+
+ table.flush(receiver);
+ assertThat(
+ receiver.outputElems,
+ IsIterableContainingInAnyOrder.containsInAnyOrder(
+ KV.of("A", 1L), KV.of("B", 2L + 3), KV.of("C", 5000L + 4), KV.of("DDDD", 6L)));
+ }
+
+ ////////////////////////////////////////////////////////////////////////////
+ // Tests for the sampling size estimator.
+
+ @Test
+ public void testSampleFlatSizes() throws Exception {
+ IdentitySizeEstimator underlying = new IdentitySizeEstimator();
+ SizeEstimator<Long> estimator =
+ new SamplingSizeEstimator<>(underlying, 0.05, 1.0, 10, new Random(1));
+ // First 10 elements are always sampled.
+ for (int k = 0; k < 10; k++) {
+ assertEquals(100, estimator.estimateSize(100L));
+ assertEquals(k + 1, underlying.calls);
+ }
+ // Next 10 are sometimes sampled.
+ for (int k = 10; k < 20; k++) {
+ assertEquals(100, estimator.estimateSize(100L));
+ }
+ assertThat(underlying.calls, between(11, 19));
+ int initialCalls = underlying.calls;
+ // Next 1000 are sampled at about 5%.
+ for (int k = 20; k < 1020; k++) {
+ assertEquals(100, estimator.estimateSize(100L));
+ }
+ assertThat(underlying.calls - initialCalls, between(40, 60));
+ }
+
+ @Test
+ public void testSampleBoringSizes() throws Exception {
+ IdentitySizeEstimator underlying = new IdentitySizeEstimator();
+ SizeEstimator<Long> estimator =
+ new SamplingSizeEstimator<>(underlying, 0.05, 1.0, 10, new Random(1));
+ // First 10 elements are always sampled.
+ for (int k = 0; k < 10; k += 2) {
+ assertEquals(100, estimator.estimateSize(100L));
+ assertEquals(102, estimator.estimateSize(102L));
+ assertEquals(k + 2, underlying.calls);
+ }
+ // Next 10 are sometimes sampled.
+ for (int k = 10; k < 20; k += 2) {
+ assertThat(estimator.estimateSize(100L), between(100L, 102L));
+ assertThat(estimator.estimateSize(102L), between(100L, 102L));
+ }
+ assertThat(underlying.calls, between(11, 19));
+ int initialCalls = underlying.calls;
+ // Next 1000 are sampled at about 5%.
+ for (int k = 20; k < 1020; k += 2) {
+ assertThat(estimator.estimateSize(100L), between(100L, 102L));
+ assertThat(estimator.estimateSize(102L), between(100L, 102L));
+ }
+ assertThat(underlying.calls - initialCalls, between(40, 60));
+ }
+
+ @Test
+ public void testSampleHighVarianceSizes() throws Exception {
+ // The largest element is much larger than the average.
+ List<Long> sizes = Arrays.asList(1L, 10L, 100L, 1000L);
+ IdentitySizeEstimator underlying = new IdentitySizeEstimator();
+ SizeEstimator<Long> estimator =
+ new SamplingSizeEstimator<>(underlying, 0.1, 0.2, 10, new Random(1));
+ // First 10 elements are always sampled.
+ for (int k = 0; k < 10; k++) {
+ long size = sizes.get(k % sizes.size());
+ assertEquals(size, estimator.estimateSize(size));
+ assertEquals(k + 1, underlying.calls);
+ }
+ // We're still not out of the woods; sample every element.
+ for (int k = 10; k < 20; k++) {
+ long size = sizes.get(k % sizes.size());
+ assertEquals(size, estimator.estimateSize(size));
+ assertEquals(k + 1, underlying.calls);
+ }
+ // Sample some more to let things settle down.
+ for (int k = 20; k < 500; k++) {
+ estimator.estimateSize(sizes.get(k % sizes.size()));
+ }
+ // Next 1000 are sampled at about 20% (maxSampleRate).
+ int initialCalls = underlying.calls;
+ for (int k = 500; k < 1500; k++) {
+ long size = sizes.get(k % sizes.size());
+ assertThat(estimator.estimateSize(size), anyOf(isIn(sizes), between(250L, 350L)));
+ }
+ assertThat(underlying.calls - initialCalls, between(180, 220));
+ // Sample some more to let things settle down.
+ for (int k = 1500; k < 3000; k++) {
+ estimator.estimateSize(sizes.get(k % sizes.size()));
+ }
+ // Next 1000 are sampled at about 10% (minSampleRate).
+ initialCalls = underlying.calls;
+ for (int k = 3000; k < 4000; k++) {
+ long size = sizes.get(k % sizes.size());
+ assertThat(estimator.estimateSize(size), anyOf(isIn(sizes), between(250L, 350L)));
+ }
+ assertThat(underlying.calls - initialCalls, between(90, 110));
+ }
+
+ @Test
+ public void testSampleChangingSizes() throws Exception {
+ IdentitySizeEstimator underlying = new IdentitySizeEstimator();
+ SizeEstimator<Long> estimator =
+ new SamplingSizeEstimator<>(underlying, 0.05, 1.0, 10, new Random(1));
+ // First 10 elements are always sampled.
+ for (int k = 0; k < 10; k++) {
+ assertEquals(100, estimator.estimateSize(100L));
+ assertEquals(k + 1, underlying.calls);
+ }
+ // Next 10 are sometimes sampled.
+ for (int k = 10; k < 20; k++) {
+ assertEquals(100, estimator.estimateSize(100L));
+ }
+ assertThat(underlying.calls, between(11, 19));
+ int initialCalls = underlying.calls;
+ // Next 1000 are sampled at about 5%.
+ for (int k = 20; k < 1020; k++) {
+ assertEquals(100, estimator.estimateSize(100L));
+ }
+ assertThat(underlying.calls - initialCalls, between(40, 60));
+ // Inject a big element until it is sampled.
+ while (estimator.estimateSize(1000000L) == 100) {}
+ // Check that we have started sampling more regularly again.
+ assertEquals(99, estimator.estimateSize(99L));
+ }
+
+ private static <T extends Comparable<T>> TypeSafeDiagnosingMatcher<T> between(
+     final T min, final T max) {
+ return new TypeSafeDiagnosingMatcher<T>() {
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("is between " + min + " and " + max);
+ }
+
+ @Override
+ protected boolean matchesSafely(T item, Description mismatchDescription) {
+ return min.compareTo(item) <= 0 && item.compareTo(max) <= 0;
+ }
+ };
+ }
+
+ /** Return the key as its grouping key. */
+ private static class IdentityGroupingKeyCreator implements GroupingKeyCreator<Object> {
+ @Override
+ public Object createGroupingKey(Object key) {
+ return key;
+ }
+ }
+
+ /** "Estimate" the size of longs by looking at their value. */
+ private static class IdentitySizeEstimator implements SizeEstimator<Long> {
+ int calls = 0;
+
+ @Override
+ public long estimateSize(Long element) {
+ calls++;
+ return element;
+ }
+ }
+
+ /** "Estimate" the size of strings by taking the tenth power of their length. */
+ private static class StringPowerSizeEstimator implements SizeEstimator<String> {
+ @Override
+ public long estimateSize(String element) {
+ return (long) Math.pow(10, element.length());
+ }
+ }
+
+ private static class KvPairInfo implements PrecombineGroupingTable.PairInfo {
+ @SuppressWarnings("unchecked")
+ @Override
+ public Object getKeyFromInputPair(Object pair) {
+ return ((KV<Object, ?>) pair).getKey();
+ }
+
+ @Override
+ public Object getValueFromInputPair(Object pair) {
+ return ((KV<?, ?>) pair).getValue();
+ }
+
+ @Override
+ public Object makeOutputPair(Object key, Object value) {
+ return KV.of(key, value);
+ }
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 118339)
Time Spent: 5h 50m (was: 5h 40m)
> Implement the portable lifted Combiner transforms in Java SDK
> -------------------------------------------------------------
>
> Key: BEAM-3708
> URL: https://issues.apache.org/jira/browse/BEAM-3708
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-java-core, sdk-java-harness
> Reporter: Daniel Oliveira
> Assignee: Daniel Oliveira
> Priority: Major
> Labels: portability
> Time Spent: 5h 50m
> Remaining Estimate: 0h
>
> Lifted combines are split into separate parts with different URNs. These
> parts need to be implemented in the Java SDK harness so that the SDK can
> actually execute them when receiving Combine transforms with the
> corresponding URNs.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)