Rename WindowingStrategies to WindowingStrategyTranslation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c8b2119a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c8b2119a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c8b2119a Branch: refs/heads/master Commit: c8b2119ab9a75c7f781ce73ea9352734640a6f46 Parents: 7e37b70 Author: Kenneth Knowles <[email protected]> Authored: Tue May 23 15:32:47 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue May 23 15:53:41 2017 -0700 ---------------------------------------------------------------------- .../construction/PCollectionTranslation.java | 2 +- .../core/construction/ParDoTranslation.java | 2 +- .../core/construction/SdkComponents.java | 2 +- .../construction/WindowIntoTranslation.java | 4 +- .../core/construction/WindowingStrategies.java | 278 ------------------- .../WindowingStrategyTranslation.java | 278 +++++++++++++++++++ .../construction/WindowingStrategiesTest.java | 110 -------- .../WindowingStrategyTranslationTest.java | 111 ++++++++ .../dataflow/DataflowPipelineTranslator.java | 4 +- 9 files changed, 396 insertions(+), 395 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java index 46f714e..303c02d 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java @@ -60,7 +60,7 @@ public class PCollectionTranslation { public static WindowingStrategy<?, ?> getWindowingStrategy( RunnerApi.PCollection pCollection, RunnerApi.Components components) throws InvalidProtocolBufferException { - return WindowingStrategies.fromProto( + return WindowingStrategyTranslation.fromProto( components.getWindowingStrategiesOrThrow(pCollection.getWindowingStrategyId()), components); } http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java index bc5bb0e..28d577f 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java @@ -265,7 +265,7 @@ public class ParDoTranslation { RunnerApi.PCollection inputCollection = components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(id)); WindowingStrategy<?, ?> windowingStrategy = - WindowingStrategies.fromProto( + WindowingStrategyTranslation.fromProto( components.getWindowingStrategiesOrThrow(inputCollection.getWindowingStrategyId()), components); Coder<?> elemCoder = http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java index 5c81875..b0f164f 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java @@ -200,7 +200,7 @@ class SdkComponents { String name = uniqify(baseName, windowingStrategyIds.values()); windowingStrategyIds.put(windowingStrategy, name); RunnerApi.WindowingStrategy windowingStrategyProto = - WindowingStrategies.toProto(windowingStrategy, this); + WindowingStrategyTranslation.toProto(windowingStrategy, this); componentsBuilder.putWindowingStrategies(name, windowingStrategyProto); return name; } http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java index 69793b5..215beba 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java @@ -49,13 +49,13 @@ public class WindowIntoTranslation { public static WindowIntoPayload toProto(Window.Assign<?> transform, SdkComponents components) { return WindowIntoPayload.newBuilder() - .setWindowFn(WindowingStrategies.toProto(transform.getWindowFn(), components)) + .setWindowFn(WindowingStrategyTranslation.toProto(transform.getWindowFn(), components)) .build(); } public static WindowFn<?, ?> getWindowFn(WindowIntoPayload payload) throws InvalidProtocolBufferException { SdkFunctionSpec spec = payload.getWindowFn(); - return WindowingStrategies.windowFnFromProto(spec); + return WindowingStrategyTranslation.windowFnFromProto(spec); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java deleted file mode 100644 index 8dceebb..0000000 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java +++ /dev/null @@ -1,278 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core.construction; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.protobuf.Any; -import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; -import com.google.protobuf.InvalidProtocolBufferException; -import java.io.IOException; -import java.io.Serializable; -import org.apache.beam.sdk.common.runner.v1.RunnerApi; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; -import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.values.WindowingStrategy; -import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode; -import org.joda.time.Duration; - -/** Utilities for working with {@link WindowingStrategy WindowingStrategies}. */ -public class WindowingStrategies implements Serializable { - - public static AccumulationMode fromProto(RunnerApi.AccumulationMode proto) { - switch (proto) { - case DISCARDING: - return AccumulationMode.DISCARDING_FIRED_PANES; - case ACCUMULATING: - return AccumulationMode.ACCUMULATING_FIRED_PANES; - case UNRECOGNIZED: - default: - // Whether or not it is proto that cannot recognize it (due to the version of the - // generated code we link to) or the switch hasn't been updated to handle it, - // the situation is the same: we don't know what this OutputTime means - throw new IllegalArgumentException( - String.format( - "Cannot convert unknown %s to %s: %s", - RunnerApi.AccumulationMode.class.getCanonicalName(), - AccumulationMode.class.getCanonicalName(), - proto)); - } - } - - public static RunnerApi.AccumulationMode toProto(AccumulationMode accumulationMode) { - switch (accumulationMode) { - case DISCARDING_FIRED_PANES: - return RunnerApi.AccumulationMode.DISCARDING; - case ACCUMULATING_FIRED_PANES: - return RunnerApi.AccumulationMode.ACCUMULATING; - default: - throw new IllegalArgumentException( - String.format( - "Cannot convert unknown %s to %s: %s", - AccumulationMode.class.getCanonicalName(), - RunnerApi.AccumulationMode.class.getCanonicalName(), - accumulationMode)); - } - } - - public static RunnerApi.ClosingBehavior toProto(ClosingBehavior closingBehavior) { - switch (closingBehavior) { - case FIRE_ALWAYS: - return RunnerApi.ClosingBehavior.EMIT_ALWAYS; - case FIRE_IF_NON_EMPTY: - return RunnerApi.ClosingBehavior.EMIT_IF_NONEMPTY; - default: - throw new IllegalArgumentException( - String.format( - "Cannot convert unknown %s to %s: %s", - ClosingBehavior.class.getCanonicalName(), - RunnerApi.ClosingBehavior.class.getCanonicalName(), - closingBehavior)); - } - } - - public static ClosingBehavior fromProto(RunnerApi.ClosingBehavior proto) { - switch (proto) { - case EMIT_ALWAYS: - return ClosingBehavior.FIRE_ALWAYS; - case EMIT_IF_NONEMPTY: - return ClosingBehavior.FIRE_IF_NON_EMPTY; - case UNRECOGNIZED: - default: - // Whether or not it is proto that cannot recognize it (due to the version of the - // generated code we link to) or the switch hasn't been updated to handle it, - // the situation is the same: we don't know what this OutputTime means - throw new IllegalArgumentException( - String.format( - "Cannot convert unknown %s to %s: %s", - RunnerApi.ClosingBehavior.class.getCanonicalName(), - ClosingBehavior.class.getCanonicalName(), - proto)); - } - } - - public static RunnerApi.OutputTime toProto(TimestampCombiner timestampCombiner) { - switch(timestampCombiner) { - case EARLIEST: - return OutputTime.EARLIEST_IN_PANE; - case END_OF_WINDOW: - return OutputTime.END_OF_WINDOW; - case LATEST: - return OutputTime.LATEST_IN_PANE; - default: - throw new IllegalArgumentException( - String.format( - "Unknown %s: %s", - TimestampCombiner.class.getSimpleName(), - timestampCombiner)); - } - } - - public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime proto) { - switch (proto) { - case EARLIEST_IN_PANE: - return TimestampCombiner.EARLIEST; - case END_OF_WINDOW: - return TimestampCombiner.END_OF_WINDOW; - case LATEST_IN_PANE: - return TimestampCombiner.LATEST; - case UNRECOGNIZED: - default: - // Whether or not it is proto that cannot recognize it (due to the version of the - // generated code we link to) or the switch hasn't been updated to handle it, - // the situation is the same: we don't know what this OutputTime means - throw new IllegalArgumentException( - String.format( - "Cannot convert unknown %s to %s: %s", - RunnerApi.OutputTime.class.getCanonicalName(), - OutputTime.class.getCanonicalName(), - proto)); - } - } - - // This URN says that the WindowFn is just a UDF blob the indicated SDK understands - // TODO: standardize such things - public static final String CUSTOM_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1"; - - /** - * Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where {@link - * RunnerApi.MessageWithComponents#getFunctionSpec()} is a {@link RunnerApi.FunctionSpec} for the - * input {@link WindowFn}. - */ - public static SdkFunctionSpec toProto( - WindowFn<?, ?> windowFn, @SuppressWarnings("unused") SdkComponents components) { - return SdkFunctionSpec.newBuilder() - // TODO: Set environment ID - .setSpec( - FunctionSpec.newBuilder() - .setUrn(CUSTOM_WINDOWFN_URN) - .setParameter( - Any.pack( - BytesValue.newBuilder() - .setValue( - ByteString.copyFrom( - SerializableUtils.serializeToByteArray(windowFn))) - .build()))) - .build(); - } - - /** - * Converts a {@link WindowingStrategy} into a {@link RunnerApi.MessageWithComponents} where - * {@link RunnerApi.MessageWithComponents#getWindowingStrategy()} ()} is a {@link - * RunnerApi.WindowingStrategy RunnerApi.WindowingStrategy (proto)} for the input {@link - * WindowingStrategy}. - */ - public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, ?> windowingStrategy) - throws IOException { - SdkComponents components = SdkComponents.create(); - RunnerApi.WindowingStrategy windowingStrategyProto = toProto(windowingStrategy, components); - - return RunnerApi.MessageWithComponents.newBuilder() - .setWindowingStrategy(windowingStrategyProto) - .setComponents(components.toComponents()) - .build(); - } - - /** - * Converts a {@link WindowingStrategy} into a {@link RunnerApi.WindowingStrategy}, registering - * any components in the provided {@link SdkComponents}. - */ - public static RunnerApi.WindowingStrategy toProto( - WindowingStrategy<?, ?> windowingStrategy, SdkComponents components) throws IOException { - SdkFunctionSpec windowFnSpec = toProto(windowingStrategy.getWindowFn(), components); - - RunnerApi.WindowingStrategy.Builder windowingStrategyProto = - RunnerApi.WindowingStrategy.newBuilder() - .setOutputTime(toProto(windowingStrategy.getTimestampCombiner())) - .setAccumulationMode(toProto(windowingStrategy.getMode())) - .setClosingBehavior(toProto(windowingStrategy.getClosingBehavior())) - .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis()) - .setTrigger(Triggers.toProto(windowingStrategy.getTrigger())) - .setWindowFn(windowFnSpec) - .setWindowCoderId( - components.registerCoder(windowingStrategy.getWindowFn().windowCoder())); - - return windowingStrategyProto.build(); - } - - /** - * Converts from a {@link RunnerApi.WindowingStrategy} accompanied by {@link Components} - * to the SDK's {@link WindowingStrategy}. - */ - public static WindowingStrategy<?, ?> fromProto(RunnerApi.MessageWithComponents proto) - throws InvalidProtocolBufferException { - switch (proto.getRootCase()) { - case WINDOWING_STRATEGY: - return fromProto(proto.getWindowingStrategy(), proto.getComponents()); - default: - throw new IllegalArgumentException( - String.format( - "Expected a %s with components but received %s", - RunnerApi.WindowingStrategy.class.getCanonicalName(), proto)); - } - } - - /** - * Converts from {@link RunnerApi.WindowingStrategy} to the SDK's {@link WindowingStrategy} using - * the provided components to dereferences identifiers found in the proto. - */ - public static WindowingStrategy<?, ?> fromProto( - RunnerApi.WindowingStrategy proto, Components components) - throws InvalidProtocolBufferException { - - SdkFunctionSpec windowFnSpec = proto.getWindowFn(); - WindowFn<?, ?> windowFn = windowFnFromProto(windowFnSpec); - TimestampCombiner timestampCombiner = timestampCombinerFromProto(proto.getOutputTime()); - AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode()); - Trigger trigger = Triggers.fromProto(proto.getTrigger()); - ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior()); - Duration allowedLateness = Duration.millis(proto.getAllowedLateness()); - - return WindowingStrategy.of(windowFn) - .withAllowedLateness(allowedLateness) - .withMode(accumulationMode) - .withTrigger(trigger) - .withTimestampCombiner(timestampCombiner) - .withClosingBehavior(closingBehavior); - } - - public static WindowFn<?, ?> windowFnFromProto(SdkFunctionSpec windowFnSpec) - throws InvalidProtocolBufferException { - checkArgument( - windowFnSpec.getSpec().getUrn().equals(CUSTOM_WINDOWFN_URN), - "Only Java-serialized %s instances are supported, with URN %s. But found URN %s", - WindowFn.class.getSimpleName(), - CUSTOM_WINDOWFN_URN, - windowFnSpec.getSpec().getUrn()); - - Object deserializedWindowFn = - SerializableUtils.deserializeFromByteArray( - windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(), - "WindowFn"); - - return (WindowFn<?, ?>) deserializedWindowFn; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java new file mode 100644 index 0000000..061f309 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.InvalidProtocolBufferException; +import java.io.IOException; +import java.io.Serializable; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode; +import org.joda.time.Duration; + +/** Utilities for working with {@link WindowingStrategy WindowingStrategies}. */ +public class WindowingStrategyTranslation implements Serializable { + + public static AccumulationMode fromProto(RunnerApi.AccumulationMode proto) { + switch (proto) { + case DISCARDING: + return AccumulationMode.DISCARDING_FIRED_PANES; + case ACCUMULATING: + return AccumulationMode.ACCUMULATING_FIRED_PANES; + case UNRECOGNIZED: + default: + // Whether or not it is proto that cannot recognize it (due to the version of the + // generated code we link to) or the switch hasn't been updated to handle it, + // the situation is the same: we don't know what this OutputTime means + throw new IllegalArgumentException( + String.format( + "Cannot convert unknown %s to %s: %s", + RunnerApi.AccumulationMode.class.getCanonicalName(), + AccumulationMode.class.getCanonicalName(), + proto)); + } + } + + public static RunnerApi.AccumulationMode toProto(AccumulationMode accumulationMode) { + switch (accumulationMode) { + case DISCARDING_FIRED_PANES: + return RunnerApi.AccumulationMode.DISCARDING; + case ACCUMULATING_FIRED_PANES: + return RunnerApi.AccumulationMode.ACCUMULATING; + default: + throw new IllegalArgumentException( + String.format( + "Cannot convert unknown %s to %s: %s", + AccumulationMode.class.getCanonicalName(), + RunnerApi.AccumulationMode.class.getCanonicalName(), + accumulationMode)); + } + } + + public static RunnerApi.ClosingBehavior toProto(ClosingBehavior closingBehavior) { + switch (closingBehavior) { + case FIRE_ALWAYS: + return RunnerApi.ClosingBehavior.EMIT_ALWAYS; + case FIRE_IF_NON_EMPTY: + return RunnerApi.ClosingBehavior.EMIT_IF_NONEMPTY; + default: + throw new IllegalArgumentException( + String.format( + "Cannot convert unknown %s to %s: %s", + ClosingBehavior.class.getCanonicalName(), + RunnerApi.ClosingBehavior.class.getCanonicalName(), + closingBehavior)); + } + } + + public static ClosingBehavior fromProto(RunnerApi.ClosingBehavior proto) { + switch (proto) { + case EMIT_ALWAYS: + return ClosingBehavior.FIRE_ALWAYS; + case EMIT_IF_NONEMPTY: + return ClosingBehavior.FIRE_IF_NON_EMPTY; + case UNRECOGNIZED: + default: + // Whether or not it is proto that cannot recognize it (due to the version of the + // generated code we link to) or the switch hasn't been updated to handle it, + // the situation is the same: we don't know what this OutputTime means + throw new IllegalArgumentException( + String.format( + "Cannot convert unknown %s to %s: %s", + RunnerApi.ClosingBehavior.class.getCanonicalName(), + ClosingBehavior.class.getCanonicalName(), + proto)); + } + } + + public static RunnerApi.OutputTime toProto(TimestampCombiner timestampCombiner) { + switch(timestampCombiner) { + case EARLIEST: + return OutputTime.EARLIEST_IN_PANE; + case END_OF_WINDOW: + return OutputTime.END_OF_WINDOW; + case LATEST: + return OutputTime.LATEST_IN_PANE; + default: + throw new IllegalArgumentException( + String.format( + "Unknown %s: %s", + TimestampCombiner.class.getSimpleName(), + timestampCombiner)); + } + } + + public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime proto) { + switch (proto) { + case EARLIEST_IN_PANE: + return TimestampCombiner.EARLIEST; + case END_OF_WINDOW: + return TimestampCombiner.END_OF_WINDOW; + case LATEST_IN_PANE: + return TimestampCombiner.LATEST; + case UNRECOGNIZED: + default: + // Whether or not it is proto that cannot recognize it (due to the version of the + // generated code we link to) or the switch hasn't been updated to handle it, + // the situation is the same: we don't know what this OutputTime means + throw new IllegalArgumentException( + String.format( + "Cannot convert unknown %s to %s: %s", + RunnerApi.OutputTime.class.getCanonicalName(), + OutputTime.class.getCanonicalName(), + proto)); + } + } + + // This URN says that the WindowFn is just a UDF blob the indicated SDK understands + // TODO: standardize such things + public static final String CUSTOM_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1"; + + /** + * Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where {@link + * RunnerApi.MessageWithComponents#getFunctionSpec()} is a {@link RunnerApi.FunctionSpec} for the + * input {@link WindowFn}. + */ + public static SdkFunctionSpec toProto( + WindowFn<?, ?> windowFn, @SuppressWarnings("unused") SdkComponents components) { + return SdkFunctionSpec.newBuilder() + // TODO: Set environment ID + .setSpec( + FunctionSpec.newBuilder() + .setUrn(CUSTOM_WINDOWFN_URN) + .setParameter( + Any.pack( + BytesValue.newBuilder() + .setValue( + ByteString.copyFrom( + SerializableUtils.serializeToByteArray(windowFn))) + .build()))) + .build(); + } + + /** + * Converts a {@link WindowingStrategy} into a {@link RunnerApi.MessageWithComponents} where + * {@link RunnerApi.MessageWithComponents#getWindowingStrategy()} ()} is a {@link + * RunnerApi.WindowingStrategy RunnerApi.WindowingStrategy (proto)} for the input {@link + * WindowingStrategy}. + */ + public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, ?> windowingStrategy) + throws IOException { + SdkComponents components = SdkComponents.create(); + RunnerApi.WindowingStrategy windowingStrategyProto = toProto(windowingStrategy, components); + + return RunnerApi.MessageWithComponents.newBuilder() + .setWindowingStrategy(windowingStrategyProto) + .setComponents(components.toComponents()) + .build(); + } + + /** + * Converts a {@link WindowingStrategy} into a {@link RunnerApi.WindowingStrategy}, registering + * any components in the provided {@link SdkComponents}. + */ + public static RunnerApi.WindowingStrategy toProto( + WindowingStrategy<?, ?> windowingStrategy, SdkComponents components) throws IOException { + SdkFunctionSpec windowFnSpec = toProto(windowingStrategy.getWindowFn(), components); + + RunnerApi.WindowingStrategy.Builder windowingStrategyProto = + RunnerApi.WindowingStrategy.newBuilder() + .setOutputTime(toProto(windowingStrategy.getTimestampCombiner())) + .setAccumulationMode(toProto(windowingStrategy.getMode())) + .setClosingBehavior(toProto(windowingStrategy.getClosingBehavior())) + .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis()) + .setTrigger(Triggers.toProto(windowingStrategy.getTrigger())) + .setWindowFn(windowFnSpec) + .setWindowCoderId( + components.registerCoder(windowingStrategy.getWindowFn().windowCoder())); + + return windowingStrategyProto.build(); + } + + /** + * Converts from a {@link RunnerApi.WindowingStrategy} accompanied by {@link Components} + * to the SDK's {@link WindowingStrategy}. + */ + public static WindowingStrategy<?, ?> fromProto(RunnerApi.MessageWithComponents proto) + throws InvalidProtocolBufferException { + switch (proto.getRootCase()) { + case WINDOWING_STRATEGY: + return fromProto(proto.getWindowingStrategy(), proto.getComponents()); + default: + throw new IllegalArgumentException( + String.format( + "Expected a %s with components but received %s", + RunnerApi.WindowingStrategy.class.getCanonicalName(), proto)); + } + } + + /** + * Converts from {@link RunnerApi.WindowingStrategy} to the SDK's {@link WindowingStrategy} using + * the provided components to dereferences identifiers found in the proto. + */ + public static WindowingStrategy<?, ?> fromProto( + RunnerApi.WindowingStrategy proto, Components components) + throws InvalidProtocolBufferException { + + SdkFunctionSpec windowFnSpec = proto.getWindowFn(); + WindowFn<?, ?> windowFn = windowFnFromProto(windowFnSpec); + TimestampCombiner timestampCombiner = timestampCombinerFromProto(proto.getOutputTime()); + AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode()); + Trigger trigger = Triggers.fromProto(proto.getTrigger()); + ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior()); + Duration allowedLateness = Duration.millis(proto.getAllowedLateness()); + + return WindowingStrategy.of(windowFn) + .withAllowedLateness(allowedLateness) + .withMode(accumulationMode) + .withTrigger(trigger) + .withTimestampCombiner(timestampCombiner) + .withClosingBehavior(closingBehavior); + } + + public static WindowFn<?, ?> windowFnFromProto(SdkFunctionSpec windowFnSpec) + throws InvalidProtocolBufferException { + checkArgument( + windowFnSpec.getSpec().getUrn().equals(CUSTOM_WINDOWFN_URN), + "Only Java-serialized %s instances are supported, with URN %s. But found URN %s", + WindowFn.class.getSimpleName(), + CUSTOM_WINDOWFN_URN, + windowFnSpec.getSpec().getUrn()); + + Object deserializedWindowFn = + SerializableUtils.deserializeFromByteArray( + windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(), + "WindowFn"); + + return (WindowFn<?, ?>) deserializedWindowFn; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java deleted file mode 100644 index 7296a77..0000000 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core.construction; - -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; - -import com.google.auto.value.AutoValue; -import com.google.common.collect.ImmutableList; -import org.apache.beam.sdk.common.runner.v1.RunnerApi; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; -import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.values.WindowingStrategy; -import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode; -import org.hamcrest.Matchers; -import org.joda.time.Duration; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameter; -import org.junit.runners.Parameterized.Parameters; - -/** Unit tests for {@link WindowingStrategy}. */ -@RunWith(Parameterized.class) -public class WindowingStrategiesTest { - - // Each spec activates tests of all subsets of its fields - @AutoValue - abstract static class ToProtoAndBackSpec { - abstract WindowingStrategy getWindowingStrategy(); - } - - private static ToProtoAndBackSpec toProtoAndBackSpec(WindowingStrategy windowingStrategy) { - return new AutoValue_WindowingStrategiesTest_ToProtoAndBackSpec(windowingStrategy); - } - - private static final WindowFn<?, ?> REPRESENTATIVE_WINDOW_FN = - FixedWindows.of(Duration.millis(12)); - - private static final Trigger REPRESENTATIVE_TRIGGER = AfterWatermark.pastEndOfWindow(); - - @Parameters(name = "{index}: {0}") - public static Iterable<ToProtoAndBackSpec> data() { - return ImmutableList.of( - toProtoAndBackSpec(WindowingStrategy.globalDefault()), - toProtoAndBackSpec( - WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN) - .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS) - .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) - .withTrigger(REPRESENTATIVE_TRIGGER) - .withAllowedLateness(Duration.millis(71)) - .withTimestampCombiner(TimestampCombiner.EARLIEST)), - toProtoAndBackSpec( - WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN) - .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY) - .withMode(AccumulationMode.DISCARDING_FIRED_PANES) - .withTrigger(REPRESENTATIVE_TRIGGER) - .withAllowedLateness(Duration.millis(93)) - .withTimestampCombiner(TimestampCombiner.LATEST))); - } - - @Parameter(0) - public ToProtoAndBackSpec toProtoAndBackSpec; - - @Test - public void testToProtoAndBack() throws Exception { - WindowingStrategy<?, ?> windowingStrategy = toProtoAndBackSpec.getWindowingStrategy(); - WindowingStrategy<?, ?> toProtoAndBackWindowingStrategy = - WindowingStrategies.fromProto(WindowingStrategies.toProto(windowingStrategy)); - - assertThat( - toProtoAndBackWindowingStrategy, - equalTo((WindowingStrategy) windowingStrategy.fixDefaults())); - } - - @Test - public void testToProtoAndBackWithComponents() throws Exception { - WindowingStrategy<?, ?> windowingStrategy = toProtoAndBackSpec.getWindowingStrategy(); - SdkComponents components = SdkComponents.create(); - RunnerApi.WindowingStrategy proto = - WindowingStrategies.toProto(windowingStrategy, components); - RunnerApi.Components protoComponents = components.toComponents(); - - assertThat( - WindowingStrategies.fromProto(proto, protoComponents).fixDefaults(), - Matchers.<WindowingStrategy<?, ?>>equalTo(windowingStrategy.fixDefaults())); - - protoComponents.getCodersOrThrow( - components.registerCoder(windowingStrategy.getWindowFn().windowCoder())); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java new file mode 100644 index 0000000..1e52803 --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableList; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode; +import org.hamcrest.Matchers; +import org.joda.time.Duration; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** Unit tests for {@link WindowingStrategy}. */ +@RunWith(Parameterized.class) +public class WindowingStrategyTranslationTest { + + // Each spec activates tests of all subsets of its fields + @AutoValue + abstract static class ToProtoAndBackSpec { + abstract WindowingStrategy getWindowingStrategy(); + } + + private static ToProtoAndBackSpec toProtoAndBackSpec(WindowingStrategy windowingStrategy) { + return new AutoValue_WindowingStrategyTranslationTest_ToProtoAndBackSpec(windowingStrategy); + } + + private static final WindowFn<?, ?> REPRESENTATIVE_WINDOW_FN = + FixedWindows.of(Duration.millis(12)); + + private static final Trigger REPRESENTATIVE_TRIGGER = AfterWatermark.pastEndOfWindow(); + + @Parameters(name = "{index}: {0}") + public static Iterable<ToProtoAndBackSpec> data() { + return ImmutableList.of( + toProtoAndBackSpec(WindowingStrategy.globalDefault()), + toProtoAndBackSpec( + WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN) + .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS) + .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) + .withTrigger(REPRESENTATIVE_TRIGGER) + .withAllowedLateness(Duration.millis(71)) + .withTimestampCombiner(TimestampCombiner.EARLIEST)), + toProtoAndBackSpec( + WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN) + .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY) + .withMode(AccumulationMode.DISCARDING_FIRED_PANES) + .withTrigger(REPRESENTATIVE_TRIGGER) + .withAllowedLateness(Duration.millis(93)) + .withTimestampCombiner(TimestampCombiner.LATEST))); + } + + @Parameter(0) + public ToProtoAndBackSpec toProtoAndBackSpec; + + @Test + public void testToProtoAndBack() throws Exception { + WindowingStrategy<?, ?> windowingStrategy = toProtoAndBackSpec.getWindowingStrategy(); + WindowingStrategy<?, ?> toProtoAndBackWindowingStrategy = + WindowingStrategyTranslation.fromProto( + WindowingStrategyTranslation.toProto(windowingStrategy)); + + assertThat( + toProtoAndBackWindowingStrategy, + equalTo((WindowingStrategy) windowingStrategy.fixDefaults())); + } + + @Test + public void testToProtoAndBackWithComponents() throws Exception { + WindowingStrategy<?, ?> windowingStrategy = toProtoAndBackSpec.getWindowingStrategy(); + SdkComponents components = SdkComponents.create(); + RunnerApi.WindowingStrategy proto = + WindowingStrategyTranslation.toProto(windowingStrategy, components); + RunnerApi.Components protoComponents = components.toComponents(); + + assertThat( + WindowingStrategyTranslation.fromProto(proto, protoComponents).fixDefaults(), + Matchers.<WindowingStrategy<?, ?>>equalTo(windowingStrategy.fixDefaults())); + + protoComponents.getCodersOrThrow( + components.registerCoder(windowingStrategy.getWindowFn().windowCoder())); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 6d7a0f8..af93ef5 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -56,7 +56,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; -import org.apache.beam.runners.core.construction.WindowingStrategies; +import org.apache.beam.runners.core.construction.WindowingStrategyTranslation; import org.apache.beam.runners.dataflow.BatchViewOverrides.GroupByKeyAndSortValuesOnly; import org.apache.beam.runners.dataflow.DataflowRunner.CombineGroupedValues; import org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory.ParDoSingle; @@ -124,7 +124,7 @@ public class DataflowPipelineTranslator { private static byte[] serializeWindowingStrategy(WindowingStrategy<?, ?> windowingStrategy) { try { - return WindowingStrategies.toProto(windowingStrategy).toByteArray(); + return WindowingStrategyTranslation.toProto(windowingStrategy).toByteArray(); } catch (Exception e) { throw new RuntimeException( String.format("Unable to format windowing strategy %s as bytes", windowingStrategy), e);
