[
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118320&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118320
]
ASF GitHub Bot logged work on BEAM-3708:
----------------------------------------
Author: ASF GitHub Bot
Created on: 02/Jul/18 18:06
Start Date: 02/Jul/18 18:06
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #5795:
[BEAM-3708] Adding grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795#discussion_r199304718
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java
##########
@@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+import org.apache.beam.runners.core.GlobalCombineFnRunner;
+import org.apache.beam.runners.core.GlobalCombineFnRunners;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+
+/** Static utility methods that provide {@link GroupingTable} implementations.
*/
+public class PrecombineGroupingTable<K, InputT, AccumT>
+ implements GroupingTable<K, InputT, AccumT> {
+ /** Returns a {@link GroupingTable} that combines inputs into a accumulator.
*/
+ public static <K, InputT, AccumT> GroupingTable<WindowedValue<K>, InputT,
AccumT> combining(
+ PipelineOptions options,
+ CombineFn<InputT, AccumT, ?> combineFn,
+ Coder<K> keyCoder,
+ Coder<? super AccumT> accumulatorCoder) {
+ Combiner<WindowedValue<K>, InputT, AccumT, ?> valueCombiner =
+ new ValueCombiner<>(
+ GlobalCombineFnRunners.create(combineFn),
NullSideInputReader.empty(), options);
+ return new PrecombineGroupingTable<>(
+ DEFAULT_MAX_GROUPING_TABLE_BYTES,
+ new WindowingCoderGroupingKeyCreator<>(keyCoder),
+ WindowedPairInfo.create(),
+ valueCombiner,
+ new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
+ new CoderSizeEstimator<>(accumulatorCoder));
+ }
+
+ /**
+ * Returns a {@link GroupingTable} that combines inputs into a accumulator
with sampling {@link
+ * SizeEstimator SizeEstimators}.
+ */
+ public static <K, InputT, AccumT>
+ GroupingTable<WindowedValue<K>, InputT, AccumT> combiningAndSampling(
+ PipelineOptions options,
+ CombineFn<InputT, AccumT, ?> combineFn,
+ Coder<K> keyCoder,
+ Coder<? super AccumT> accumulatorCoder,
+ double sizeEstimatorSampleRate) {
+ Combiner<WindowedValue<K>, InputT, AccumT, ?> valueCombiner =
+ new ValueCombiner<>(
+ GlobalCombineFnRunners.create(combineFn),
NullSideInputReader.empty(), options);
+ return new PrecombineGroupingTable<>(
+ DEFAULT_MAX_GROUPING_TABLE_BYTES,
+ new WindowingCoderGroupingKeyCreator<>(keyCoder),
+ WindowedPairInfo.create(),
+ valueCombiner,
+ new SamplingSizeEstimator<>(
+ new
CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
+ sizeEstimatorSampleRate,
+ 1.0),
+ new SamplingSizeEstimator<>(
+ new CoderSizeEstimator<>(accumulatorCoder),
sizeEstimatorSampleRate, 1.0));
+ }
+
+ /** Provides client-specific operations for grouping keys. */
+ public interface GroupingKeyCreator<K> {
+ Object createGroupingKey(K key) throws Exception;
+ }
+
+ /** Implements Precombine GroupingKeyCreator via Coder. */
+ public static class WindowingCoderGroupingKeyCreator<K>
+ implements GroupingKeyCreator<WindowedValue<K>> {
+
+ private static final Instant ignored = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+ private final Coder<K> coder;
+
+ WindowingCoderGroupingKeyCreator(Coder<K> coder) {
+ this.coder = coder;
+ }
+
+ @Override
+ public Object createGroupingKey(WindowedValue<K> key) {
+ // Ignore timestamp for grouping purposes.
+ // The Precombine output will inherit the timestamp of one of its inputs.
+ return WindowedValue.of(
+ coder.structuralValue(key.getValue()), ignored, key.getWindows(),
key.getPane());
+ }
+ }
+
+ /** Provides client-specific operations for size estimates. */
+ public interface SizeEstimator<T> {
+ long estimateSize(T element) throws Exception;
+ }
+
+ /** Implements SizeEstimator via Coder. */
+ public static class CoderSizeEstimator<T> implements SizeEstimator<T> {
+ /** Basic implementation of {@link ElementByteSizeObserver} for use in
size estimation. */
+ private static class Observer extends ElementByteSizeObserver {
+ private long observedSize = 0;
+
+ @Override
+ protected void reportElementSize(long elementSize) {
+ observedSize += elementSize;
+ }
+ }
+
+ final Coder<T> coder;
+
+ CoderSizeEstimator(Coder<T> coder) {
+ this.coder = coder;
+ }
+
+ @Override
+ public long estimateSize(T value) throws Exception {
+ // First try using byte size observer
+ CoderSizeEstimator.Observer observer = new CoderSizeEstimator.Observer();
+ coder.registerByteSizeObserver(value, observer);
+
+ if (!observer.getIsLazy()) {
+ observer.advance();
+ return observer.observedSize;
+ } else {
+ // Coder byte size observation is lazy (requires iteration for
observation) so fall back to
+ // counting output stream
+ CountingOutputStream os = new
CountingOutputStream(ByteStreams.nullOutputStream());
+ coder.encode(value, os);
+ return os.getCount();
+ }
+ }
+ }
+
+ /**
+ * Provides client-specific operations for working with elements that are
key/value or key/values
+ * pairs.
+ */
+ public interface PairInfo {
+ Object getKeyFromInputPair(Object pair);
+
+ Object getValueFromInputPair(Object pair);
+
+ Object makeOutputPair(Object key, Object value);
+ }
+
+ /** Implements Precombine PairInfo via KVs. */
+ public static class WindowedPairInfo implements PairInfo {
+ private static WindowedPairInfo theInstance = new WindowedPairInfo();
+
+ public static WindowedPairInfo create() {
+ return theInstance;
+ }
+
+ private WindowedPairInfo() {}
+
+ @Override
+ public Object getKeyFromInputPair(Object pair) {
+ @SuppressWarnings("unchecked")
+ WindowedValue<KV<?, ?>> windowedKv = (WindowedValue<KV<?, ?>>) pair;
+ return windowedKv.withValue(windowedKv.getValue().getKey());
+ }
+
+ @Override
+ public Object getValueFromInputPair(Object pair) {
+ @SuppressWarnings("unchecked")
+ WindowedValue<KV<?, ?>> windowedKv = (WindowedValue<KV<?, ?>>) pair;
+ return windowedKv.getValue().getValue();
+ }
+
+ @Override
+ public Object makeOutputPair(Object key, Object values) {
+ WindowedValue<?> windowedKey = (WindowedValue<?>) key;
+ return windowedKey.withValue(KV.of(windowedKey.getValue(), values));
+ }
+ }
+
+ /** Provides client-specific operations for combining values. */
+ public interface Combiner<K, InputT, AccumT, OutputT> {
Review comment:
You'll only have one implementation of a `Combiner`. You should be able to
use the `CombineFn` directly everywhere.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 118320)
Time Spent: 5.5h (was: 5h 20m)
> Implement the portable lifted Combiner transforms in Java SDK
> -------------------------------------------------------------
>
> Key: BEAM-3708
> URL: https://issues.apache.org/jira/browse/BEAM-3708
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-java-core, sdk-java-harness
> Reporter: Daniel Oliveira
> Assignee: Daniel Oliveira
> Priority: Major
> Labels: portability
> Time Spent: 5.5h
> Remaining Estimate: 0h
>
> Lifted combines are split into separate parts with different URNs. These
> parts need to be implemented in the Java SDK harness so that the SDK can
> actually execute them when receiving Combine transforms with the
> corresponding URNs.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)