riteshghorse commented on code in PR #28564:
URL: https://github.com/apache/beam/pull/28564#discussion_r1344636199


##########
sdks/python/apache_beam/transforms/environments.py:
##########
@@ -124,9 +124,9 @@ def __init__(self,
         dict(resource_hints) if resource_hints else {})
 
   def __eq__(self, other):
+    # don't compare artifacts since they have different hashes in their names.
     return (
-        self.__class__ == other.__class__ and
-        self._artifacts == other._artifacts

Review Comment:
   > Do you have a stack trace for the failure ?
   > 
   > The way it works now is that we stage all artifacts at job submission and 
each Environment carries the correct set of dependencies for that particular 
environment.
   > 
   > At runtime, the runner (UW for Dataflow) would look up the correct set of 
dependencies from the Runner API proto and serve them to the corresponding SDKs.
   
   Stack trace for `ExternalTransformTest` failure:
   ```
   
apache_beam/transforms/external_test.py::ExternalTransformTest::test_pipeline_generation
 FAILED                                                                         
                       [100%]
   
   
==============================================================================================
 FAILURES 
==============================================================================================
   ___________________________________________________________________________ 
ExternalTransformTest.test_pipeline_generation 
___________________________________________________________________________
   
   self = <apache_beam.transforms.external_test.ExternalTransformTest 
testMethod=test_pipeline_generation>
   
       def test_pipeline_generation(self):
         pipeline = beam.Pipeline()
         _ = (
   >         pipeline
             | beam.Create(['a', 'b'])
             | beam.ExternalTransform(
                 'beam:transforms:xlang:test:prefix',
                 ImplicitSchemaPayloadBuilder({'data': '0'}),
                 expansion_service.ExpansionServiceServicer()))
   
   apache_beam/transforms/external_test.py:181: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
   apache_beam/pvalue.py:137: in __or__
       return self.pipeline.apply(ptransform, self)
   apache_beam/pipeline.py:712: in apply
       pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
   apache_beam/runners/runner.py:203: in apply
       return self.apply_PTransform(transform, input, options)
   apache_beam/runners/runner.py:207: in apply_PTransform
       return transform.expand(input)
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
   
   self = <ExternalTransform(PTransform) 
label=[ExternalTransform(beam:transforms:xlang:test:prefix)] at 0x11feb4d90>, 
pvalueish = <PCollection[Create/Map(decode).None] at 0x11feb4700>
   
       def expand(self, pvalueish):
         # type: (pvalue.PCollection) -> pvalue.PCollection
         if isinstance(pvalueish, pvalue.PBegin):
           self._inputs = {}
         elif isinstance(pvalueish, (list, tuple)):
           self._inputs = {str(ix): pvalue for ix, pvalue in 
enumerate(pvalueish)}
         elif isinstance(pvalueish, dict):
           self._inputs = pvalueish
         else:
           self._inputs = {'input': pvalueish}
         pipeline = (
             next(iter(self._inputs.values())).pipeline
             if self._inputs else pvalueish.pipeline)
         context = pipeline_context.PipelineContext(
             component_id_map=pipeline.component_id_map)
         transform_proto = beam_runner_api_pb2.PTransform(
             unique_name=pipeline._current_transform().full_label,
             spec=beam_runner_api_pb2.FunctionSpec(
                 urn=self._urn, payload=self._payload))
         for tag, pcoll in self._inputs.items():
           transform_proto.inputs[tag] = context.pcollections.get_id(pcoll)
           # Conversion to/from proto assumes producers.
           # TODO: Possibly loosen this.
           context.transforms.put_proto(
               '%s_%s' % (self._IMPULSE_PREFIX, tag),
               beam_runner_api_pb2.PTransform(
                   unique_name='%s_%s' % (self._IMPULSE_PREFIX, tag),
                   spec=beam_runner_api_pb2.FunctionSpec(
                       urn=common_urns.primitives.IMPULSE.urn),
                   outputs={'out': transform_proto.inputs[tag]}))
         output_coders = None
         if self._type_hints.output_types:
           if self._type_hints.output_types[0]:
             output_coders = dict(
                 (str(k), context.coder_id_from_element_type(v))
                 for (k, v) in enumerate(self._type_hints.output_types[0]))
           elif self._type_hints.output_types[1]:
             output_coders = {
                 k: context.coder_id_from_element_type(v)
                 for (k, v) in self._type_hints.output_types[1].items()
             }
         components = context.to_runner_api()
         request = beam_expansion_api_pb2.ExpansionRequest(
             components=components,
             namespace=self._external_namespace,
             transform=transform_proto,
             output_coder_requests=output_coders)
       
         expansion_service = _maybe_use_transform_service(
             self._expansion_service, pipeline.options)
       
         with ExternalTransform.service(expansion_service) as service:
           response = service.Expand(request)
           if response.error:
             raise RuntimeError(response.error)
           self._expanded_components = response.components
           if any(env.dependencies
                  for env in self._expanded_components.environments.values()):
             self._expanded_components = self._resolve_artifacts(
                 self._expanded_components,
   >             service.artifact_service(),
                 pipeline.local_tempdir)
   E         AttributeError: 'ExpansionServiceServicer' object has no attribute 
'artifact_service'
   
   apache_beam/transforms/external.py:736: AttributeError
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to