Remove any_param field from FunctionSpec
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/060bda23 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/060bda23 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/060bda23 Branch: refs/heads/master Commit: 060bda23d1e5cd5146190aa34f2e212404cb6667 Parents: 294518e Author: Thomas Groh <[email protected]> Authored: Tue Sep 19 16:39:44 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Thu Oct 5 09:45:44 2017 -0700 ---------------------------------------------------------------------- .../WindowingStrategyTranslation.java | 7 ------ .../src/main/proto/beam_runner_api.proto | 3 --- sdks/python/apache_beam/coders/coders.py | 1 - .../runners/portability/fn_api_runner.py | 26 -------------------- sdks/python/apache_beam/transforms/core.py | 4 --- .../python/apache_beam/transforms/ptransform.py | 1 - sdks/python/apache_beam/utils/urns.py | 1 - 7 files changed, 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/060bda23/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java index 1b4786c..be8601c 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java @@ -17,9 +17,7 @@ */ package org.apache.beam.runners.core.construction; -import com.google.protobuf.Any; import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.Durations; import com.google.protobuf.util.Timestamps; @@ -223,7 +221,6 @@ public class WindowingStrategyTranslation implements Serializable { .setSpec( FunctionSpec.newBuilder() .setUrn(OLD_SERIALIZED_JAVA_WINDOWFN_URN) - .setAnyParam(Any.pack(BytesValue.newBuilder().setValue(serializedFn).build())) .setPayload(serializedFn) .build()) .build(); @@ -241,7 +238,6 @@ public class WindowingStrategyTranslation implements Serializable { .setSpec( FunctionSpec.newBuilder() .setUrn(FIXED_WINDOWS_FN) - .setAnyParam(Any.pack(fixedWindowsPayload)) .setPayload(fixedWindowsPayload.toByteString())) .build(); } else if (windowFn instanceof SlidingWindows) { @@ -254,7 +250,6 @@ public class WindowingStrategyTranslation implements Serializable { .setSpec( FunctionSpec.newBuilder() .setUrn(SLIDING_WINDOWS_FN) - .setAnyParam(Any.pack(slidingWindowsPayload)) .setPayload(slidingWindowsPayload.toByteString())) .build(); } else if (windowFn instanceof Sessions) { @@ -266,7 +261,6 @@ public class WindowingStrategyTranslation implements Serializable { .setSpec( FunctionSpec.newBuilder() .setUrn(SESSION_WINDOWS_FN) - .setAnyParam(Any.pack(sessionsPayload)) .setPayload(sessionsPayload.toByteString())) .build(); } else { @@ -274,7 +268,6 @@ public class WindowingStrategyTranslation implements Serializable { .setSpec( FunctionSpec.newBuilder() .setUrn(SERIALIZED_JAVA_WINDOWFN_URN) - .setAnyParam(Any.pack(BytesValue.newBuilder().setValue(serializedFn).build())) .setPayload(serializedFn)) .build(); } http://git-wip-us.apache.org/repos/asf/beam/blob/060bda23/sdks/common/runner-api/src/main/proto/beam_runner_api.proto ---------------------------------------------------------------------- diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto index 9ba5577..74f3897 100644 --- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto +++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto @@ -782,9 +782,6 @@ message FunctionSpec { // passed as-is. string urn = 1; - // (Deprecated) - google.protobuf.Any any_param = 2; - // (Optional) The data specifying any parameters to the URN. If // the URN does not require any arguments, this may be omitted. bytes payload = 3; http://git-wip-us.apache.org/repos/asf/beam/blob/060bda23/sdks/python/apache_beam/coders/coders.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 3021da5..cbea98f 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -234,7 +234,6 @@ class Coder(object): spec=beam_runner_api_pb2.SdkFunctionSpec( spec=beam_runner_api_pb2.FunctionSpec( urn=urn, - any_param=proto_utils.pack_Any(typed_param), payload=typed_param.SerializeToString() if typed_param is not None else None)), component_coder_ids=[context.coders.get_id(c) for c in components]) http://git-wip-us.apache.org/repos/asf/beam/blob/060bda23/sdks/python/apache_beam/runners/portability/fn_api_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index 21bf61a..20a4a61 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -27,7 +27,6 @@ import time from concurrent import futures import grpc -from google.protobuf import wrappers_pb2 import apache_beam as beam # pylint: disable=ungrouped-imports from apache_beam.coders import WindowedValueCoder @@ -349,8 +348,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): inputs=transform.inputs, spec=beam_runner_api_pb2.FunctionSpec( urn=bundle_processor.DATA_OUTPUT_URN, - any_param=proto_utils.pack_Any( - wrappers_pb2.BytesValue(value=param)), payload=param))], downstream_side_inputs=frozenset(), must_follow=stage.must_follow) @@ -363,8 +360,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): outputs=transform.outputs, spec=beam_runner_api_pb2.FunctionSpec( urn=bundle_processor.DATA_INPUT_URN, - any_param=proto_utils.pack_Any( - wrappers_pb2.BytesValue(value=param)), payload=param))], downstream_side_inputs=frozenset(), must_follow=union(frozenset([gbk_write]), stage.must_follow)) @@ -421,9 +416,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): inputs={local_in: transcoded_pcollection}, spec=beam_runner_api_pb2.FunctionSpec( urn=bundle_processor.DATA_OUTPUT_URN, - any_param=proto_utils.pack_Any( - wrappers_pb2.BytesValue( - value=param)), payload=param))], downstream_side_inputs=frozenset(), must_follow=stage.must_follow) @@ -437,9 +429,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): outputs=transform.outputs, spec=beam_runner_api_pb2.FunctionSpec( urn=bundle_processor.DATA_INPUT_URN, - any_param=proto_utils.pack_Any( - wrappers_pb2.BytesValue( - value=param)), payload=param))], downstream_side_inputs=frozenset(), must_follow=union(frozenset(flatten_writes), stage.must_follow)) @@ -549,9 +538,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): inputs={'in': pcoll}, spec=beam_runner_api_pb2.FunctionSpec( urn=bundle_processor.DATA_OUTPUT_URN, - any_param=proto_utils.pack_Any( - wrappers_pb2.BytesValue( - value=pcoll_as_param)), payload=pcoll_as_param))]) fuse(producer, write_pcoll) if consumer.has_as_main_input(pcoll): @@ -562,9 +548,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): outputs={'out': pcoll}, spec=beam_runner_api_pb2.FunctionSpec( urn=bundle_processor.DATA_INPUT_URN, - any_param=proto_utils.pack_Any( - wrappers_pb2.BytesValue( - value=pcoll_as_param)), payload=pcoll_as_param))], must_follow={write_pcoll}) fuse(read_pcoll, consumer) @@ -686,10 +669,8 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): raise NotImplementedError if data_operation_spec: transform.spec.payload = data_operation_spec.SerializeToString() - transform.spec.any_param.Pack(data_operation_spec) else: transform.spec.payload = "" - transform.spec.any_param.Clear() return data_input, data_side_input, data_output logging.info('Running %s', stage.name) @@ -838,7 +819,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): runner_sinks[(transform_id, target_name)] = operation transform_spec = beam_runner_api_pb2.FunctionSpec( urn=bundle_processor.DATA_OUTPUT_URN, - any_param=proto_utils.pack_Any(data_operation_spec), payload=data_operation_spec.SerializeToString() \ if data_operation_spec is not None else None) @@ -854,7 +834,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): operation.source.source.default_output_coder()) transform_spec = beam_runner_api_pb2.FunctionSpec( urn=bundle_processor.DATA_INPUT_URN, - any_param=proto_utils.pack_Any(data_operation_spec), payload=data_operation_spec.SerializeToString() \ if data_operation_spec is not None else None) @@ -867,9 +846,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): pickler.dumps(operation.source.source)) transform_spec = beam_runner_api_pb2.FunctionSpec( urn=bundle_processor.PYTHON_SOURCE_URN, - any_param=proto_utils.pack_Any( - wrappers_pb2.BytesValue( - value=source_bytes)), payload=source_bytes) elif isinstance(operation, operation_specs.WorkerDoFn): @@ -889,8 +865,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): (operation.serialized_fn, side_input_extras)) transform_spec = beam_runner_api_pb2.FunctionSpec( urn=bundle_processor.PYTHON_DOFN_URN, - any_param=proto_utils.pack_Any( - wrappers_pb2.BytesValue(value=augmented_serialized_fn)), payload=augmented_serialized_fn) elif isinstance(operation, operation_specs.WorkerFlatten): http://git-wip-us.apache.org/repos/asf/beam/blob/060bda23/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 5d92fe9..153dc32 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -53,7 +53,6 @@ from apache_beam.typehints.decorators import WithTypeHints from apache_beam.typehints.decorators import get_type_hints from apache_beam.typehints.trivial_inference import element_type from apache_beam.typehints.typehints import is_consistent_with -from apache_beam.utils import proto_utils from apache_beam.utils import urns __all__ = [ @@ -715,9 +714,6 @@ class ParDo(PTransformWithSideInputs): do_fn=beam_runner_api_pb2.SdkFunctionSpec( spec=beam_runner_api_pb2.FunctionSpec( urn=urns.PICKLED_DO_FN_INFO, - any_param=proto_utils.pack_Any( - wrappers_pb2.BytesValue( - value=picked_pardo_fn_data)), payload=picked_pardo_fn_data)))) @PTransform.register_urn( http://git-wip-us.apache.org/repos/asf/beam/blob/060bda23/sdks/python/apache_beam/transforms/ptransform.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 7cf1441..2e6255a 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -452,7 +452,6 @@ class PTransform(WithTypeHints, HasDisplayData): urn, typed_param = self.to_runner_api_parameter(context) return beam_runner_api_pb2.FunctionSpec( urn=urn, - any_param=proto_utils.pack_Any(typed_param), payload=typed_param.SerializeToString() if typed_param is not None else None) http://git-wip-us.apache.org/repos/asf/beam/blob/060bda23/sdks/python/apache_beam/utils/urns.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py index 7675d05..2aeaa53 100644 --- a/sdks/python/apache_beam/utils/urns.py +++ b/sdks/python/apache_beam/utils/urns.py @@ -128,7 +128,6 @@ class RunnerApiFn(object): return beam_runner_api_pb2.SdkFunctionSpec( spec=beam_runner_api_pb2.FunctionSpec( urn=urn, - any_param=proto_utils.pack_Any(typed_param), payload=typed_param.SerializeToString() if typed_param is not None else None))
