riteshghorse commented on code in PR #28564:
URL: https://github.com/apache/beam/pull/28564#discussion_r1344636199
##########
sdks/python/apache_beam/transforms/environments.py:
##########
@@ -124,9 +124,9 @@ def __init__(self,
         dict(resource_hints) if resource_hints else {})
 
   def __eq__(self, other):
+    # don't compare artifacts since they have different hashes in their names.
     return (
-        self.__class__ == other.__class__ and
-        self._artifacts == other._artifacts
Review Comment:
> Do you have a stack trace for the failure?
>
> The way it works now is that we stage all artifacts at job submission, and each Environment carries the correct set of dependencies for that particular environment.
>
> At runtime, the runner (UW for Dataflow) would look up the correct set of dependencies from the Runner API proto and serve them to the corresponding SDKs.
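
For context on that lookup: the dependencies in question live directly on the `Environment` protos in the pipeline's Runner API representation. Here's a minimal sketch of walking them, assuming you already have a built `beam_runner_api_pb2.Pipeline` proto (e.g. from `pipeline.to_runner_api()`); the helper name is just for illustration:

```python
from apache_beam.portability.api import beam_runner_api_pb2


def list_env_dependencies(pipeline_proto):
  # type: (beam_runner_api_pb2.Pipeline) -> None
  """Prints the artifacts that each environment in the proto carries."""
  for env_id, env in pipeline_proto.components.environments.items():
    # Each Environment carries its own `dependencies` list of
    # ArtifactInformation, which is what a runner consults at runtime to
    # serve the right artifacts to the corresponding SDK harness.
    for dep in env.dependencies:
      print('%s: type_urn=%s role_urn=%s' % (env_id, dep.type_urn, dep.role_urn))
```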
Stack trace for `ExternalTransformTest` failure:
```
apache_beam/transforms/external_test.py::ExternalTransformTest::test_pipeline_generation FAILED [100%]

=================================== FAILURES ===================================
________________ ExternalTransformTest.test_pipeline_generation ________________

self = <apache_beam.transforms.external_test.ExternalTransformTest testMethod=test_pipeline_generation>

    def test_pipeline_generation(self):
      pipeline = beam.Pipeline()
      _ = (
>         pipeline
          | beam.Create(['a', 'b'])
          | beam.ExternalTransform(
              'beam:transforms:xlang:test:prefix',
              ImplicitSchemaPayloadBuilder({'data': '0'}),
              expansion_service.ExpansionServiceServicer()))

apache_beam/transforms/external_test.py:181:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/pvalue.py:137: in __or__
    return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:712: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:203: in apply
    return self.apply_PTransform(transform, input, options)
apache_beam/runners/runner.py:207: in apply_PTransform
    return transform.expand(input)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <ExternalTransform(PTransform) label=[ExternalTransform(beam:transforms:xlang:test:prefix)] at 0x11feb4d90>
pvalueish = <PCollection[Create/Map(decode).None] at 0x11feb4700>

    def expand(self, pvalueish):
      # type: (pvalue.PCollection) -> pvalue.PCollection
      if isinstance(pvalueish, pvalue.PBegin):
        self._inputs = {}
      elif isinstance(pvalueish, (list, tuple)):
        self._inputs = {str(ix): pvalue for ix, pvalue in enumerate(pvalueish)}
      elif isinstance(pvalueish, dict):
        self._inputs = pvalueish
      else:
        self._inputs = {'input': pvalueish}
      pipeline = (
          next(iter(self._inputs.values())).pipeline
          if self._inputs else pvalueish.pipeline)
      context = pipeline_context.PipelineContext(
          component_id_map=pipeline.component_id_map)
      transform_proto = beam_runner_api_pb2.PTransform(
          unique_name=pipeline._current_transform().full_label,
          spec=beam_runner_api_pb2.FunctionSpec(
              urn=self._urn, payload=self._payload))
      for tag, pcoll in self._inputs.items():
        transform_proto.inputs[tag] = context.pcollections.get_id(pcoll)
        # Conversion to/from proto assumes producers.
        # TODO: Possibly loosen this.
        context.transforms.put_proto(
            '%s_%s' % (self._IMPULSE_PREFIX, tag),
            beam_runner_api_pb2.PTransform(
                unique_name='%s_%s' % (self._IMPULSE_PREFIX, tag),
                spec=beam_runner_api_pb2.FunctionSpec(
                    urn=common_urns.primitives.IMPULSE.urn),
                outputs={'out': transform_proto.inputs[tag]}))
      output_coders = None
      if self._type_hints.output_types:
        if self._type_hints.output_types[0]:
          output_coders = dict(
              (str(k), context.coder_id_from_element_type(v))
              for (k, v) in enumerate(self._type_hints.output_types[0]))
        elif self._type_hints.output_types[1]:
          output_coders = {
              k: context.coder_id_from_element_type(v)
              for (k, v) in self._type_hints.output_types[1].items()
          }
      components = context.to_runner_api()
      request = beam_expansion_api_pb2.ExpansionRequest(
          components=components,
          namespace=self._external_namespace,
          transform=transform_proto,
          output_coder_requests=output_coders)
      expansion_service = _maybe_use_transform_service(
          self._expansion_service, pipeline.options)
      with ExternalTransform.service(expansion_service) as service:
        response = service.Expand(request)
        if response.error:
          raise RuntimeError(response.error)
        self._expanded_components = response.components
        if any(env.dependencies
               for env in self._expanded_components.environments.values()):
          self._expanded_components = self._resolve_artifacts(
              self._expanded_components,
>             service.artifact_service(),
              pipeline.local_tempdir)
E     AttributeError: 'ExpansionServiceServicer' object has no attribute 'artifact_service'

apache_beam/transforms/external.py:736: AttributeError
```
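
So the proximate cause is that the test hands `ExternalTransform` an in-process `ExpansionServiceServicer`, which doesn't appear to expose an `artifact_service()` method the way the channel-backed service path does, so the artifact-resolution branch blows up as soon as any environment carries dependencies. Just to illustrate the shape of the problem (not a proposed fix), a `hasattr` guard around the failing fragment of `expand` from the trace above would look roughly like:

```python
# Fragment of ExternalTransform.expand from the trace above; the hasattr
# guard is hypothetical and only illustrates the failure mode: the
# in-process ExpansionServiceServicer has no artifact_service().
if any(env.dependencies
       for env in self._expanded_components.environments.values()):
  if hasattr(service, 'artifact_service'):
    self._expanded_components = self._resolve_artifacts(
        self._expanded_components,
        service.artifact_service(),
        pipeline.local_tempdir)
```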