Java Dataflow runner harness compatibility.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8cab1533 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8cab1533 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8cab1533 Branch: refs/heads/gearpump-runner Commit: 8cab15338f811f880c6cfb820051cf355f92986b Parents: 3785b5b Author: Robert Bradshaw <[email protected]> Authored: Wed Jun 21 18:09:48 2017 -0700 Committer: Robert Bradshaw <[email protected]> Committed: Thu Jun 22 12:34:27 2017 -0700 ---------------------------------------------------------------------- .../runners/portability/fn_api_runner.py | 6 ++++- .../apache_beam/runners/worker/sdk_worker.py | 26 +++++++++++++++----- 2 files changed, 25 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8cab1533/sdks/python/apache_beam/runners/portability/fn_api_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index dabb7d6..a27e293 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -17,6 +17,7 @@ """A PipelineRunner using the SDK harness. """ +import base64 import collections import json import logging @@ -204,11 +205,14 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): else: # Otherwise serialize the source and execute it there. # TODO: Use SDFs with an initial impulse. + # The Dataflow runner harness strips the base64 encoding. Do the same + # here until we get the same thing back that we sent in. 
transform_spec = beam_runner_api_pb2.FunctionSpec( urn=sdk_worker.PYTHON_SOURCE_URN, parameter=proto_utils.pack_Any( wrappers_pb2.BytesValue( - value=pickler.dumps(operation.source.source)))) + value=base64.b64decode( + pickler.dumps(operation.source.source))))) elif isinstance(operation, operation_specs.WorkerDoFn): # Record the contents of each side input for access via the state api. http://git-wip-us.apache.org/repos/asf/beam/blob/8cab1533/sdks/python/apache_beam/runners/worker/sdk_worker.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index fd7ecc4..a2c9f42 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -21,6 +21,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import base64 import collections import json import logging @@ -195,7 +196,7 @@ def pack_function_spec_data(value, urn, id=None): # pylint: enable=redefined-builtin -# TODO(vikasrk): move this method to ``coders.py`` in the SDK. +# TODO(vikasrk): Consistently use same format everywhere. 
def load_compressed(compressed_data): """Returns a decompressed and deserialized python object.""" # Note: SDK uses ``pickler.dumps`` to serialize certain python objects @@ -259,6 +260,10 @@ class SdkHarness(object): try: response = self.worker.do_instruction(work_request) except Exception: # pylint: disable=broad-except + logging.error( + 'Error processing instruction %s', + work_request.instruction_id, + exc_info=True) response = beam_fn_api_pb2.InstructionResponse( instruction_id=work_request.instruction_id, error=traceback.format_exc()) @@ -319,10 +324,10 @@ class SdkWorker(object): return response def create_execution_tree(self, descriptor): - if descriptor.primitive_transform: - return self.create_execution_tree_from_fn_api(descriptor) - else: + if descriptor.transforms: return self.create_execution_tree_from_runner_api(descriptor) + else: + return self.create_execution_tree_from_fn_api(descriptor) def create_execution_tree_from_runner_api(self, descriptor): # TODO(robertwb): Figure out the correct prefix to use for output counters @@ -551,7 +556,15 @@ class BeamTransformFactory(object): return creator(self, transform_id, transform_proto, parameter, consumers) def get_coder(self, coder_id): - return self.context.coders.get_by_id(coder_id) + coder_proto = self.descriptor.coders[coder_id] + if coder_proto.spec.spec.urn: + return self.context.coders.get_by_id(coder_id) + else: + # No URN, assume cloud object encoding json bytes. 
+ return operation_specs.get_coder_from_spec( + json.loads( + proto_utils.unpack_Any(coder_proto.spec.spec.parameter, + wrappers_pb2.BytesValue).value)) def get_output_coders(self, transform_proto): return { @@ -618,7 +631,8 @@ def create(factory, transform_id, transform_proto, grpc_port, consumers): @BeamTransformFactory.register_urn(PYTHON_SOURCE_URN, wrappers_pb2.BytesValue) def create(factory, transform_id, transform_proto, parameter, consumers): - source = pickler.loads(parameter.value) + # The Dataflow runner harness strips the base64 encoding. + source = pickler.loads(base64.b64encode(parameter.value)) spec = operation_specs.WorkerRead( iobase.SourceBundle(1.0, source, None, None), [WindowedValueCoder(source.default_output_coder())])
