damccorm commented on PR #35160:
URL: https://github.com/apache/beam/pull/35160#issuecomment-2944337125

   Looking at test results, I see a few failures. From PreCommit_Python:
   
   ```
   <testcase 
classname="apache_beam.io.gcp.pubsub_test.TestWriteStringsToPubSubOverride" 
name="test_expand" time="0.098">
   <failure message="apache_beam.typehints.decorators.TypeCheckError: Transform 
"Map(<lambda at pubsub_test.py:398>)" was applied to the output of 
"WriteToPubSub/Write" but "Write" produces no PCollections.">self = 
<apache_beam.io.gcp.pubsub_test.TestWriteStringsToPubSubOverride 
testMethod=test_expand> def test_expand(self): options = PipelineOptions([]) 
options.view_as(StandardOptions).streaming = True p = 
TestPipeline(options=options) pcoll = ( > p | 
ReadFromPubSub('projects/fakeprj/topics/baz') | beam.Map(lambda x: 
PubsubMessage(x)) | WriteToPubSub( 'projects/fakeprj/topics/a_topic', 
with_attributes=True) | beam.Map(lambda x: x)) 
apache_beam/io/gcp/pubsub_test.py:393: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/pvalue.py:138:
 in __or__ return self.pipeline.apply(ptransform, self) _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = 
<apache_beam.testing.test_pipeline.TestPipeline object at 0x79d87a32f880> 
transform = <ParDo(PTransform) label=[Map(<lambda at pubsub_test.py:398>)] at 
0x79d878f76a70> pvalueish = <PDone[WriteToPubSub/Write.None] at 
0x79d878f775e0>, label = None def apply( self, transform, # type: 
ptransform.PTransform pvalueish=None, # type: Optional[pvalue.PValue] 
label=None # type: Optional[str] ): # type: (...) -> pvalue.PValue """Applies a 
custom transform using the pvalueish specified. Args: transform 
(~apache_beam.transforms.ptransform.PTransform): the 
:class:`~apache_beam.transforms.ptransform.PTransform` to apply. pvalueish 
(~apache_beam.pvalue.PCollection): the input for the 
:class:`~apache_beam.transforms.ptransform.PTransform` (typically a 
:class:`~apache_beam.pvalue.PCollection`). label (str): label of the 
:class:`~apache_beam.transforms.ptransform.PTransform`. Raises: TypeError: if 
the transform object extracted from the argument list is not a 
:class:`~apache_beam.transforms.ptransform.PTransform`. RuntimeError: if the 
transform object was already applied to this 
pipeline and needs to be cloned in order to apply again. """ if 
isinstance(transform, ptransform._NamedPTransform): return self.apply( 
transform.transform, pvalueish, label or transform.label) if not 
isinstance(transform, ptransform.PTransform): raise TypeError("Expected a 
PTransform object, got %s" % transform) if label: # Fix self.label as it is 
inspected by some PTransform operations # (e.g. to produce error messages for 
type hint violations). old_label, transform.label = transform.label, label try: 
return self.apply(transform, pvalueish) finally: transform.label = old_label # 
Attempts to alter the label of the transform to be applied only when it's # a 
top-level transform so that the cell number will not be prepended to # every 
child transform in a composite. if self._current_transform() is 
self._root_transform(): alter_label_if_ipython(transform, pvalueish) full_label 
= '/'.join( [self._current_transform().full_label, 
transform.label]).lstrip('/') if full_label in 
self.applied_labels: auto_unique_labels = self._options.view_as( 
StandardOptions).auto_unique_labels if auto_unique_labels: # If 
auto_unique_labels is set, we will append a unique suffix to the # label to 
make it unique. logging.warning( 'Using --auto_unique_labels could cause data 
loss when ' 'updating a pipeline or reloading the job state. ' 'This is not 
recommended for streaming jobs.') unique_label = 
self._generate_unique_label(transform) return self.apply(transform, pvalueish, 
unique_label) else: raise RuntimeError( 'A transform with label "%s" already 
exists in the pipeline. ' 'To apply a transform with a specified label, write ' 
'pvalue | "label" >> transform or use the option ' '"auto_unique_labels" to 
automatically generate unique ' 'transform labels. Note "auto_unique_labels" ' 
'could cause data loss when updating a pipeline or ' 'reloading the job state. 
This is not recommended for ' 'streaming jobs.' % full_label) 
self.applied_labels.add(full_label) pvalueish, inputs = 
transform._extract_input_pvalues(pvalueish) try: if not isinstance(inputs, 
dict): inputs = {str(ix): input for (ix, input) in enumerate(inputs)} except 
TypeError: raise NotImplementedError( 'Unable to extract PValue inputs from %s; 
either %s does not accept ' 'inputs of this format, or it does not properly 
override ' '_extract_input_pvalues' % (pvalueish, transform)) for t, leaf_input 
in inputs.items(): if not isinstance(leaf_input, pvalue.PValue) or not 
isinstance(t, str): raise NotImplementedError( '%s does not properly override 
_extract_input_pvalues, ' 'returned %s from %s' % (transform, inputs, 
pvalueish)) current = AppliedPTransform( self._current_transform(), transform, 
full_label, inputs, None, annotations=self._current_annotations()) 
self._current_transform().add_part(current) try: 
self.transforms_stack.append(current) type_options = 
self._options.view_as(TypeOptions) if type_options.pipeline_type_check: 
transform.type_check_inputs(pvalueish) if isinstance(pvalueish, pvalue.PDone) and 
isinstance(transform, ParDo): # If the input is a PDone, we cannot apply a 
ParDo transform. full_label = self._current_transform().full_label 
producer_label = pvalueish.producer.full_label > raise TypeCheckError( 
f'Transform "{full_label}" was applied to the output of ' f'"{producer_label}" 
but "{producer_label.split("/")[-1]}" ' 'produces no PCollections.') E 
apache_beam.typehints.decorators.TypeCheckError: Transform "Map(<lambda at 
pubsub_test.py:398>)" was applied to the output of "WriteToPubSub/Write" but 
"Write" produces no PCollections. 
target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/pipeline.py:804:
 TypeCheckError</failure>
   </testcase>
   ```
   
   and:
   
   ```
   <testcase 
classname="apache_beam.io.gcp.pubsub_test.TestWriteStringsToPubSubOverride" 
name="test_expand_deprecated" time="0.096">
   <failure message="apache_beam.typehints.decorators.TypeCheckError: Transform 
"Map(<lambda at pubsub_test.py:375>)" was applied to the output of 
"_WriteStringsToPubSub/WriteToPubSub/Write" but "Write" produces no 
PCollections.">self = 
<apache_beam.io.gcp.pubsub_test.TestWriteStringsToPubSubOverride 
testMethod=test_expand_deprecated> def test_expand_deprecated(self): options = 
PipelineOptions([]) options.view_as(StandardOptions).streaming = True p = 
TestPipeline(options=options) pcoll = ( > p | 
ReadFromPubSub('projects/fakeprj/topics/baz') | 
WriteStringsToPubSub('projects/fakeprj/topics/a_topic') | beam.Map(lambda x: 
x)) apache_beam/io/gcp/pubsub_test.py:372: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/pvalue.py:138:
 in __or__ return self.pipeline.apply(ptransform, self) _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = 
<apache_beam.testing.test_pipeline.TestPipeline object at 0x79d8791b6290> transform = 
<ParDo(PTransform) label=[Map(<lambda at pubsub_test.py:375>)] at 
0x79d8791b4940> pvalueish = 
<PDone[_WriteStringsToPubSub/WriteToPubSub/Write.None] at 0x79d8792d30a0> label 
= None def apply( self, transform, # type: ptransform.PTransform 
pvalueish=None, # type: Optional[pvalue.PValue] label=None # type: 
Optional[str] ): # type: (...) -> pvalue.PValue """Applies a custom transform 
using the pvalueish specified. Args: transform 
(~apache_beam.transforms.ptransform.PTransform): the 
:class:`~apache_beam.transforms.ptransform.PTransform` to apply. pvalueish 
(~apache_beam.pvalue.PCollection): the input for the 
:class:`~apache_beam.transforms.ptransform.PTransform` (typically a 
:class:`~apache_beam.pvalue.PCollection`). label (str): label of the 
:class:`~apache_beam.transforms.ptransform.PTransform`. Raises: TypeError: if 
the transform object extracted from the argument list is not a 
:class:`~apache_beam.transforms.ptransform.PTransform`. RuntimeError: if the 
transform object was already applied 
to this pipeline and needs to be cloned in order to apply again. """ if 
isinstance(transform, ptransform._NamedPTransform): return self.apply( 
transform.transform, pvalueish, label or transform.label) if not 
isinstance(transform, ptransform.PTransform): raise TypeError("Expected a 
PTransform object, got %s" % transform) if label: # Fix self.label as it is 
inspected by some PTransform operations # (e.g. to produce error messages for 
type hint violations). old_label, transform.label = transform.label, label try: 
return self.apply(transform, pvalueish) finally: transform.label = old_label # 
Attempts to alter the label of the transform to be applied only when it's # a 
top-level transform so that the cell number will not be prepended to # every 
child transform in a composite. if self._current_transform() is 
self._root_transform(): alter_label_if_ipython(transform, pvalueish) full_label 
= '/'.join( [self._current_transform().full_label, 
transform.label]).lstrip('/') if full_label in 
self.applied_labels: auto_unique_labels = self._options.view_as( 
StandardOptions).auto_unique_labels if auto_unique_labels: # If 
auto_unique_labels is set, we will append a unique suffix to the # label to 
make it unique. logging.warning( 'Using --auto_unique_labels could cause data 
loss when ' 'updating a pipeline or reloading the job state. ' 'This is not 
recommended for streaming jobs.') unique_label = 
self._generate_unique_label(transform) return self.apply(transform, pvalueish, 
unique_label) else: raise RuntimeError( 'A transform with label "%s" already 
exists in the pipeline. ' 'To apply a transform with a specified label, write ' 
'pvalue | "label" >> transform or use the option ' '"auto_unique_labels" to 
automatically generate unique ' 'transform labels. Note "auto_unique_labels" ' 
'could cause data loss when updating a pipeline or ' 'reloading the job state. 
This is not recommended for ' 'streaming jobs.' % full_label) 
self.applied_labels.add(full_label) pvalueish, inputs = 
transform._extract_input_pvalues(pvalueish) try: if not isinstance(inputs, 
dict): inputs = {str(ix): input for (ix, input) in enumerate(inputs)} except 
TypeError: raise NotImplementedError( 'Unable to extract PValue inputs from %s; 
either %s does not accept ' 'inputs of this format, or it does not properly 
override ' '_extract_input_pvalues' % (pvalueish, transform)) for t, leaf_input 
in inputs.items(): if not isinstance(leaf_input, pvalue.PValue) or not 
isinstance(t, str): raise NotImplementedError( '%s does not properly override 
_extract_input_pvalues, ' 'returned %s from %s' % (transform, inputs, 
pvalueish)) current = AppliedPTransform( self._current_transform(), transform, 
full_label, inputs, None, annotations=self._current_annotations()) 
self._current_transform().add_part(current) try: 
self.transforms_stack.append(current) type_options = 
self._options.view_as(TypeOptions) if type_options.pipeline_type_check: 
transform.type_check_inputs(pvalueish) if isinstance(pvalueish, 
pvalue.PDone) and isinstance(transform, ParDo): # If the input is a PDone, we 
cannot apply a ParDo transform. full_label = 
self._current_transform().full_label producer_label = 
pvalueish.producer.full_label > raise TypeCheckError( f'Transform 
"{full_label}" was applied to the output of ' f'"{producer_label}" but 
"{producer_label.split("/")[-1]}" ' 'produces no PCollections.') E 
apache_beam.typehints.decorators.TypeCheckError: Transform "Map(<lambda at 
pubsub_test.py:375>)" was applied to the output of 
"_WriteStringsToPubSub/WriteToPubSub/Write" but "Write" produces no 
PCollections. 
target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/pipeline.py:804:
 TypeCheckError</failure>
   </testcase>
   ```
   
   PreCommit_Coverage has the same failures. The runners/transforms failures do 
look like flakes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to