Rename WindowIntoTranslator to WindowIntoTranslation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bc4f44f4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bc4f44f4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bc4f44f4 Branch: refs/heads/master Commit: bc4f44f46ca6afc0018834eb467b1112763c3323 Parents: 4460938 Author: Kenneth Knowles <[email protected]> Authored: Tue May 23 15:29:16 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue May 23 15:53:41 2017 -0700 ---------------------------------------------------------------------- .../construction/WindowIntoTranslation.java | 61 +++++++++ .../core/construction/WindowIntoTranslator.java | 61 --------- .../construction/WindowIntoTranslationTest.java | 127 +++++++++++++++++++ .../construction/WindowIntoTranslatorTest.java | 127 ------------------- 4 files changed, 188 insertions(+), 188 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/bc4f44f4/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java new file mode 100644 index 0000000..69793b5 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import com.google.protobuf.Any; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.WindowIntoPayload; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; + +/** + * Utility methods for translating a {@link Window.Assign} to and from {@link RunnerApi} + * representations. + */ +public class WindowIntoTranslation { + + static class WindowAssignTranslator implements TransformPayloadTranslator<Window.Assign<?>> { + @Override + public FunctionSpec translate( + AppliedPTransform<?, ?, Window.Assign<?>> transform, SdkComponents components) { + return FunctionSpec.newBuilder() + .setUrn("urn:beam:transform:window:v1") + .setParameter( + Any.pack(WindowIntoTranslation.toProto(transform.getTransform(), components))) + .build(); + } + } + + public static WindowIntoPayload toProto(Window.Assign<?> transform, SdkComponents components) { + return WindowIntoPayload.newBuilder() + .setWindowFn(WindowingStrategies.toProto(transform.getWindowFn(), components)) + .build(); + } + + public static WindowFn<?, ?> getWindowFn(WindowIntoPayload payload) + throws InvalidProtocolBufferException { + SdkFunctionSpec spec = payload.getWindowFn(); + return WindowingStrategies.windowFnFromProto(spec); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/bc4f44f4/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslator.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslator.java deleted file mode 100644 index 7ed2a49..0000000 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslator.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.core.construction; - -import com.google.protobuf.Any; -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; -import org.apache.beam.sdk.common.runner.v1.RunnerApi; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.WindowIntoPayload; -import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; - -/** - * Utility methods for translating a {@link Window.Assign} to and from {@link RunnerApi} - * representations. - */ -public class WindowIntoTranslator { - - static class WindowAssignTranslator implements TransformPayloadTranslator<Window.Assign<?>> { - @Override - public FunctionSpec translate( - AppliedPTransform<?, ?, Window.Assign<?>> transform, SdkComponents components) { - return FunctionSpec.newBuilder() - .setUrn("urn:beam:transform:window:v1") - .setParameter( - Any.pack(WindowIntoTranslator.toProto(transform.getTransform(), components))) - .build(); - } - } - - public static WindowIntoPayload toProto(Window.Assign<?> transform, SdkComponents components) { - return WindowIntoPayload.newBuilder() - .setWindowFn(WindowingStrategies.toProto(transform.getWindowFn(), components)) - .build(); - } - - public static WindowFn<?, ?> getWindowFn(WindowIntoPayload payload) - throws InvalidProtocolBufferException { - SdkFunctionSpec spec = payload.getWindowFn(); - return WindowingStrategies.windowFnFromProto(spec); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/bc4f44f4/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslationTest.java new file mode 100644 index 0000000..cb9617a --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslationTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static com.google.common.base.Preconditions.checkState; +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.InvalidProtocolBufferException; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.beam.sdk.Pipeline.PipelineVisitor; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.WindowIntoPayload; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.runners.TransformHierarchy.Node; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.transforms.windowing.SlidingWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.Window.Assign; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** + * Tests for {@link WindowIntoTranslation}. + */ +@RunWith(Parameterized.class) +public class WindowIntoTranslationTest { + @Parameters(name = "{index}: {0}") + public static Iterable<WindowFn<?, ?>> data() { + // This pipeline exists for construction, not to run any test. + return ImmutableList.<WindowFn<?, ?>>builder() + .add(FixedWindows.of(Duration.standardMinutes(10L))) + .add(new GlobalWindows()) + .add(Sessions.withGapDuration(Duration.standardMinutes(15L))) + .add(SlidingWindows.of(Duration.standardMinutes(5L)).every(Duration.standardMinutes(1L))) + .add(new CustomWindows()) + .build(); + } + + @Parameter(0) + public WindowFn<?, ?> windowFn; + + @Rule + public TestPipeline pipeline = TestPipeline.create(); + + @Test + public void testToFromProto() throws InvalidProtocolBufferException { + pipeline.apply(GenerateSequence.from(0)).apply(Window.<Long>into((WindowFn) windowFn)); + + final AtomicReference<AppliedPTransform<?, ?, Assign<?>>> assign = new AtomicReference<>(null); + pipeline.traverseTopologically( + new PipelineVisitor.Defaults() { + @Override + public void visitPrimitiveTransform(Node node) { + if (node.getTransform() instanceof Window.Assign) { + checkState(assign.get() == null); + assign.set( + (AppliedPTransform<?, ?, Assign<?>>) node.toAppliedPTransform(getPipeline())); + } + } + }); + checkState(assign.get() != null); + + SdkComponents components = SdkComponents.create(); + WindowIntoPayload payload = + WindowIntoTranslation.toProto(assign.get().getTransform(), components); + + assertEquals(windowFn, WindowIntoTranslation.getWindowFn(payload)); + } + + private static class CustomWindows extends PartitioningWindowFn<String, BoundedWindow> { + @Override + public BoundedWindow assignWindow(Instant timestamp) { + return GlobalWindow.INSTANCE; + } + + @Override + public boolean isCompatible(WindowFn<?, ?> other) { + return getClass().equals(other.getClass()); + } + + @Override + public Coder<BoundedWindow> windowCoder() { + return (Coder) GlobalWindow.Coder.INSTANCE; + } + + @Override + public boolean equals(Object other) { + return other != null && other.getClass().equals(this.getClass()); + } + + @Override + public int hashCode() { + return getClass().hashCode(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/bc4f44f4/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslatorTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslatorTest.java deleted file mode 100644 index eaefe2e..0000000 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowIntoTranslatorTest.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.core.construction; - -import static com.google.common.base.Preconditions.checkState; -import static org.junit.Assert.assertEquals; - -import com.google.common.collect.ImmutableList; -import com.google.protobuf.InvalidProtocolBufferException; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.beam.sdk.Pipeline.PipelineVisitor; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.WindowIntoPayload; -import org.apache.beam.sdk.io.GenerateSequence; -import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.runners.TransformHierarchy.Node; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn; -import org.apache.beam.sdk.transforms.windowing.Sessions; -import org.apache.beam.sdk.transforms.windowing.SlidingWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.Window.Assign; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameter; -import org.junit.runners.Parameterized.Parameters; - -/** - * Tests for {@link WindowIntoTranslator}. - */ -@RunWith(Parameterized.class) -public class WindowIntoTranslatorTest { - @Parameters(name = "{index}: {0}") - public static Iterable<WindowFn<?, ?>> data() { - // This pipeline exists for construction, not to run any test. - return ImmutableList.<WindowFn<?, ?>>builder() - .add(FixedWindows.of(Duration.standardMinutes(10L))) - .add(new GlobalWindows()) - .add(Sessions.withGapDuration(Duration.standardMinutes(15L))) - .add(SlidingWindows.of(Duration.standardMinutes(5L)).every(Duration.standardMinutes(1L))) - .add(new CustomWindows()) - .build(); - } - - @Parameter(0) - public WindowFn<?, ?> windowFn; - - @Rule - public TestPipeline pipeline = TestPipeline.create(); - - @Test - public void testToFromProto() throws InvalidProtocolBufferException { - pipeline.apply(GenerateSequence.from(0)).apply(Window.<Long>into((WindowFn) windowFn)); - - final AtomicReference<AppliedPTransform<?, ?, Assign<?>>> assign = new AtomicReference<>(null); - pipeline.traverseTopologically( - new PipelineVisitor.Defaults() { - @Override - public void visitPrimitiveTransform(Node node) { - if (node.getTransform() instanceof Window.Assign) { - checkState(assign.get() == null); - assign.set( - (AppliedPTransform<?, ?, Assign<?>>) node.toAppliedPTransform(getPipeline())); - } - } - }); - checkState(assign.get() != null); - - SdkComponents components = SdkComponents.create(); - WindowIntoPayload payload = - WindowIntoTranslator.toProto(assign.get().getTransform(), components); - - assertEquals(windowFn, WindowIntoTranslator.getWindowFn(payload)); - } - - private static class CustomWindows extends PartitioningWindowFn<String, BoundedWindow> { - @Override - public BoundedWindow assignWindow(Instant timestamp) { - return GlobalWindow.INSTANCE; - } - - @Override - public boolean isCompatible(WindowFn<?, ?> other) { - return getClass().equals(other.getClass()); - } - - @Override - public Coder<BoundedWindow> windowCoder() { - return (Coder) GlobalWindow.Coder.INSTANCE; - } - - @Override - public boolean equals(Object other) { - return other != null && other.getClass().equals(this.getClass()); - } - - @Override - public int hashCode() { - return getClass().hashCode(); - } - } -}
