Repository: beam Updated Branches: refs/heads/master 465f4385b -> 3c10c0bc8
Add WindowIntoTranslator This translates Window.Assign into a WindowIntoPayload. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bb3db2ac Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bb3db2ac Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bb3db2ac Branch: refs/heads/master Commit: bb3db2acc07a61844a551ea9f793a0fcf4001878 Parents: 64cea06 Author: Thomas Groh <[email protected]> Authored: Wed May 17 15:08:50 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Mon May 22 18:37:26 2017 -0700 ---------------------------------------------------------------------- .../core/construction/WindowIntoTranslator.java | 61 +++++++++ .../core/construction/WindowingStrategies.java | 33 ++--- .../construction/WindowIntoTranslatorTest.java | 126 +++++++++++++++++++ 3 files changed, 205 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/bb3db2ac/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslator.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslator.java new file mode 100644 index 0000000..ea4c996 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslator.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import com.google.protobuf.Any; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.beam.runners.core.construction.PTransforms.TransformPayloadTranslator; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.WindowIntoPayload; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; + +/** + * Utility methods for translating a {@link Window.Assign} to and from {@link RunnerApi} + * representations. + */ +public class WindowIntoTranslator { + + static class WindowAssignTranslator implements TransformPayloadTranslator<Window.Assign<?>> { + @Override + public FunctionSpec translate( + AppliedPTransform<?, ?, Window.Assign<?>> transform, SdkComponents components) { + return FunctionSpec.newBuilder() + .setUrn("urn:beam:transform:window:v1") + .setParameter( + Any.pack(WindowIntoTranslator.toProto(transform.getTransform(), components))) + .build(); + } + } + + public static WindowIntoPayload toProto(Window.Assign<?> transform, SdkComponents components) { + return WindowIntoPayload.newBuilder() + .setWindowFn(WindowingStrategies.toProto(transform.getWindowFn(), components)) + .build(); + } + + public static WindowFn<?, ?> getWindowFn(WindowIntoPayload payload) + throws InvalidProtocolBufferException { + SdkFunctionSpec spec = payload.getWindowFn(); + return WindowingStrategies.windowFnFromProto(spec); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/bb3db2ac/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java index 395702f..8dceebb 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java @@ -163,8 +163,7 @@ public class WindowingStrategies implements Serializable { * input {@link WindowFn}. */ public static SdkFunctionSpec toProto( - WindowFn<?, ?> windowFn, @SuppressWarnings("unused") SdkComponents components) - throws IOException { + WindowFn<?, ?> windowFn, @SuppressWarnings("unused") SdkComponents components) { return SdkFunctionSpec.newBuilder() // TODO: Set environment ID .setSpec( @@ -245,7 +244,23 @@ public class WindowingStrategies implements Serializable { throws InvalidProtocolBufferException { SdkFunctionSpec windowFnSpec = proto.getWindowFn(); + WindowFn<?, ?> windowFn = windowFnFromProto(windowFnSpec); + TimestampCombiner timestampCombiner = timestampCombinerFromProto(proto.getOutputTime()); + AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode()); + Trigger trigger = Triggers.fromProto(proto.getTrigger()); + ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior()); + Duration allowedLateness = Duration.millis(proto.getAllowedLateness()); + + return WindowingStrategy.of(windowFn) + .withAllowedLateness(allowedLateness) + .withMode(accumulationMode) + .withTrigger(trigger) + .withTimestampCombiner(timestampCombiner) + .withClosingBehavior(closingBehavior); + } + public static WindowFn<?, ?> windowFnFromProto(SdkFunctionSpec windowFnSpec) + throws InvalidProtocolBufferException { checkArgument( windowFnSpec.getSpec().getUrn().equals(CUSTOM_WINDOWFN_URN), "Only Java-serialized %s instances are supported, with URN %s. But found URN %s", @@ -258,18 +273,6 @@ public class WindowingStrategies implements Serializable { windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(), "WindowFn"); - WindowFn<?, ?> windowFn = (WindowFn<?, ?>) deserializedWindowFn; - TimestampCombiner timestampCombiner = timestampCombinerFromProto(proto.getOutputTime()); - AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode()); - Trigger trigger = Triggers.fromProto(proto.getTrigger()); - ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior()); - Duration allowedLateness = Duration.millis(proto.getAllowedLateness()); - - return WindowingStrategy.of(windowFn) - .withAllowedLateness(allowedLateness) - .withMode(accumulationMode) - .withTrigger(trigger) - .withTimestampCombiner(timestampCombiner) - .withClosingBehavior(closingBehavior); + return (WindowFn<?, ?>) deserializedWindowFn; } } http://git-wip-us.apache.org/repos/asf/beam/blob/bb3db2ac/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslatorTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslatorTest.java new file mode 100644 index 0000000..fbac565 --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslatorTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static com.google.common.base.Preconditions.checkState; +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.InvalidProtocolBufferException; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.beam.sdk.Pipeline.PipelineVisitor; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.WindowIntoPayload; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.runners.TransformHierarchy.Node; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.transforms.windowing.SlidingWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.Window.Assign; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** + * Tests for {@link WindowIntoTranslator}. + */ +@RunWith(Parameterized.class) +public class WindowIntoTranslatorTest { + @Parameters(name = "{index}: {0}") + public static Iterable<WindowFn<?, ?>> data() { + // This pipeline exists for construction, not to run any test. + return ImmutableList.<WindowFn<?, ?>>builder() + .add(FixedWindows.of(Duration.standardMinutes(10L))) + .add(new GlobalWindows()) + .add(Sessions.withGapDuration(Duration.standardMinutes(15L))) + .add(SlidingWindows.of(Duration.standardMinutes(5L)).every(Duration.standardMinutes(1L))) + .add(new CustomWindows()) + .build(); + } + + @Parameter(0) + public WindowFn<?, ?> windowFn; + + @Rule + public TestPipeline pipeline = TestPipeline.create(); + + @Test + public void testToFromProto() throws InvalidProtocolBufferException { + pipeline.apply(GenerateSequence.from(0)).apply(Window.<Long>into((WindowFn) windowFn)); + + final AtomicReference<AppliedPTransform<?, ?, Assign<?>>> assign = new AtomicReference<>(null); + pipeline.traverseTopologically( + new PipelineVisitor.Defaults() { + @Override + public void visitPrimitiveTransform(Node node) { + if (node.getTransform() instanceof Window.Assign) { + checkState(assign.get() == null); + assign.set((AppliedPTransform<?, ?, Assign<?>>) node.toAppliedPTransform()); + } + } + }); + checkState(assign.get() != null); + + SdkComponents components = SdkComponents.create(); + WindowIntoPayload payload = + WindowIntoTranslator.toProto(assign.get().getTransform(), components); + + assertEquals(windowFn, WindowIntoTranslator.getWindowFn(payload)); + } + + private static class CustomWindows extends PartitioningWindowFn<String, BoundedWindow> { + @Override + public BoundedWindow assignWindow(Instant timestamp) { + return GlobalWindow.INSTANCE; + } + + @Override + public boolean isCompatible(WindowFn<?, ?> other) { + return getClass().equals(other.getClass()); + } + + @Override + public Coder<BoundedWindow> windowCoder() { + return (Coder) GlobalWindow.Coder.INSTANCE; + } + + @Override + public boolean equals(Object other) { + return other != null && other.getClass().equals(this.getClass()); + } + + @Override + public int hashCode() { + return getClass().hashCode(); + } + } +}
