Remove unused (and untested) initial splitting logic.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a882e8f3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a882e8f3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a882e8f3 Branch: refs/heads/gearpump-runner Commit: a882e8f3a33c4a430f55d53b65285123c5a4f50d Parents: 5d6ad19 Author: Robert Bradshaw <[email protected]> Authored: Thu Jun 22 12:46:13 2017 -0700 Committer: Robert Bradshaw <[email protected]> Committed: Thu Jun 22 17:05:32 2017 -0700 ---------------------------------------------------------------------- .../runners/portability/fn_api_runner.py | 1 - .../apache_beam/runners/worker/sdk_worker.py | 51 ------------- .../runners/worker/sdk_worker_test.py | 77 -------------------- 3 files changed, 129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a882e8f3/sdks/python/apache_beam/runners/portability/fn_api_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index b45ff76..a8e2eb4 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -19,7 +19,6 @@ """ import base64 import collections -import json import logging import Queue as queue import threading http://git-wip-us.apache.org/repos/asf/beam/blob/a882e8f3/sdks/python/apache_beam/runners/worker/sdk_worker.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index d135984..6a366eb 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -28,9 +28,7 @@ import logging import Queue as queue 
import threading import traceback -import zlib -import dill from google.protobuf import wrappers_pb2 from apache_beam.coders import coder_impl @@ -165,37 +163,6 @@ class SideInputSource(native_iobase.NativeSource, yield self._coder.get_impl().decode_from_stream(input_stream, True) -def unpack_and_deserialize_py_fn(function_spec): - """Returns unpacked and deserialized object from function spec proto.""" - return pickler.loads(unpack_function_spec_data(function_spec)) - - -def unpack_function_spec_data(function_spec): - """Returns unpacked data from function spec proto.""" - data = wrappers_pb2.BytesValue() - function_spec.data.Unpack(data) - return data.value - - -# pylint: disable=redefined-builtin -def serialize_and_pack_py_fn(fn, urn, id=None): - """Returns serialized and packed function in a function spec proto.""" - return pack_function_spec_data(pickler.dumps(fn), urn, id) -# pylint: enable=redefined-builtin - - -# pylint: disable=redefined-builtin -def pack_function_spec_data(value, urn, id=None): - """Returns packed data in a function spec proto.""" - data = wrappers_pb2.BytesValue(value=value) - fn_proto = beam_fn_api_pb2.FunctionSpec(urn=urn) - fn_proto.data.Pack(data) - if id: - fn_proto.id = id - return fn_proto -# pylint: enable=redefined-builtin - - def memoize(func): cache = {} missing = object() @@ -286,24 +253,6 @@ class SdkWorker(object): self.fns[p_transform.function_spec.id] = p_transform.function_spec return beam_fn_api_pb2.RegisterResponse() - def initial_source_split(self, request, unused_instruction_id=None): - source_spec = self.fns[request.source_reference] - assert source_spec.urn == PYTHON_SOURCE_URN - source_bundle = unpack_and_deserialize_py_fn( - self.fns[request.source_reference]) - splits = source_bundle.source.split(request.desired_bundle_size_bytes, - source_bundle.start_position, - source_bundle.stop_position) - response = beam_fn_api_pb2.InitialSourceSplitResponse() - response.splits.extend([ - beam_fn_api_pb2.SourceSplit( - 
source=serialize_and_pack_py_fn(split, PYTHON_SOURCE_URN), - relative_size=split.weight, - ) - for split in splits - ]) - return response - def create_execution_tree(self, descriptor): # TODO(robertwb): Figure out the correct prefix to use for output counters # from StateSampler. http://git-wip-us.apache.org/repos/asf/beam/blob/a882e8f3/sdks/python/apache_beam/runners/worker/sdk_worker_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py index c431bcd..553d5b8 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py @@ -27,10 +27,7 @@ import unittest from concurrent import futures import grpc -from apache_beam.io.concat_source_test import RangeSource -from apache_beam.io.iobase import SourceBundle from apache_beam.portability.api import beam_fn_api_pb2 -from apache_beam.runners.worker import data_plane from apache_beam.runners.worker import sdk_worker @@ -88,80 +85,6 @@ class SdkWorkerTest(unittest.TestCase): harness.worker.fns, {item.id: item for item in fns + process_bundle_descriptors}) - @unittest.skip("initial splitting not in proto") - def test_source_split(self): - source = RangeSource(0, 100) - expected_splits = list(source.split(30)) - - worker = sdk_harness.SdkWorker( - None, data_plane.GrpcClientDataChannelFactory()) - worker.register( - beam_fn_api_pb2.RegisterRequest( - process_bundle_descriptor=[beam_fn_api_pb2.ProcessBundleDescriptor( - primitive_transform=[beam_fn_api_pb2.PrimitiveTransform( - function_spec=sdk_harness.serialize_and_pack_py_fn( - SourceBundle(1.0, source, None, None), - sdk_harness.PYTHON_SOURCE_URN, - id="src"))])])) - split_response = worker.initial_source_split( - beam_fn_api_pb2.InitialSourceSplitRequest( - desired_bundle_size_bytes=30, - source_reference="src")) - - self.assertEqual( - 
expected_splits, - [sdk_harness.unpack_and_deserialize_py_fn(s.source) - for s in split_response.splits]) - - self.assertEqual( - [s.weight for s in expected_splits], - [s.relative_size for s in split_response.splits]) - - @unittest.skip("initial splitting not in proto") - def test_source_split_via_instruction(self): - - source = RangeSource(0, 100) - expected_splits = list(source.split(30)) - - test_controller = BeamFnControlServicer([ - beam_fn_api_pb2.InstructionRequest( - instruction_id="register_request", - register=beam_fn_api_pb2.RegisterRequest( - process_bundle_descriptor=[ - beam_fn_api_pb2.ProcessBundleDescriptor( - primitive_transform=[beam_fn_api_pb2.PrimitiveTransform( - function_spec=sdk_harness.serialize_and_pack_py_fn( - SourceBundle(1.0, source, None, None), - sdk_harness.PYTHON_SOURCE_URN, - id="src"))])])), - beam_fn_api_pb2.InstructionRequest( - instruction_id="split_request", - initial_source_split=beam_fn_api_pb2.InitialSourceSplitRequest( - desired_bundle_size_bytes=30, - source_reference="src")) - ]) - - server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) - beam_fn_api_pb2.add_BeamFnControlServicer_to_server(test_controller, server) - test_port = server.add_insecure_port("[::]:0") - server.start() - - channel = grpc.insecure_channel("localhost:%s" % test_port) - harness = sdk_harness.SdkHarness(channel) - harness.run() - - split_response = test_controller.responses[ - "split_request"].initial_source_split - - self.assertEqual( - expected_splits, - [sdk_harness.unpack_and_deserialize_py_fn(s.source) - for s in split_response.splits]) - - self.assertEqual( - [s.weight for s in expected_splits], - [s.relative_size for s in split_response.splits]) - if __name__ == "__main__": logging.getLogger().setLevel(logging.INFO)
