Repository: beam Updated Branches: refs/heads/master 7a075cc34 -> 2f9428c3e
Unify Java and Python WindowingStrategy representations. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/de757860 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/de757860 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/de757860 Branch: refs/heads/master Commit: de757860945d5966a51173c54d29d0a733e66686 Parents: 7a075cc Author: Robert Bradshaw <[email protected]> Authored: Wed May 24 17:23:31 2017 -0700 Committer: Robert Bradshaw <[email protected]> Committed: Fri Jun 2 09:16:42 2017 -0700 ---------------------------------------------------------------------- pom.xml | 6 ++ runners/core-construction-java/pom.xml | 5 ++ .../WindowingStrategyTranslation.java | 60 ++++++++++++++------ .../src/main/proto/beam_known_payloads.proto | 53 +++++++++++++++++ .../runners/dataflow/dataflow_runner.py | 39 ++++++++++++- .../runners/dataflow/dataflow_runner_test.py | 11 ++++ sdks/python/apache_beam/transforms/window.py | 57 ++++++++++++------- sdks/python/apache_beam/utils/proto_utils.py | 6 ++ sdks/python/apache_beam/utils/urns.py | 10 ++-- sdks/python/run_pylint.sh | 2 + 10 files changed, 206 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 3e302e7..805a8d6 100644 --- a/pom.xml +++ b/pom.xml @@ -945,6 +945,12 @@ </dependency> <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java-util</artifactId> + <version>${protobuf.version}</version> + </dependency> + + <dependency> <groupId>com.google.api.grpc</groupId> <artifactId>grpc-google-common-protos</artifactId> <version>${grpc-google-common-protos.version}</version> http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/runners/core-construction-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml index 7eaa6f3..67951e9 100644 --- a/runners/core-construction-java/pom.xml +++ b/runners/core-construction-java/pom.xml @@ -70,6 +70,11 @@ </dependency> <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java-util</artifactId> + </dependency> + + <dependency> <groupId>com.google.code.findbugs</groupId> <artifactId>jsr305</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java index e92565f..a226624 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java @@ -17,12 +17,12 @@ */ package org.apache.beam.runners.core.construction; -import static com.google.common.base.Preconditions.checkArgument; - import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.util.Durations; +import com.google.protobuf.util.Timestamps; import java.io.IOException; import java.io.Serializable; import org.apache.beam.sdk.common.runner.v1.RunnerApi; @@ -30,6 +30,11 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime; import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; +import org.apache.beam.sdk.common.runner.v1.RunnerApiPayloads; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; @@ -153,9 +158,13 @@ public class WindowingStrategyTranslation implements Serializable { } } - // This URN says that the WindowFn is just a UDF blob the indicated SDK understands + public static final String GLOBAL_WINDOWS_FN = "beam:windowfn:global_windows:v0.1"; + public static final String FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v0.1"; + public static final String SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v0.1"; + public static final String SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1"; + // This URN says that the WindowFn is just a UDF blob the Java SDK understands // TODO: standardize such things - public static final String CUSTOM_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1"; + public static final String SERIALIZED_JAVA_WINDOWFN_URN = "beam:windowfn:javasdk:v0.1"; /** * Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where {@link @@ -168,7 +177,7 @@ public class WindowingStrategyTranslation implements Serializable { // TODO: Set environment ID .setSpec( FunctionSpec.newBuilder() - .setUrn(CUSTOM_WINDOWFN_URN) + .setUrn(SERIALIZED_JAVA_WINDOWFN_URN) .setParameter( Any.pack( BytesValue.newBuilder() @@ -261,18 +270,37 @@ public class WindowingStrategyTranslation implements Serializable { public static WindowFn<?, ?> windowFnFromProto(SdkFunctionSpec windowFnSpec) throws InvalidProtocolBufferException { - checkArgument( - windowFnSpec.getSpec().getUrn().equals(CUSTOM_WINDOWFN_URN), - "Only Java-serialized %s instances are supported, with URN %s. But found URN %s", - WindowFn.class.getSimpleName(), - CUSTOM_WINDOWFN_URN, - windowFnSpec.getSpec().getUrn()); - - Object deserializedWindowFn = - SerializableUtils.deserializeFromByteArray( + switch (windowFnSpec.getSpec().getUrn()) { + case GLOBAL_WINDOWS_FN: + return new GlobalWindows(); + case FIXED_WINDOWS_FN: + RunnerApiPayloads.FixedWindowsPayload fixedParams = + windowFnSpec.getSpec().getParameter().unpack( + RunnerApiPayloads.FixedWindowsPayload.class); + return FixedWindows.of( + Duration.millis(Durations.toMillis(fixedParams.getSize()))) + .withOffset(Duration.millis(Timestamps.toMillis(fixedParams.getOffset()))); + case SLIDING_WINDOWS_FN: + RunnerApiPayloads.SlidingWindowsPayload slidingParams = + windowFnSpec.getSpec().getParameter().unpack( + RunnerApiPayloads.SlidingWindowsPayload.class); + return SlidingWindows.of( + Duration.millis(Durations.toMillis(slidingParams.getSize()))) + .every(Duration.millis(Durations.toMillis(slidingParams.getPeriod()))) + .withOffset(Duration.millis(Timestamps.toMillis(slidingParams.getOffset()))); + case SESSION_WINDOWS_FN: + RunnerApiPayloads.SessionsPayload sessionParams = + windowFnSpec.getSpec().getParameter().unpack( + RunnerApiPayloads.SessionsPayload.class); + return Sessions.withGapDuration( + Duration.millis(Durations.toMillis(sessionParams.getGapSize()))); + case SERIALIZED_JAVA_WINDOWFN_URN: + return (WindowFn<?, ?>) SerializableUtils.deserializeFromByteArray( windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(), "WindowFn"); - - return (WindowFn<?, ?>) deserializedWindowFn; + default: + throw new IllegalArgumentException( + "Unknown or unsupported WindowFn: " + windowFnSpec.getSpec().getUrn()); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto ---------------------------------------------------------------------- diff --git a/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto b/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto new file mode 100644 index 0000000..446bd59 --- /dev/null +++ b/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Protocol Buffers describing the Runner API, which is the runner-independent, + * SDK-independent definition of the Beam model. + */ + +syntax = "proto3"; + +package org.apache.beam.runner_api.v1; + +option java_package = "org.apache.beam.sdk.common.runner.v1"; +option java_outer_classname = "RunnerApiPayloads"; + +import "google/protobuf/duration.proto"; +import "google/protobuf/timestamp.proto"; + +// beam:windowfn:global_windows:v0.1 +// empty payload + +// beam:windowfn:fixed_windows:v0.1 +message FixedWindowsPayload { + google.protobuf.Duration size = 1; + google.protobuf.Timestamp offset = 2; +} + +// beam:windowfn:sliding_windows:v0.1 +message SlidingWindowsPayload { + google.protobuf.Duration size = 1; + google.protobuf.Timestamp offset = 2; + google.protobuf.Duration period = 3; +} + +// beam:windowfn:session_windows:v0.1 +message SessionsPayload { + google.protobuf.Duration gap_size = 1; +} http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 046d3d5..3e0e268 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -25,6 +25,7 @@ import logging import threading import time import traceback +import urllib from apache_beam import error from apache_beam import coders @@ -416,7 +417,9 @@ class DataflowRunner(PipelineRunner): PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) windowing = transform_node.transform.get_windowing( transform_node.inputs) - step.add_property(PropertyNames.SERIALIZED_FN, pickler.dumps(windowing)) + step.add_property( + PropertyNames.SERIALIZED_FN, + self.serialize_windowing_strategy(windowing)) def run_ParDo(self, transform_node): transform = transform_node.transform @@ -697,6 +700,40 @@ class DataflowRunner(PipelineRunner): PropertyNames.STEP_NAME: input_step.proto.name, PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)}) + @classmethod + def serialize_windowing_strategy(cls, windowing): + from apache_beam.runners import pipeline_context + from apache_beam.runners.api import beam_runner_api_pb2 + context = pipeline_context.PipelineContext() + windowing_proto = windowing.to_runner_api(context) + return cls.byte_array_to_json_string( + beam_runner_api_pb2.MessageWithComponents( + components=context.to_runner_api(), + windowing_strategy=windowing_proto).SerializeToString()) + + @classmethod + def deserialize_windowing_strategy(cls, serialized_data): + # Imported here to avoid circular dependencies. + # pylint: disable=wrong-import-order, wrong-import-position + from apache_beam.runners import pipeline_context + from apache_beam.runners.api import beam_runner_api_pb2 + from apache_beam.transforms.core import Windowing + proto = beam_runner_api_pb2.MessageWithComponents() + proto.ParseFromString(cls.json_string_to_byte_array(serialized_data)) + return Windowing.from_runner_api( + proto.windowing_strategy, + pipeline_context.PipelineContext(proto.components)) + + @staticmethod + def byte_array_to_json_string(raw_bytes): + """Implements org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString.""" + return urllib.quote(raw_bytes) + + @staticmethod + def json_string_to_byte_array(encoded_string): + """Implements org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray.""" + return urllib.unquote(encoded_string) + class DataflowPipelineResult(PipelineResult): """Represents the state of a pipeline run on the Dataflow service.""" http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index ff4b51d..74fd01d 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -38,6 +38,8 @@ from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_a from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.transforms.display import DisplayDataItem from apache_beam.transforms.core import _GroupByKeyOnly +from apache_beam.transforms.core import Windowing +from apache_beam.transforms import window from apache_beam.typehints import typehints # Protect against environments where apitools library is not available. @@ -240,6 +242,15 @@ class DataflowRunnerTest(unittest.TestCase): for _ in range(num_inputs): self.assertEqual(inputs[0].element_type, output_type) + def test_serialize_windowing_strategy(self): + # This just tests the basic path; more complete tests + # are in window_test.py. + strategy = Windowing(window.FixedWindows(10)) + self.assertEqual( + strategy, + DataflowRunner.deserialize_windowing_strategy( + DataflowRunner.serialize_windowing_strategy(strategy))) + if __name__ == '__main__': unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/apache_beam/transforms/window.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 94187e0..f74c8a9 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -51,10 +51,12 @@ from __future__ import absolute_import import abc -from google.protobuf import struct_pb2 +from google.protobuf import duration_pb2 +from google.protobuf import timestamp_pb2 from apache_beam.coders import coders from apache_beam.runners.api import beam_runner_api_pb2 +from apache_beam.runners.api import beam_known_payloads_pb2 from apache_beam.transforms import timeutil from apache_beam.utils import proto_utils from apache_beam.utils import urns @@ -341,14 +343,18 @@ class FixedWindows(NonMergingWindowFn): def to_runner_api_parameter(self, context): return (urns.FIXED_WINDOWS_FN, - proto_utils.pack_Struct(size=self.size.micros, - offset=self.offset.micros)) - - @urns.RunnerApiFn.register_urn(urns.FIXED_WINDOWS_FN, struct_pb2.Struct) + beam_known_payloads_pb2.FixedWindowsPayload( + size=proto_utils.from_micros( + duration_pb2.Duration, self.size.micros), + offset=proto_utils.from_micros( + timestamp_pb2.Timestamp, self.offset.micros))) + + @urns.RunnerApiFn.register_urn( + urns.FIXED_WINDOWS_FN, beam_known_payloads_pb2.FixedWindowsPayload) def from_runner_api_parameter(fn_parameter, unused_context): return FixedWindows( - size=Duration(micros=fn_parameter['size']), - offset=Timestamp(micros=fn_parameter['offset'])) + size=Duration(micros=fn_parameter.size.ToMicroseconds()), + offset=Timestamp(micros=fn_parameter.offset.ToMicroseconds())) class SlidingWindows(NonMergingWindowFn): @@ -392,17 +398,22 @@ class SlidingWindows(NonMergingWindowFn): def to_runner_api_parameter(self, context): return (urns.SLIDING_WINDOWS_FN, - proto_utils.pack_Struct( - size=self.size.micros, - offset=self.offset.micros, - period=self.period.micros)) - - @urns.RunnerApiFn.register_urn(urns.SLIDING_WINDOWS_FN, struct_pb2.Struct) + beam_known_payloads_pb2.SlidingWindowsPayload( + size=proto_utils.from_micros( + duration_pb2.Duration, self.size.micros), + offset=proto_utils.from_micros( + timestamp_pb2.Timestamp, self.offset.micros), + period=proto_utils.from_micros( + duration_pb2.Duration, self.period.micros))) + + @urns.RunnerApiFn.register_urn( + urns.SLIDING_WINDOWS_FN, + beam_known_payloads_pb2.SlidingWindowsPayload) def from_runner_api_parameter(fn_parameter, unused_context): return SlidingWindows( - size=Duration(micros=fn_parameter['size']), - offset=Timestamp(micros=fn_parameter['offset']), - period=Duration(micros=fn_parameter['period'])) + size=Duration(micros=fn_parameter.size.ToMicroseconds()), + offset=Timestamp(micros=fn_parameter.offset.ToMicroseconds()), + period=Duration(micros=fn_parameter.period.ToMicroseconds())) class Sessions(WindowFn): @@ -452,10 +463,14 @@ class Sessions(WindowFn): if type(self) == type(other) == Sessions: return self.gap_size == other.gap_size - @urns.RunnerApiFn.register_urn(urns.SESSION_WINDOWS_FN, struct_pb2.Struct) - def from_runner_api_parameter(fn_parameter, unused_context): - return Sessions(gap_size=Duration(micros=fn_parameter['gap_size'])) - def to_runner_api_parameter(self, context): return (urns.SESSION_WINDOWS_FN, - proto_utils.pack_Struct(gap_size=self.gap_size.micros)) + beam_known_payloads_pb2.SessionsPayload( + gap_size=proto_utils.from_micros( + duration_pb2.Duration, self.gap_size.micros))) + + @urns.RunnerApiFn.register_urn( + urns.SESSION_WINDOWS_FN, beam_known_payloads_pb2.SessionsPayload) + def from_runner_api_parameter(fn_parameter, unused_context): + return Sessions( + gap_size=Duration(micros=fn_parameter.gap_size.ToMicroseconds())) http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/apache_beam/utils/proto_utils.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/proto_utils.py b/sdks/python/apache_beam/utils/proto_utils.py index 090a821..af8f218 100644 --- a/sdks/python/apache_beam/utils/proto_utils.py +++ b/sdks/python/apache_beam/utils/proto_utils.py @@ -53,3 +53,9 @@ def pack_Struct(**kwargs): for key, value in kwargs.items(): msg[key] = value # pylint: disable=unsubscriptable-object, unsupported-assignment-operation return msg + + +def from_micros(cls, micros): + result = cls() + result.FromMicroseconds(micros) + return result http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/apache_beam/utils/urns.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py index 379b5ff..849b8e3 100644 --- a/sdks/python/apache_beam/utils/urns.py +++ b/sdks/python/apache_beam/utils/urns.py @@ -26,11 +26,11 @@ from apache_beam.internal import pickler from apache_beam.utils import proto_utils -PICKLED_WINDOW_FN = "beam:window_fn:pickled_python:v0.1" -GLOBAL_WINDOWS_FN = "beam:window_fn:global_windows:v0.1" -FIXED_WINDOWS_FN = "beam:window_fn:fixed_windows:v0.1" -SLIDING_WINDOWS_FN = "beam:window_fn:sliding_windows:v0.1" -SESSION_WINDOWS_FN = "beam:window_fn:session_windows:v0.1" +PICKLED_WINDOW_FN = "beam:windowfn:pickled_python:v0.1" +GLOBAL_WINDOWS_FN = "beam:windowfn:global_windows:v0.1" +FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v0.1" +SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v0.1" +SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1" PICKLED_CODER = "beam:coder:pickled_python:v0.1" http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/run_pylint.sh ---------------------------------------------------------------------- diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh index 400c577..4ef3e7f 100755 --- a/sdks/python/run_pylint.sh +++ b/sdks/python/run_pylint.sh @@ -50,6 +50,8 @@ EXCLUDED_GENERATED_FILES=( "apache_beam/runners/api/beam_fn_api_pb2_grpc.py" "apache_beam/runners/api/beam_runner_api_pb2.py" "apache_beam/runners/api/beam_runner_api_pb2_grpc.py" +"apache_beam/runners/api/beam_known_payloads_pb2.py" +"apache_beam/runners/api/beam_known_payloads_pb2_grpc.py" ) FILES_TO_IGNORE=""
