Repository: beam Updated Branches: refs/heads/master f51fdd960 -> e4ef23e16
Port fn_api_runner to be able to use runner protos. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/08ec0d4d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/08ec0d4d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/08ec0d4d Branch: refs/heads/master Commit: 08ec0d4dbff330ecd48c806cd764ab5a96835bd9 Parents: f51fdd9 Author: Robert Bradshaw <rober...@gmail.com> Authored: Tue Jun 20 11:01:03 2017 -0700 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Tue Jun 20 13:47:30 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/runners/pipeline_context.py | 17 +- .../runners/portability/fn_api_runner.py | 166 ++++++++++++- .../runners/portability/fn_api_runner_test.py | 20 +- .../apache_beam/runners/worker/sdk_worker.py | 243 ++++++++++++++++++- 4 files changed, 420 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/08ec0d4d/sdks/python/apache_beam/runners/pipeline_context.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py index e212abf..c2ae3f3 100644 --- a/sdks/python/apache_beam/runners/pipeline_context.py +++ b/sdks/python/apache_beam/runners/pipeline_context.py @@ -24,6 +24,7 @@ For internal use only; no backwards-compatibility guarantees. from apache_beam import pipeline from apache_beam import pvalue from apache_beam import coders +from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.transforms import core @@ -42,9 +43,10 @@ class _PipelineContextMap(object): self._id_to_proto = proto_map if proto_map else {} self._counter = 0 - def _unique_ref(self): + def _unique_ref(self, obj=None): self._counter += 1 - return "ref_%s_%s" % (self._obj_type.__name__, self._counter) + return "ref_%s_%s_%s" % ( + self._obj_type.__name__, type(obj).__name__, self._counter) def populate_map(self, proto_map): for id, proto in self._id_to_proto.items(): @@ -52,7 +54,7 @@ class _PipelineContextMap(object): def get_id(self, obj): if obj not in self._obj_to_id: - id = self._unique_ref() + id = self._unique_ref(obj) self._id_to_obj[id] = obj self._obj_to_id[obj] = id self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context) @@ -79,11 +81,16 @@ class PipelineContext(object): # TODO: environment } - def __init__(self, context_proto=None): + def __init__(self, proto=None): + if isinstance(proto, beam_fn_api_pb2.ProcessBundleDescriptor): + proto = beam_runner_api_pb2.Components( + coders=dict(proto.codersyyy.items()), + windowing_strategies=dict(proto.windowing_strategies.items()), + environments=dict(proto.environments.items())) for name, cls in self._COMPONENT_TYPES.items(): setattr( self, name, _PipelineContextMap( - self, cls, getattr(context_proto, name, None))) + self, cls, getattr(proto, name, None))) @staticmethod def from_runner_api(proto): http://git-wip-us.apache.org/repos/asf/beam/blob/08ec0d4d/sdks/python/apache_beam/runners/portability/fn_api_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index d792131..dabb7d6 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -24,9 +24,10 @@ import Queue as queue import threading from concurrent import futures +from google.protobuf import wrappers_pb2 import grpc -import apache_beam as beam +import apache_beam as beam # pylint: disable=ungrouped-imports from apache_beam.coders import WindowedValueCoder from apache_beam.coders.coder_impl import create_InputStream from apache_beam.coders.coder_impl import create_OutputStream @@ -34,10 +35,13 @@ from apache_beam.internal import pickler from apache_beam.io import iobase from apache_beam.transforms.window import GlobalWindows from apache_beam.portability.api import beam_fn_api_pb2 +from apache_beam.portability.api import beam_runner_api_pb2 +from apache_beam.runners import pipeline_context from apache_beam.runners.portability import maptask_executor_runner from apache_beam.runners.worker import data_plane from apache_beam.runners.worker import operation_specs from apache_beam.runners.worker import sdk_worker +from apache_beam.utils import proto_utils # This module is experimental. No backwards-compatibility guarantees. @@ -110,9 +114,13 @@ OLDE_SOURCE_SPLITTABLE_DOFN_DATA = pickler.dumps( class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): - def __init__(self): + def __init__(self, use_runner_protos=False): super(FnApiRunner, self).__init__() self._last_uid = -1 + if use_runner_protos: + self._map_task_to_protos = self._map_task_to_runner_protos + else: + self._map_task_to_protos = self._map_task_to_fn_protos def has_metrics_support(self): return False @@ -123,7 +131,140 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): def _map_task_registration(self, map_task, state_handler, data_operation_spec): + input_data, side_input_data, runner_sinks, process_bundle_descriptor = ( + self._map_task_to_protos(map_task, data_operation_spec)) + # Side inputs will be accessed over the state API. + for key, elements_data in side_input_data.items(): + state_key = beam_fn_api_pb2.StateKey.MultimapSideInput(key=key) + state_handler.Clear(state_key) + state_handler.Append(state_key, [elements_data]) + return beam_fn_api_pb2.InstructionRequest( + instruction_id=self._next_uid(), + register=beam_fn_api_pb2.RegisterRequest( + process_bundle_descriptor=[process_bundle_descriptor]) + ), runner_sinks, input_data + + def _map_task_to_runner_protos(self, map_task, data_operation_spec): + input_data = {} + side_input_data = {} + runner_sinks = {} + + context = pipeline_context.PipelineContext() + transform_protos = {} + used_pcollections = {} + + def uniquify(*names): + # An injective mapping from string* to string. + return ':'.join("%s:%d" % (name, len(name)) for name in names) + + def pcollection_id(op_ix, out_ix): + if (op_ix, out_ix) not in used_pcollections: + used_pcollections[op_ix, out_ix] = uniquify( + map_task[op_ix][0], 'out', str(out_ix)) + return used_pcollections[op_ix, out_ix] + + def get_inputs(op): + if hasattr(op, 'inputs'): + inputs = op.inputs + elif hasattr(op, 'input'): + inputs = [op.input] + else: + inputs = [] + return {'in%s' % ix: pcollection_id(*input) + for ix, input in enumerate(inputs)} + + def get_outputs(op_ix): + op = map_task[op_ix][1] + return {tag: pcollection_id(op_ix, out_ix) + for out_ix, tag in enumerate(getattr(op, 'output_tags', ['out']))} + + for op_ix, (stage_name, operation) in enumerate(map_task): + transform_id = uniquify(stage_name) + + if isinstance(operation, operation_specs.WorkerInMemoryWrite): + # Write this data back to the runner. + runner_sinks[(transform_id, 'out')] = operation + transform_spec = beam_runner_api_pb2.FunctionSpec( + urn=sdk_worker.DATA_OUTPUT_URN, + parameter=proto_utils.pack_Any(data_operation_spec)) + + elif isinstance(operation, operation_specs.WorkerRead): + # A Read from an in-memory source is done over the data plane. + if (isinstance(operation.source.source, + maptask_executor_runner.InMemorySource) + and isinstance(operation.source.source.default_output_coder(), + WindowedValueCoder)): + input_data[(transform_id, 'input')] = self._reencode_elements( + operation.source.source.read(None), + operation.source.source.default_output_coder()) + transform_spec = beam_runner_api_pb2.FunctionSpec( + urn=sdk_worker.DATA_INPUT_URN, + parameter=proto_utils.pack_Any(data_operation_spec)) + + else: + # Otherwise serialize the source and execute it there. + # TODO: Use SDFs with an initial impulse. + transform_spec = beam_runner_api_pb2.FunctionSpec( + urn=sdk_worker.PYTHON_SOURCE_URN, + parameter=proto_utils.pack_Any( + wrappers_pb2.BytesValue( + value=pickler.dumps(operation.source.source)))) + + elif isinstance(operation, operation_specs.WorkerDoFn): + # Record the contents of each side input for access via the state api. + side_input_extras = [] + for si in operation.side_inputs: + assert isinstance(si.source, iobase.BoundedSource) + element_coder = si.source.default_output_coder() + # TODO(robertwb): Actually flesh out the ViewFn API. + side_input_extras.append((si.tag, element_coder)) + side_input_data[sdk_worker.side_input_tag(transform_id, si.tag)] = ( + self._reencode_elements( + si.source.read(si.source.get_range_tracker(None, None)), + element_coder)) + augmented_serialized_fn = pickler.dumps( + (operation.serialized_fn, side_input_extras)) + transform_spec = beam_runner_api_pb2.FunctionSpec( + urn=sdk_worker.PYTHON_DOFN_URN, + parameter=proto_utils.pack_Any( + wrappers_pb2.BytesValue(value=augmented_serialized_fn))) + + elif isinstance(operation, operation_specs.WorkerFlatten): + # Flatten is nice and simple. + transform_spec = beam_runner_api_pb2.FunctionSpec( + urn=sdk_worker.IDENTITY_DOFN_URN) + + else: + raise NotImplementedError(operation) + + transform_protos[transform_id] = beam_runner_api_pb2.PTransform( + unique_name=stage_name, + spec=transform_spec, + inputs=get_inputs(operation), + outputs=get_outputs(op_ix)) + + pcollection_protos = { + name: beam_runner_api_pb2.PCollection( + unique_name=name, + coder_id=context.coders.get_id( + map_task[op_id][1].output_coders[out_id])) + for (op_id, out_id), name in used_pcollections.items() + } + # Must follow creation of pcollection_protos to capture used coders. + context_proto = context.to_runner_api() + process_bundle_descriptor = beam_fn_api_pb2.ProcessBundleDescriptor( + id=self._next_uid(), + transforms=transform_protos, + pcollections=pcollection_protos, + codersyyy=dict(context_proto.coders.items()), + windowing_strategies=dict(context_proto.windowing_strategies.items()), + environments=dict(context_proto.environments.items())) + return input_data, side_input_data, runner_sinks, process_bundle_descriptor + + def _map_task_to_fn_protos(self, map_task, data_operation_spec): + input_data = {} + side_input_data = {} runner_sinks = {} transforms = [] transform_index_to_id = {} @@ -264,9 +405,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): element_coder.get_impl().encode_to_stream( element, output_stream, True) elements_data = output_stream.get() - state_key = beam_fn_api_pb2.StateKey.MultimapSideInput(key=view_id) - state_handler.Clear(state_key) - state_handler.Append(state_key, elements_data) + side_input_data[view_id] = elements_data elif isinstance(operation, operation_specs.WorkerFlatten): fn = sdk_worker.pack_function_spec_data( @@ -299,13 +438,11 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): transforms.append(ptransform) process_bundle_descriptor = beam_fn_api_pb2.ProcessBundleDescriptor( - id=self._next_uid(), coders=coders.values(), + id=self._next_uid(), + coders=coders.values(), primitive_transform=transforms) - return beam_fn_api_pb2.InstructionRequest( - instruction_id=self._next_uid(), - register=beam_fn_api_pb2.RegisterRequest( - process_bundle_descriptor=[process_bundle_descriptor - ])), runner_sinks, input_data + + return input_data, side_input_data, runner_sinks, process_bundle_descriptor def _run_map_task( self, map_task, control_handler, state_handler, data_plane_handler, @@ -467,3 +604,10 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): self.data_plane_handler.close() self.control_server.stop(5).wait() self.data_server.stop(5).wait() + + @staticmethod + def _reencode_elements(elements, element_coder): + output_stream = create_OutputStream() + for element in elements: + element_coder.get_impl().encode_to_stream(element, output_stream, True) + return output_stream.get() http://git-wip-us.apache.org/repos/asf/beam/blob/08ec0d4d/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py index 66d985a..e2eae26 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py @@ -23,10 +23,26 @@ from apache_beam.runners.portability import fn_api_runner from apache_beam.runners.portability import maptask_executor_runner_test -class FnApiRunnerTest(maptask_executor_runner_test.MapTaskExecutorRunnerTest): +class FnApiRunnerTestWithRunnerProtos( + maptask_executor_runner_test.MapTaskExecutorRunnerTest): def create_pipeline(self): - return beam.Pipeline(runner=fn_api_runner.FnApiRunner()) + return beam.Pipeline( + runner=fn_api_runner.FnApiRunner(use_runner_protos=True)) + + def test_combine_per_key(self): + # TODO(robertwb): Implement PGBKCV operation. + pass + + # Inherits all tests from maptask_executor_runner.MapTaskExecutorRunner + + +class FnApiRunnerTestWithFnProtos( + maptask_executor_runner_test.MapTaskExecutorRunnerTest): + + def create_pipeline(self): + return beam.Pipeline( + runner=fn_api_runner.FnApiRunner(use_runner_protos=False)) def test_combine_per_key(self): # TODO(robertwb): Implement PGBKCV operation. http://git-wip-us.apache.org/repos/asf/beam/blob/08ec0d4d/sdks/python/apache_beam/runners/worker/sdk_worker.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index d08b179..fd7ecc4 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -36,11 +36,13 @@ from apache_beam.coders import coder_impl from apache_beam.coders import WindowedValueCoder from apache_beam.internal import pickler from apache_beam.io import iobase -from apache_beam.runners.dataflow.native_io import iobase as native_iobase -from apache_beam.utils import counters from apache_beam.portability.api import beam_fn_api_pb2 +from apache_beam.runners.dataflow.native_io import iobase as native_iobase +from apache_beam.runners import pipeline_context from apache_beam.runners.worker import operation_specs from apache_beam.runners.worker import operations +from apache_beam.utils import counters +from apache_beam.utils import proto_utils # This module is experimental. No backwards-compatibility guarantees. @@ -62,6 +64,10 @@ PYTHON_DOFN_URN = 'urn:org.apache.beam:dofn:java:0.1' PYTHON_SOURCE_URN = 'urn:org.apache.beam:source:java:0.1' +def side_input_tag(transform_id, tag): + return str("%d[%s][%s]" % (len(transform_id), transform_id, tag)) + + class RunnerIOOperation(operations.Operation): """Common baseclass for runner harness IO operations.""" @@ -208,6 +214,23 @@ def load_compressed(compressed_data): dill.dill._trace(False) # pylint: disable=protected-access +def memoize(func): + cache = {} + missing = object() + + def wrapper(*args): + result = cache.get(args, missing) + if result is missing: + result = cache[args] = func(*args) + return result + return wrapper + + +def only_element(iterable): + element, = iterable + return element + + class SdkHarness(object): def __init__(self, control_channel): @@ -296,6 +319,51 @@ class SdkWorker(object): return response def create_execution_tree(self, descriptor): + if descriptor.primitive_transform: + return self.create_execution_tree_from_fn_api(descriptor) + else: + return self.create_execution_tree_from_runner_api(descriptor) + + def create_execution_tree_from_runner_api(self, descriptor): + # TODO(robertwb): Figure out the correct prefix to use for output counters + # from StateSampler. + counter_factory = counters.CounterFactory() + state_sampler = statesampler.StateSampler( + 'fnapi-step%s-' % descriptor.id, counter_factory) + + transform_factory = BeamTransformFactory( + descriptor, self.data_channel_factory, counter_factory, state_sampler, + self.state_handler) + + pcoll_consumers = collections.defaultdict(list) + for transform_id, transform_proto in descriptor.transforms.items(): + for pcoll_id in transform_proto.inputs.values(): + pcoll_consumers[pcoll_id].append(transform_id) + + @memoize + def get_operation(transform_id): + transform_consumers = { + tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]] + for tag, pcoll_id + in descriptor.transforms[transform_id].outputs.items() + } + return transform_factory.create_operation( + transform_id, transform_consumers) + + # Operations must be started (hence returned) in order. + @memoize + def topological_height(transform_id): + return 1 + max( + [0] + + [topological_height(consumer) + for pcoll in descriptor.transforms[transform_id].outputs.values() + for consumer in pcoll_consumers[pcoll]]) + + return [get_operation(transform_id) + for transform_id in sorted( + descriptor.transforms, key=topological_height, reverse=True)] + + def create_execution_tree_from_fn_api(self, descriptor): # TODO(vikasrk): Add an id field to Coder proto and use that instead. coders = {coder.function_spec.id: operation_specs.get_coder_from_spec( json.loads(unpack_function_spec_data(coder.function_spec))) @@ -418,14 +486,14 @@ class SdkWorker(object): reversed_ops.append(op) ops_by_id[transform.id] = op - return list(reversed(reversed_ops)), ops_by_id + return list(reversed(reversed_ops)) def process_bundle(self, request, instruction_id): - ops, ops_by_id = self.create_execution_tree( + ops = self.create_execution_tree( self.fns[request.process_bundle_descriptor_reference]) expected_inputs = [] - for _, op in ops_by_id.items(): + for op in ops: if isinstance(op, DataOutputOperation): # TODO(robertwb): Is there a better way to pass the instruction id to # the operation? @@ -445,9 +513,7 @@ class SdkWorker(object): for data in input_op.data_channel.input_elements( instruction_id, [input_op.target]): # ignores input name - target_op = ops_by_id[data.target.primitive_transform_reference] - # lacks coder for non-input ops - target_op.process_encoded(data.data) + input_op.process_encoded(data.data) # Finish all operations. for op in ops: @@ -455,3 +521,164 @@ class SdkWorker(object): op.finish() return beam_fn_api_pb2.ProcessBundleResponse() + + +class BeamTransformFactory(object): + """Factory for turning transform_protos into executable operations.""" + def __init__(self, descriptor, data_channel_factory, counter_factory, + state_sampler, state_handler): + self.descriptor = descriptor + self.data_channel_factory = data_channel_factory + self.counter_factory = counter_factory + self.state_sampler = state_sampler + self.state_handler = state_handler + self.context = pipeline_context.PipelineContext(descriptor) + + _known_urns = {} + + @classmethod + def register_urn(cls, urn, parameter_type): + def wrapper(func): + cls._known_urns[urn] = func, parameter_type + return func + return wrapper + + def create_operation(self, transform_id, consumers): + transform_proto = self.descriptor.transforms[transform_id] + creator, parameter_type = self._known_urns[transform_proto.spec.urn] + parameter = proto_utils.unpack_Any( + transform_proto.spec.parameter, parameter_type) + return creator(self, transform_id, transform_proto, parameter, consumers) + + def get_coder(self, coder_id): + return self.context.coders.get_by_id(coder_id) + + def get_output_coders(self, transform_proto): + return { + tag: self.get_coder(self.descriptor.pcollections[pcoll_id].coder_id) + for tag, pcoll_id in transform_proto.outputs.items() + } + + def get_only_output_coder(self, transform_proto): + return only_element(self.get_output_coders(transform_proto).values()) + + def get_input_coders(self, transform_proto): + return { + tag: self.get_coder(self.descriptor.pcollections[pcoll_id].coder_id) + for tag, pcoll_id in transform_proto.inputs.items() + } + + def get_only_input_coder(self, transform_proto): + return only_element(self.get_input_coders(transform_proto).values()) + + # TODO(robertwb): Update all operations to take these in the constructor. + @staticmethod + def augment_oldstyle_op(op, step_name, consumers, tag_list=None): + op.step_name = step_name + for tag, op_consumers in consumers.items(): + for consumer in op_consumers: + op.add_receiver(consumer, tag_list.index(tag) if tag_list else 0) + return op + + +@BeamTransformFactory.register_urn( + DATA_INPUT_URN, beam_fn_api_pb2.RemoteGrpcPort) +def create(factory, transform_id, transform_proto, grpc_port, consumers): + target = beam_fn_api_pb2.Target( + primitive_transform_reference=transform_id, + name=only_element(transform_proto.outputs.keys())) + return DataInputOperation( + transform_proto.unique_name, + transform_proto.unique_name, + consumers, + factory.counter_factory, + factory.state_sampler, + factory.get_only_output_coder(transform_proto), + input_target=target, + data_channel=factory.data_channel_factory.create_data_channel(grpc_port)) + + +@BeamTransformFactory.register_urn( + DATA_OUTPUT_URN, beam_fn_api_pb2.RemoteGrpcPort) +def create(factory, transform_id, transform_proto, grpc_port, consumers): + target = beam_fn_api_pb2.Target( + primitive_transform_reference=transform_id, + name='out') + return DataOutputOperation( + transform_proto.unique_name, + transform_proto.unique_name, + consumers, + factory.counter_factory, + factory.state_sampler, + # TODO(robertwb): Perhaps this could be distinct from the input coder? + factory.get_only_input_coder(transform_proto), + target=target, + data_channel=factory.data_channel_factory.create_data_channel(grpc_port)) + + +@BeamTransformFactory.register_urn(PYTHON_SOURCE_URN, wrappers_pb2.BytesValue) +def create(factory, transform_id, transform_proto, parameter, consumers): + source = pickler.loads(parameter.value) + spec = operation_specs.WorkerRead( + iobase.SourceBundle(1.0, source, None, None), + [WindowedValueCoder(source.default_output_coder())]) + return factory.augment_oldstyle_op( + operations.ReadOperation( + transform_proto.unique_name, + spec, + factory.counter_factory, + factory.state_sampler), + transform_proto.unique_name, + consumers) + + +@BeamTransformFactory.register_urn(PYTHON_DOFN_URN, wrappers_pb2.BytesValue) +def create(factory, transform_id, transform_proto, parameter, consumers): + dofn_data = pickler.loads(parameter.value) + if len(dofn_data) == 2: + # Has side input data. + serialized_fn, side_input_data = dofn_data + else: + # No side input data. + serialized_fn, side_input_data = parameter.value, [] + + def create_side_input(tag, coder): + # TODO(robertwb): Extract windows (and keys) out of element data. + # TODO(robertwb): Extract state key from ParDoPayload. + return operation_specs.WorkerSideInputSource( + tag=tag, + source=SideInputSource( + factory.state_handler, + beam_fn_api_pb2.StateKey.MultimapSideInput( + key=side_input_tag(transform_id, tag)), + coder=coder)) + output_tags = list(transform_proto.outputs.keys()) + output_coders = factory.get_output_coders(transform_proto) + spec = operation_specs.WorkerDoFn( + serialized_fn=serialized_fn, + output_tags=output_tags, + input=None, + side_inputs=[ + create_side_input(tag, coder) for tag, coder in side_input_data], + output_coders=[output_coders[tag] for tag in output_tags]) + return factory.augment_oldstyle_op( + operations.DoOperation( + transform_proto.unique_name, + spec, + factory.counter_factory, + factory.state_sampler), + transform_proto.unique_name, + consumers, + output_tags) + + +@BeamTransformFactory.register_urn(IDENTITY_DOFN_URN, None) +def create(factory, transform_id, transform_proto, unused_parameter, consumers): + return factory.augment_oldstyle_op( + operations.FlattenOperation( + transform_proto.unique_name, + None, + factory.counter_factory, + factory.state_sampler), + transform_proto.unique_name, + consumers)