Remove Fn API bundle descriptor translation.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5d6ad199
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5d6ad199
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5d6ad199

Branch: refs/heads/gearpump-runner
Commit: 5d6ad19958d0a2394f9e33720a04cc954279a7e7
Parents: 7645c44
Author: Robert Bradshaw <[email protected]>
Authored: Thu Jun 22 12:44:23 2017 -0700
Committer: Robert Bradshaw <[email protected]>
Committed: Thu Jun 22 17:05:32 2017 -0700

----------------------------------------------------------------------
 .../runners/portability/fn_api_runner.py      | 191 +------------------
 .../runners/portability/fn_api_runner_test.py |  18 +-
 .../apache_beam/runners/worker/sdk_worker.py  | 150 ---------------
 3 files changed, 4 insertions(+), 355 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5d6ad199/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index a27e293..b45ff76 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -115,13 +115,9 @@ OLDE_SOURCE_SPLITTABLE_DOFN_DATA = pickler.dumps(
 
 class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
 
-  def __init__(self, use_runner_protos=False):
+  def __init__(self):
     super(FnApiRunner, self).__init__()
     self._last_uid = -1
-    if use_runner_protos:
-      self._map_task_to_protos = self._map_task_to_runner_protos
-    else:
-      self._map_task_to_protos = self._map_task_to_fn_protos
 
   def has_metrics_support(self):
     return False
@@ -145,7 +141,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
             process_bundle_descriptor=[process_bundle_descriptor])
         ), runner_sinks, input_data
 
-  def _map_task_to_runner_protos(self, map_task, data_operation_spec):
+  def _map_task_to_protos(self, map_task, data_operation_spec):
     input_data = {}
     side_input_data = {}
     runner_sinks = {}
@@ -265,189 +261,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
         environments=dict(context_proto.environments.items()))
     return input_data, side_input_data, runner_sinks, process_bundle_descriptor
 
-  def _map_task_to_fn_protos(self, map_task, data_operation_spec):
-
-    input_data = {}
-    side_input_data = {}
-    runner_sinks = {}
-    transforms = []
-    transform_index_to_id = {}
-
-    # Maps coders to new coder objects and references.
-    coders = {}
-
-    def coder_id(coder):
-      if coder not in coders:
-        coders[coder] = beam_fn_api_pb2.Coder(
-            function_spec=sdk_worker.pack_function_spec_data(
-                json.dumps(coder.as_cloud_object()),
-                sdk_worker.PYTHON_CODER_URN, id=self._next_uid()))
-
-      return coders[coder].function_spec.id
-
-    def output_tags(op):
-      return getattr(op, 'output_tags', ['out'])
-
-    def as_target(op_input):
-      input_op_index, input_output_index = op_input
-      input_op = map_task[input_op_index][1]
-      return {
-          'ignored_input_tag':
-              beam_fn_api_pb2.Target.List(target=[
-                  beam_fn_api_pb2.Target(
-                      primitive_transform_reference=transform_index_to_id[
-                          input_op_index],
-                      name=output_tags(input_op)[input_output_index])
-              ])
-      }
-
-    def outputs(op):
-      return {
-          tag: beam_fn_api_pb2.PCollection(coder_reference=coder_id(coder))
-          for tag, coder in zip(output_tags(op), op.output_coders)
-      }
-
-    for op_ix, (stage_name, operation) in enumerate(map_task):
-      transform_id = transform_index_to_id[op_ix] = self._next_uid()
-      if isinstance(operation, operation_specs.WorkerInMemoryWrite):
-        # Write this data back to the runner.
-        fn = beam_fn_api_pb2.FunctionSpec(urn=sdk_worker.DATA_OUTPUT_URN,
-                                          id=self._next_uid())
-        if data_operation_spec:
-          fn.data.Pack(data_operation_spec)
-        inputs = as_target(operation.input)
-        side_inputs = {}
-        runner_sinks[(transform_id, 'out')] = operation
-
-      elif isinstance(operation, operation_specs.WorkerRead):
-        # A Read is either translated to a direct injection of windowed values
-        # into the sdk worker, or an injection of the source object into the
-        # sdk worker as data followed by an SDF that reads that source.
-        if (isinstance(operation.source.source,
-                       maptask_executor_runner.InMemorySource)
-            and isinstance(operation.source.source.default_output_coder(),
-                           WindowedValueCoder)):
-          output_stream = create_OutputStream()
-          element_coder = (
-              operation.source.source.default_output_coder().get_impl())
-          # Re-encode the elements in the nested context and
-          # concatenate them together
-          for element in operation.source.source.read(None):
-            element_coder.encode_to_stream(element, output_stream, True)
-          target_name = self._next_uid()
-          input_data[(transform_id, target_name)] = output_stream.get()
-          fn = beam_fn_api_pb2.FunctionSpec(urn=sdk_worker.DATA_INPUT_URN,
-                                            id=self._next_uid())
-          if data_operation_spec:
-            fn.data.Pack(data_operation_spec)
-          inputs = {target_name: beam_fn_api_pb2.Target.List()}
-          side_inputs = {}
-        else:
-          # Read the source object from the runner.
-          source_coder = beam.coders.DillCoder()
-          input_transform_id = self._next_uid()
-          output_stream = create_OutputStream()
-          source_coder.get_impl().encode_to_stream(
-              GlobalWindows.windowed_value(operation.source),
-              output_stream,
-              True)
-          target_name = self._next_uid()
-          input_data[(input_transform_id, target_name)] = output_stream.get()
-          input_ptransform = beam_fn_api_pb2.PrimitiveTransform(
-              id=input_transform_id,
-              function_spec=beam_fn_api_pb2.FunctionSpec(
-                  urn=sdk_worker.DATA_INPUT_URN,
-                  id=self._next_uid()),
-              # TODO(robertwb): Possible name collision.
-              step_name=stage_name + '/inject_source',
-              inputs={target_name: beam_fn_api_pb2.Target.List()},
-              outputs={
-                  'out':
-                      beam_fn_api_pb2.PCollection(
-                          coder_reference=coder_id(source_coder))
-              })
-          if data_operation_spec:
-            input_ptransform.function_spec.data.Pack(data_operation_spec)
-          transforms.append(input_ptransform)
-
-          # Read the elements out of the source.
-          fn = sdk_worker.pack_function_spec_data(
-              OLDE_SOURCE_SPLITTABLE_DOFN_DATA,
-              sdk_worker.PYTHON_DOFN_URN,
-              id=self._next_uid())
-          inputs = {
-              'ignored_input_tag':
-                  beam_fn_api_pb2.Target.List(target=[
-                      beam_fn_api_pb2.Target(
-                          primitive_transform_reference=input_transform_id,
-                          name='out')
-                  ])
-          }
-          side_inputs = {}
-
-      elif isinstance(operation, operation_specs.WorkerDoFn):
-        fn = sdk_worker.pack_function_spec_data(
-            operation.serialized_fn,
-            sdk_worker.PYTHON_DOFN_URN,
-            id=self._next_uid())
-        inputs = as_target(operation.input)
-        # Store the contents of each side input for state access.
-        for si in operation.side_inputs:
-          assert isinstance(si.source, iobase.BoundedSource)
-          element_coder = si.source.default_output_coder()
-          view_id = self._next_uid()
-          # TODO(robertwb): Actually flesh out the ViewFn API.
-          side_inputs[si.tag] = beam_fn_api_pb2.SideInput(
-              view_fn=sdk_worker.serialize_and_pack_py_fn(
-                  element_coder, urn=sdk_worker.PYTHON_ITERABLE_VIEWFN_URN,
-                  id=view_id))
-          # Re-encode the elements in the nested context and
-          # concatenate them together
-          output_stream = create_OutputStream()
-          for element in si.source.read(
-              si.source.get_range_tracker(None, None)):
-            element_coder.get_impl().encode_to_stream(
-                element, output_stream, True)
-          elements_data = output_stream.get()
-          side_input_data[view_id] = elements_data
-
-      elif isinstance(operation, operation_specs.WorkerFlatten):
-        fn = sdk_worker.pack_function_spec_data(
-            operation.serialized_fn,
-            sdk_worker.IDENTITY_DOFN_URN,
-            id=self._next_uid())
-        inputs = {
-            'ignored_input_tag':
-                beam_fn_api_pb2.Target.List(target=[
-                    beam_fn_api_pb2.Target(
-                        primitive_transform_reference=transform_index_to_id[
-                            input_op_index],
-                        name=output_tags(map_task[input_op_index][1])[
-                            input_output_index])
-                    for input_op_index, input_output_index in operation.inputs
-                ])
-        }
-        side_inputs = {}
-
-      else:
-        raise TypeError(operation)
-
-      ptransform = beam_fn_api_pb2.PrimitiveTransform(
-          id=transform_id,
-          function_spec=fn,
-          step_name=stage_name,
-          inputs=inputs,
-          side_inputs=side_inputs,
-          outputs=outputs(operation))
-      transforms.append(ptransform)
-
-    process_bundle_descriptor = beam_fn_api_pb2.ProcessBundleDescriptor(
-        id=self._next_uid(),
-        coders=coders.values(),
-        primitive_transform=transforms)
-
-    return input_data, side_input_data, runner_sinks, process_bundle_descriptor
-
   def _run_map_task(
       self, map_task, control_handler, state_handler, data_plane_handler,
       data_operation_spec):


http://git-wip-us.apache.org/repos/asf/beam/blob/5d6ad199/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index e2eae26..9159035 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -23,26 +23,12 @@ from apache_beam.runners.portability import fn_api_runner
 from apache_beam.runners.portability import maptask_executor_runner_test
 
 
-class FnApiRunnerTestWithRunnerProtos(
+class FnApiRunnerTest(
     maptask_executor_runner_test.MapTaskExecutorRunnerTest):
 
   def create_pipeline(self):
     return beam.Pipeline(
-        runner=fn_api_runner.FnApiRunner(use_runner_protos=True))
-
-  def test_combine_per_key(self):
-    # TODO(robertwb): Implement PGBKCV operation.
-    pass
-
-  # Inherits all tests from maptask_executor_runner.MapTaskExecutorRunner
-
-
-class FnApiRunnerTestWithFnProtos(
-    maptask_executor_runner_test.MapTaskExecutorRunnerTest):
-
-  def create_pipeline(self):
-    return beam.Pipeline(
-        runner=fn_api_runner.FnApiRunner(use_runner_protos=False))
+        runner=fn_api_runner.FnApiRunner())
 
   def test_combine_per_key(self):
     # TODO(robertwb): Implement PGBKCV operation.


http://git-wip-us.apache.org/repos/asf/beam/blob/5d6ad199/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index a2c9f42..d135984 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -196,25 +196,6 @@ def pack_function_spec_data(value, urn, id=None):
 # pylint: enable=redefined-builtin
 
 
-# TODO(vikasrk): Consistently use same format everywhere.
-def load_compressed(compressed_data):
-  """Returns a decompressed and deserialized python object."""
-  # Note: SDK uses ``pickler.dumps`` to serialize certain python objects
-  # (like sources), which involves serialization, compression and base64
-  # encoding. We cannot directly use ``pickler.loads`` for
-  # deserialization, as the runner would have already base64 decoded the
-  # data. So we only need to decompress and deserialize.
-
-  data = zlib.decompress(compressed_data)
-  try:
-    return dill.loads(data)
-  except Exception:  # pylint: disable=broad-except
-    dill.dill._trace(True)  # pylint: disable=protected-access
-    return dill.loads(data)
-  finally:
-    dill.dill._trace(False)  # pylint: disable=protected-access
-
-
 def memoize(func):
   cache = {}
   missing = object()
@@ -324,12 +305,6 @@ class SdkWorker(object):
     return response
 
   def create_execution_tree(self, descriptor):
-    if descriptor.transforms:
-      return self.create_execution_tree_from_runner_api(descriptor)
-    else:
-      return self.create_execution_tree_from_fn_api(descriptor)
-
-  def create_execution_tree_from_runner_api(self, descriptor):
     # TODO(robertwb): Figure out the correct prefix to use for output counters
     # from StateSampler.
     counter_factory = counters.CounterFactory()
@@ -368,131 +343,6 @@ class SdkWorker(object):
         for transform_id in sorted(
             descriptor.transforms, key=topological_height, reverse=True)]
 
-  def create_execution_tree_from_fn_api(self, descriptor):
-    # TODO(vikasrk): Add an id field to Coder proto and use that instead.
-    coders = {coder.function_spec.id: operation_specs.get_coder_from_spec(
-        json.loads(unpack_function_spec_data(coder.function_spec)))
-              for coder in descriptor.coders}
-
-    counter_factory = counters.CounterFactory()
-    # TODO(robertwb): Figure out the correct prefix to use for output counters
-    # from StateSampler.
-    state_sampler = statesampler.StateSampler(
-        'fnapi-step%s-' % descriptor.id, counter_factory)
-    consumers = collections.defaultdict(lambda: collections.defaultdict(list))
-    ops_by_id = {}
-    reversed_ops = []
-
-    for transform in reversed(descriptor.primitive_transform):
-      # TODO(robertwb): Figure out how to plumb through the operation name (e.g.
-      # "s3") from the service through the FnAPI so that msec counters can be
-      # reported and correctly plumbed through the service and the UI.
-      operation_name = 'fnapis%s' % transform.id
-
-      def only_element(iterable):
-        element, = iterable
-        return element
-
-      if transform.function_spec.urn == DATA_OUTPUT_URN:
-        target = beam_fn_api_pb2.Target(
-            primitive_transform_reference=transform.id,
-            name=only_element(transform.outputs.keys()))
-
-        op = DataOutputOperation(
-            operation_name,
-            transform.step_name,
-            consumers[transform.id],
-            counter_factory,
-            state_sampler,
-            coders[only_element(transform.outputs.values()).coder_reference],
-            target,
-            self.data_channel_factory.create_data_channel(
-                transform.function_spec))
-
-      elif transform.function_spec.urn == DATA_INPUT_URN:
-        target = beam_fn_api_pb2.Target(
-            primitive_transform_reference=transform.id,
-            name=only_element(transform.inputs.keys()))
-        op = DataInputOperation(
-            operation_name,
-            transform.step_name,
-            consumers[transform.id],
-            counter_factory,
-            state_sampler,
-            coders[only_element(transform.outputs.values()).coder_reference],
-            target,
-            self.data_channel_factory.create_data_channel(
-                transform.function_spec))
-
-      elif transform.function_spec.urn == PYTHON_DOFN_URN:
-        def create_side_input(tag, si):
-          # TODO(robertwb): Extract windows (and keys) out of element data.
-          return operation_specs.WorkerSideInputSource(
-              tag=tag,
-              source=SideInputSource(
-                  self.state_handler,
-                  beam_fn_api_pb2.StateKey.MultimapSideInput(
-                      key=si.view_fn.id.encode('utf-8')),
-                  coder=unpack_and_deserialize_py_fn(si.view_fn)))
-        output_tags = list(transform.outputs.keys())
-        spec = operation_specs.WorkerDoFn(
-            serialized_fn=unpack_function_spec_data(transform.function_spec),
-            output_tags=output_tags,
-            input=None,
-            side_inputs=[create_side_input(tag, si)
-                         for tag, si in transform.side_inputs.items()],
-            output_coders=[coders[transform.outputs[out].coder_reference]
-                           for out in output_tags])
-
-        op = operations.DoOperation(operation_name, spec, counter_factory,
-                                    state_sampler)
-        # TODO(robertwb): Move these to the constructor.
-        op.step_name = transform.step_name
-        for tag, op_consumers in consumers[transform.id].items():
-          for consumer in op_consumers:
-            op.add_receiver(
-                consumer, output_tags.index(tag))
-
-      elif transform.function_spec.urn == IDENTITY_DOFN_URN:
-        op = operations.FlattenOperation(operation_name, None, counter_factory,
-                                         state_sampler)
-        # TODO(robertwb): Move these to the constructor.
-        op.step_name = transform.step_name
-        for tag, op_consumers in consumers[transform.id].items():
-          for consumer in op_consumers:
-            op.add_receiver(consumer, 0)
-
-      elif transform.function_spec.urn == PYTHON_SOURCE_URN:
-        source = load_compressed(unpack_function_spec_data(
-            transform.function_spec))
-        # TODO(vikasrk): Remove this once custom source is implemented with
-        # splittable dofn via the data plane.
-        spec = operation_specs.WorkerRead(
-            iobase.SourceBundle(1.0, source, None, None),
-            [WindowedValueCoder(source.default_output_coder())])
-        op = operations.ReadOperation(operation_name, spec, counter_factory,
-                                      state_sampler)
-        op.step_name = transform.step_name
-        output_tags = list(transform.outputs.keys())
-        for tag, op_consumers in consumers[transform.id].items():
-          for consumer in op_consumers:
-            op.add_receiver(
-                consumer, output_tags.index(tag))
-
-      else:
-        raise NotImplementedError
-
-      # Record consumers.
-      for _, inputs in transform.inputs.items():
-        for target in inputs.target:
-          consumers[target.primitive_transform_reference][target.name].append(
-              op)
-
-      reversed_ops.append(op)
-      ops_by_id[transform.id] = op
-
-    return list(reversed(reversed_ops))
-
   def process_bundle(self, request, instruction_id):
     ops = self.create_execution_tree(
         self.fns[request.process_bundle_descriptor_reference])
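----------------------------------------------------------------------

With the fn-proto path removed, FnApiRunner is constructed with no
arguments and always translates map tasks through the Runner API protos
via _map_task_to_protos. A minimal usage sketch of the simplified runner
follows; it assumes only the standard beam.Create and beam.Map transforms
and is illustrative rather than part of this commit:

import apache_beam as beam
from apache_beam.runners.portability import fn_api_runner

# The use_runner_protos flag no longer exists; the runner-proto
# translation path is the only one left.
p = beam.Pipeline(runner=fn_api_runner.FnApiRunner())
(p
 | beam.Create(['a', 'bb', 'ccc'])  # illustrative input data
 | beam.Map(len))                   # any simple element-wise transform
p.run()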
