[
https://issues.apache.org/jira/browse/BEAM-5149?focusedWorklogId=142850&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-142850
]
ASF GitHub Bot logged work on BEAM-5149:
----------------------------------------
Author: ASF GitHub Bot
Created on: 10/Sep/18 18:23
Start Date: 10/Sep/18 18:23
Worklog Time Spent: 10m
Work Description: lukecwik closed pull request #6222: [BEAM-5149] Add
support for the Java SDK harness to merge windows.
URL: https://github.com/apache/beam/pull/6222
This PR was merged from a forked repository. Because GitHub hides the
original diff of a foreign (fork) pull request once it is merged, the diff
is reproduced below for the sake of provenance:
diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto
b/model/pipeline/src/main/proto/beam_runner_api.proto
index c6ee2656b06..0df8cc8e888 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -202,9 +202,35 @@ message StandardPTransforms {
// Payload: TestStreamPayload
TEST_STREAM = 5 [(beam_urn) = "urn:beam:transform:teststream:v1"];
- // Represents mapping of main input window into side input window.
- // Payload: serialized WindowMappingFn.
+ // Represents mapping of main input window onto side input window.
+ //
+ // Side input window mapping function:
+ // Input: KV<nonce, MainInputWindow>
+ // Output: KV<nonce, SideInputWindow>
+ //
+ // For each main input window, the side input window is returned. The
+ // nonce is used by a runner to associate each input with its output.
+ // The nonce is represented as an opaque set of bytes.
+ //
+ // Payload: WindowMappingFn from SideInputSpec.
MAP_WINDOWS = 6 [(beam_urn) = "beam:transform:map_windows:v1"];
+
+ // Used to merge windows during a GroupByKey.
+ //
+ // Window merging function:
+ // Input: KV<nonce, iterable<OriginalWindow>>
+ // Output: KV<nonce, KV<iterable<UnmergedOriginalWindow>, iterable<KV<MergedWindow, iterable<ConsumedOriginalWindow>>>>>
+ //
+ // For each set of original windows, a list of all unmerged windows is
+ // output alongside a map of merged window to set of consumed windows.
+ // All original windows must be contained in either the unmerged original
+ // window set or one of the consumed original window sets. Each original
+ // window can only be part of one output set. The nonce is used by a runner
+ // to associate each input with its output. The nonce is represented as an
+ // opaque set of bytes.
+ //
+ // Payload: WindowFn from WindowingStrategy.
+ MERGE_WINDOWS = 7 [(beam_urn) = "beam:transform:merge_windows:v1"];
}
enum DeprecatedPrimitives {
// Represents the operation to read a Bounded or Unbounded source.
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index e6c36a2dd42..8bece334982 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -199,6 +199,13 @@ public static TimestampCombiner
timestampCombinerFromProto(RunnerApi.OutputTime.
// This URN says that the WindowFn is just a UDF blob the Java SDK
understands
// TODO: standardize such things
public static final String SERIALIZED_JAVA_WINDOWFN_URN =
"beam:windowfn:javasdk:v0.1";
+ public static final String GLOBAL_WINDOWS_URN =
+ BeamUrns.getUrn(GlobalWindowsPayload.Enum.PROPERTIES);
+ public static final String FIXED_WINDOWS_URN =
+ BeamUrns.getUrn(FixedWindowsPayload.Enum.PROPERTIES);
+ public static final String SLIDING_WINDOWS_URN =
+ BeamUrns.getUrn(SlidingWindowsPayload.Enum.PROPERTIES);
+ public static final String SESSION_WINDOWS_URN =
BeamUrns.getUrn(SessionsPayload.Enum.PROPERTIES);
/**
* Converts a {@link WindowFn} into a {@link
RunnerApi.MessageWithComponents} where {@link
@@ -210,7 +217,7 @@ public static SdkFunctionSpec toProto(WindowFn<?, ?>
windowFn, SdkComponents com
if (windowFn instanceof GlobalWindows) {
return SdkFunctionSpec.newBuilder()
.setEnvironmentId(components.getOnlyEnvironmentId())
-
.setSpec(FunctionSpec.newBuilder().setUrn(getUrn(GlobalWindowsPayload.Enum.PROPERTIES)))
+ .setSpec(FunctionSpec.newBuilder().setUrn(GLOBAL_WINDOWS_URN))
.build();
} else if (windowFn instanceof FixedWindows) {
FixedWindowsPayload fixedWindowsPayload =
@@ -222,7 +229,7 @@ public static SdkFunctionSpec toProto(WindowFn<?, ?>
windowFn, SdkComponents com
.setEnvironmentId(components.getOnlyEnvironmentId())
.setSpec(
FunctionSpec.newBuilder()
- .setUrn(getUrn(FixedWindowsPayload.Enum.PROPERTIES))
+ .setUrn(FIXED_WINDOWS_URN)
.setPayload(fixedWindowsPayload.toByteString()))
.build();
} else if (windowFn instanceof SlidingWindows) {
@@ -236,7 +243,7 @@ public static SdkFunctionSpec toProto(WindowFn<?, ?>
windowFn, SdkComponents com
.setEnvironmentId(components.getOnlyEnvironmentId())
.setSpec(
FunctionSpec.newBuilder()
- .setUrn(getUrn(SlidingWindowsPayload.Enum.PROPERTIES))
+ .setUrn(SLIDING_WINDOWS_URN)
.setPayload(slidingWindowsPayload.toByteString()))
.build();
} else if (windowFn instanceof Sessions) {
@@ -248,7 +255,7 @@ public static SdkFunctionSpec toProto(WindowFn<?, ?>
windowFn, SdkComponents com
.setEnvironmentId(components.getOnlyEnvironmentId())
.setSpec(
FunctionSpec.newBuilder()
- .setUrn(getUrn(SessionsPayload.Enum.PROPERTIES))
+ .setUrn(SESSION_WINDOWS_URN)
.setPayload(sessionsPayload.toByteString()))
.build();
} else {
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMappingFnRunner.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMappingFnRunner.java
index c43680b0d96..3bee525f1cd 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMappingFnRunner.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMappingFnRunner.java
@@ -32,9 +32,17 @@
import org.apache.beam.sdk.values.KV;
/**
- * Maps windows using a window mapping fn. The input is {@link KV} with the
key being a nonce and
- * the value being a window, the output must be a {@link KV} with the key
being the same nonce as
- * the input and the value being the mapped window.
+ * Represents mapping of main input window onto side input window.
+ *
+ * <p>Side input window mapping function:
+ *
+ * <ul>
+ * <li>Input: {@code KV<nonce, MainInputWindow>}
+ * <li>Output: {@code KV<nonce, SideInputWindow>}
+ * </ul>
+ *
+ * <p>For each main input window, the side input window is returned. The nonce
is used by a runner
+ * to associate each input with its output. The nonce is represented as an
opaque set of bytes.
*/
public class WindowMappingFnRunner {
static final String URN =
BeamUrns.getUrn(StandardPTransforms.Primitives.MAP_WINDOWS);
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
new file mode 100644
index 00000000000..261a9794836
--- /dev/null
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms;
+import org.apache.beam.runners.core.construction.BeamUrns;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.sdk.fn.function.ThrowingFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowFn.MergeContext;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Merges windows using a {@link
org.apache.beam.sdk.transforms.windowing.WindowFn}.
+ *
+ * <p>Window merging function:
+ *
+ * <ul>
+ * <li>Input: {@code KV<nonce, iterable<OriginalWindow>>}
+ * <li>Output: {@code KV<nonce, KV<iterable<UnmergedOriginalWindow>, iterable<KV<MergedWindow,
+ * iterable<ConsumedOriginalWindow>>>>>}
+ * </ul>
+ *
+ * <p>For each set of original windows, a list of all unmerged windows is
output alongside a map of
+ * merged window to set of consumed windows. All original windows must be
contained in either the
+ * unmerged original window set or one of the consumed original window sets.
Each original window
+ * can only be part of one output set. The nonce is used by a runner to
associate each input with
+ * its output. The nonce is represented as an opaque set of bytes.
+ */
+public abstract class WindowMergingFnRunner<T, W extends BoundedWindow> {
+ static final String URN =
BeamUrns.getUrn(StandardPTransforms.Primitives.MERGE_WINDOWS);
+
+ /**
+ * A registrar which provides a factory to handle merging windows based upon
the {@link WindowFn}.
+ */
+ @AutoService(PTransformRunnerFactory.Registrar.class)
+ public static class Registrar implements PTransformRunnerFactory.Registrar {
+
+ @Override
+ public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories()
{
+ return ImmutableMap.of(
+ URN,
+
MapFnRunners.forValueMapFnFactory(WindowMergingFnRunner::createMapFunctionForPTransform));
+ }
+ }
+
+ static <T, W extends BoundedWindow>
+ ThrowingFunction<KV<T, Iterable<W>>, KV<T, KV<Iterable<W>,
Iterable<KV<W, Iterable<W>>>>>>
+ createMapFunctionForPTransform(String ptransformId, PTransform
ptransform)
+ throws IOException {
+ RunnerApi.SdkFunctionSpec payload =
+ RunnerApi.SdkFunctionSpec.parseFrom(ptransform.getSpec().getPayload());
+
+ WindowFn<?, W> windowFn =
+ (WindowFn<?, W>)
WindowingStrategyTranslation.windowFnFromProto(payload);
+ return WindowMergingFnRunner.<T, W>create(windowFn)::mergeWindows;
+ }
+
+ static <T, W extends BoundedWindow> WindowMergingFnRunner<T, W>
create(WindowFn<?, W> windowFn) {
+ if (windowFn.isNonMerging()) {
+ return new NonMergingWindowFnRunner();
+ } else {
+ return new MergingViaWindowFnRunner(windowFn);
+ }
+ }
+
+ /**
+ * Returns the set of unmerged windows and a mapping from merged windows to
sets of original
+ * windows.
+ */
+ abstract KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> mergeWindows(
+ KV<T, Iterable<W>> windowsToMerge) throws Exception;
+
+
/////////////////////////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * An optimized version of window merging where the {@link WindowFn} does
not do any window
+ * merging.
+ *
+ * <p>Note that this is likely to never be invoked and the identity mapping
will be handled
+ * directly by runners. We have this here because runners may not perform
this optimization.
+ */
+ private static class NonMergingWindowFnRunner<T, W extends BoundedWindow>
+ extends WindowMergingFnRunner<T, W> {
+ @Override
+ KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> mergeWindows(
+ KV<T, Iterable<W>> windowsToMerge) {
+ return KV.of(
+ windowsToMerge.getKey(), KV.of(windowsToMerge.getValue(),
Collections.emptyList()));
+ }
+ }
+
+ /** An implementation which uses a {@link WindowFn} to merge windows. */
+ private static class MergingViaWindowFnRunner<T, W extends BoundedWindow>
+ extends WindowMergingFnRunner<T, W> {
+ private final WindowFn<?, W> windowFn;
+ private final WindowFn<?, W>.MergeContext mergeContext;
+ private Collection<W> currentWindows;
+ private List<KV<W, Collection<W>>> mergedWindows;
+
+ private MergingViaWindowFnRunner(WindowFn<?, W> windowFn) {
+ this.windowFn = windowFn;
+ this.mergedWindows = new ArrayList<>();
+ this.currentWindows = new ArrayList<>();
+ this.mergeContext =
+ windowFn.new MergeContext() {
+
+ @Override
+ public Collection<W> windows() {
+ return currentWindows;
+ }
+
+ @Override
+ public void merge(Collection<W> toBeMerged, W mergeResult) throws
Exception {
+ mergedWindows.add(KV.of(mergeResult, toBeMerged));
+ }
+ };
+ }
+
+ @Override
+ KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> mergeWindows(
+ KV<T, Iterable<W>> windowsToMerge) throws Exception {
+ currentWindows = Sets.newHashSet(windowsToMerge.getValue());
+ windowFn.mergeWindows((MergeContext) mergeContext);
+ for (KV<W, Collection<W>> mergedWindow : mergedWindows) {
+ currentWindows.removeAll(mergedWindow.getValue());
+ }
+ return KV.of(windowsToMerge.getKey(), KV.of(currentWindows, (Iterable)
mergedWindows));
+ }
+ }
+}
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java
new file mode 100644
index 00000000000..11220efa20a
--- /dev/null
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import java.util.Collections;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.core.construction.SdkComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.sdk.fn.function.ThrowingFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link WindowMergingFnRunner}. */
+@RunWith(JUnit4.class)
+public class WindowMergingFnRunnerTest {
+ @Test
+ public void testWindowMergingWithNonMergingWindowFn() throws Exception {
+ ThrowingFunction<
+ KV<Object, Iterable<BoundedWindow>>,
+ KV<
+ Object,
+ KV<Iterable<BoundedWindow>, Iterable<KV<BoundedWindow,
Iterable<BoundedWindow>>>>>>
+ mapFunction =
+ WindowMergingFnRunner.createMapFunctionForPTransform(
+ "ptransformId", createMergeTransformForWindowFn(new
GlobalWindows()));
+
+ KV<Object, Iterable<BoundedWindow>> input =
+ KV.of(
+ "abc",
+ ImmutableList.of(new IntervalWindow(Instant.now(),
Duration.standardMinutes(1))));
+
+ assertEquals(
+ KV.of(input.getKey(), KV.of(input.getValue(),
Collections.emptyList())),
+ mapFunction.apply(input));
+ }
+
+ @Test
+ public void testWindowMergingWithMergingWindowFn() throws Exception {
+ ThrowingFunction<
+ KV<Object, Iterable<BoundedWindow>>,
+ KV<
+ Object,
+ KV<Iterable<BoundedWindow>, Iterable<KV<BoundedWindow,
Iterable<BoundedWindow>>>>>>
+ mapFunction =
+ WindowMergingFnRunner.createMapFunctionForPTransform(
+ "ptransformId",
+
createMergeTransformForWindowFn(Sessions.withGapDuration(Duration.millis(5L))));
+
+ // 7, 9 and 10 should all be merged. 1 and 20 should remain in the original set.
+ BoundedWindow[] expectedToBeMerged =
+ new BoundedWindow[] {
+ new IntervalWindow(new Instant(9L), new Instant(11L)),
+ new IntervalWindow(new Instant(10L), new Instant(10L)),
+ new IntervalWindow(new Instant(7L), new Instant(10L))
+ };
+ Iterable<BoundedWindow> expectedToBeUnmerged =
+ Sets.newHashSet(
+ new IntervalWindow(new Instant(1L), new Instant(1L)),
+ new IntervalWindow(new Instant(20L), new Instant(20L)));
+ KV<Object, Iterable<BoundedWindow>> input =
+ KV.of(
+ "abc",
+ ImmutableList.<BoundedWindow>builder()
+ .add(expectedToBeMerged)
+ .addAll(expectedToBeUnmerged)
+ .build());
+
+ KV<Object, KV<Iterable<BoundedWindow>, Iterable<KV<BoundedWindow,
Iterable<BoundedWindow>>>>>
+ output = mapFunction.apply(input);
+ assertEquals(input.getKey(), output.getKey());
+ assertEquals(expectedToBeUnmerged, output.getValue().getKey());
+ KV<BoundedWindow, Iterable<BoundedWindow>> mergedOutput =
+ Iterables.getOnlyElement(output.getValue().getValue());
+ assertEquals(new IntervalWindow(new Instant(7L), new Instant(11L)),
mergedOutput.getKey());
+ assertThat(mergedOutput.getValue(),
containsInAnyOrder(expectedToBeMerged));
+ }
+
+ private static <W extends BoundedWindow> RunnerApi.PTransform
createMergeTransformForWindowFn(
+ WindowFn<?, W> windowFn) throws Exception {
+ SdkComponents components = SdkComponents.create();
+
components.registerEnvironment(Environment.newBuilder().setUrl("java").build());
+ RunnerApi.FunctionSpec functionSpec =
+ RunnerApi.FunctionSpec.newBuilder()
+ .setUrn(WindowMergingFnRunner.URN)
+ .setPayload(WindowingStrategyTranslation.toProto(windowFn,
components).toByteString())
+ .build();
+ return RunnerApi.PTransform.newBuilder().setSpec(functionSpec).build();
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 142850)
Time Spent: 1h 50m (was: 1h 40m)
> Add support to the Java SDK harness to merge windows
> ----------------------------------------------------
>
> Key: BEAM-5149
> URL: https://issues.apache.org/jira/browse/BEAM-5149
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-java-harness
> Reporter: Luke Cwik
> Assignee: Luke Cwik
> Priority: Major
> Labels: portability
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> Window merging function:
>
> {code:java}
> Input: KV<nonce, iterable<OriginalWindow>>
> Output: KV<nonce, KV<iterable<UnmergedOriginalWindow>,
> iterable<KV<MergedWindow, iterable<ConsumedOriginalWindow>>>>>
> {code}
> For each set of original windows, a list of all unmerged windows is output
> alongside a map of merged window to set of consumed windows. All original
> windows must be contained in either the unmerged original window set or one
> of the consumed original window sets. Each original window can only be part
> of one output set. The nonce is used by a runner to associate each input with
> its output. The nonce is represented as an opaque set of bytes.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)