damccorm commented on PR #35156:
URL: https://github.com/apache/beam/pull/35156#issuecomment-2944421389
I see:
```
<testcase classname="apache_beam.io.gcp.bigquery_test.TestWriteToBigQuery"
    name="test_to_from_runner_api" time="0.136">
<failure message="apache_beam.typehints.decorators.TypeCheckError: Transform
    'MyWriteToBigQuery/BigQueryBatchFileLoads/RewindowIntoGlobal' expects a
    PCollection as input. Got a PBegin/Pipeline instead.">

self = <apache_beam.io.gcp.bigquery_test.TestWriteToBigQuery testMethod=test_to_from_runner_api>

  def test_to_from_runner_api(self):
    """Tests that serialization of WriteToBigQuery is correct.

    This is not intended to be a change-detector test. As such, this only
    tests the more complicated serialization logic of parameters:
    ValueProviders, callables, and side inputs.
    """
    FULL_OUTPUT_TABLE = 'test_project:output_table'
    p = TestPipeline()
    # Used for testing side input parameters.
    table_record_pcv = beam.pvalue.AsDict(
        p | "MakeTable" >> beam.Create([('table', FULL_OUTPUT_TABLE)]))
    # Used for testing value provider parameters.
    schema = value_provider.StaticValueProvider(str, '"a:str"')
    original = WriteToBigQuery(
        table=lambda _, side_input: side_input['table'],
        table_side_inputs=(table_record_pcv, ),
        schema=schema)
    # pylint: disable=expression-not-assigned
>   p | 'MyWriteToBigQuery' >> original

target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_test.py:971:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py:1133: in __ror__
    return self.transform.__ror__(pvalueish, self.label)
target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py:618: in __ror__
    result = p.apply(self, pvalueish, label)
target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/pipeline.py:733: in apply
    return self.apply(transform, pvalueish)
target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/pipeline.py:805: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery.py:2272: in expand
    output = pcoll | BigQueryBatchFileLoads(
target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/pipeline.py:805: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_file_loads.py:1267: in expand
    pcoll
target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/pipeline.py:722: in apply
    return self.apply(
target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/pipeline.py:733: in apply
    return self.apply(transform, pvalueish)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <apache_beam.testing.test_pipeline.TestPipeline object at 0x7eac616f8940>
transform = <WindowInto(PTransform) label=[WindowInto(WindowIntoFn)] at 0x7eac537f7040>
pvalueish = <PBegin[None.None] at 0x7eac616f9810>, label = None

  def apply(
      self,
      transform,  # type: ptransform.PTransform
      pvalueish=None,  # type: Optional[pvalue.PValue]
      label=None  # type: Optional[str]
  ):
    # type: (...) -> pvalue.PValue
    """Applies a custom transform using the pvalueish specified.

    Args:
      transform (~apache_beam.transforms.ptransform.PTransform): the
        :class:`~apache_beam.transforms.ptransform.PTransform` to apply.
      pvalueish (~apache_beam.pvalue.PCollection): the input for the
        :class:`~apache_beam.transforms.ptransform.PTransform` (typically a
        :class:`~apache_beam.pvalue.PCollection`).
      label (str): label of the
        :class:`~apache_beam.transforms.ptransform.PTransform`.

    Raises:
      TypeError: if the transform object extracted from the argument list is
        not a :class:`~apache_beam.transforms.ptransform.PTransform`.
      RuntimeError: if the transform object was already applied to this
        pipeline and needs to be cloned in order to apply again.
    """
    if isinstance(transform, ptransform._NamedPTransform):
      return self.apply(
          transform.transform, pvalueish, label or transform.label)

    if not isinstance(transform, ptransform.PTransform):
      raise TypeError("Expected a PTransform object, got %s" % transform)

    if label:
      # Fix self.label as it is inspected by some PTransform operations
      # (e.g. to produce error messages for type hint violations).
      old_label, transform.label = transform.label, label
      try:
        return self.apply(transform, pvalueish)
      finally:
        transform.label = old_label

    # Attempts to alter the label of the transform to be applied only when
    # it's a top-level transform so that the cell number will not be
    # prepended to every child transform in a composite.
    if self._current_transform() is self._root_transform():
      alter_label_if_ipython(transform, pvalueish)

    full_label = '/'.join(
        [self._current_transform().full_label, transform.label]).lstrip('/')
    if full_label in self.applied_labels:
      auto_unique_labels = self._options.view_as(
          StandardOptions).auto_unique_labels
      if auto_unique_labels:
        # If auto_unique_labels is set, we will append a unique suffix to the
        # label to make it unique.
        logging.warning(
            'Using --auto_unique_labels could cause data loss when '
            'updating a pipeline or reloading the job state. '
            'This is not recommended for streaming jobs.')
        unique_label = self._generate_unique_label(transform)
        return self.apply(transform, pvalueish, unique_label)
      else:
        raise RuntimeError(
            'A transform with label "%s" already exists in the pipeline. '
            'To apply a transform with a specified label, write '
            'pvalue | "label" >> transform or use the option '
            '"auto_unique_labels" to automatically generate unique '
            'transform labels. Note "auto_unique_labels" '
            'could cause data loss when updating a pipeline or '
            'reloading the job state. This is not recommended for '
            'streaming jobs.' % full_label)
    self.applied_labels.add(full_label)

    pvalueish, inputs = transform._extract_input_pvalues(pvalueish)
    try:
      if not isinstance(inputs, dict):
        inputs = {str(ix): input for (ix, input) in enumerate(inputs)}
    except TypeError:
      raise NotImplementedError(
          'Unable to extract PValue inputs from %s; either %s does not accept '
          'inputs of this format, or it does not properly override '
          '_extract_input_pvalues' % (pvalueish, transform))
    for t, leaf_input in inputs.items():
      if not isinstance(leaf_input, pvalue.PValue) or not isinstance(t, str):
        raise NotImplementedError(
            '%s does not properly override _extract_input_pvalues, '
            'returned %s from %s' % (transform, inputs, pvalueish))

    current = AppliedPTransform(
        self._current_transform(),
        transform,
        full_label,
        inputs,
        None,
        annotations=self._current_annotations())
    self._current_transform().add_part(current)

    try:
      self.transforms_stack.append(current)

      type_options = self._options.view_as(TypeOptions)
      if type_options.pipeline_type_check:
        transform.type_check_inputs(pvalueish)

      if isinstance(pvalueish, pvalue.PBegin) and isinstance(transform, ParDo):
        full_label = self._current_transform().full_label
>       raise TypeCheckError(
            f"Transform '{full_label}' expects a PCollection as input. "
            "Got a PBegin/Pipeline instead.")
E       apache_beam.typehints.decorators.TypeCheckError: Transform 'MyWriteToBigQuery/BigQueryBatchFileLoads/RewindowIntoGlobal' expects a PCollection as input. Got a PBegin/Pipeline instead.

target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/pipeline.py:801: TypeCheckError
</failure>
</testcase>
```
It looks like the failure comes from writing a PBegin to BigQuery, which doesn't make sense in a real pipeline (but is reasonable for a test, I guess). I think we can fix that by inserting a `Create` in that test and then tweaking the asserts, roughly as sketched below.
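A minimal sketch of what I mean, not the actual patch: the `'CreateInput'` label and the placeholder row are made up, and the downstream assert changes are left out.
```
import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.options import value_provider
from apache_beam.testing.test_pipeline import TestPipeline

FULL_OUTPUT_TABLE = 'test_project:output_table'

p = TestPipeline()
# Side input and value provider parameters, as in the existing test.
table_record_pcv = beam.pvalue.AsDict(
    p | "MakeTable" >> beam.Create([('table', FULL_OUTPUT_TABLE)]))
schema = value_provider.StaticValueProvider(str, '"a:str"')

original = WriteToBigQuery(
    table=lambda _, side_input: side_input['table'],
    table_side_inputs=(table_record_pcv, ),
    schema=schema)

# The suggested change: feed the transform a PCollection via Create instead
# of applying it straight to the pipeline, which hands it a PBegin.
_ = (
    p
    | 'CreateInput' >> beam.Create([{'a': 'x'}])  # placeholder row
    | 'MyWriteToBigQuery' >> original)
```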
> Not sure what's going on with some of those precommit builds. It looked like everything passed?
FWIW, my usual path to find failing tests is:
1) Click on the failing precommit
2) Click summary
3) Scroll to the bottom and view one of the test result summaries to get a
sense of how many tests failed
4) Download the test results at the bottom to figure out which test actually failed (a quick way to scan those results is sketched below)
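For step 4, once the results artifact is unpacked, a throwaway stdlib-only script can list the failing cases; the `test-results/**/*.xml` glob is just a guess at how the artifact unpacks.
```
import glob
import xml.etree.ElementTree as ET

# Walk the downloaded JUnit XML reports and print each failing testcase
# (the <testcase>/<failure> shape matches the element quoted above).
for path in glob.glob('test-results/**/*.xml', recursive=True):
    for case in ET.parse(path).getroot().iter('testcase'):
        failure = case.find('failure')
        if failure is None:
            failure = case.find('error')
        if failure is not None:
            print(f"{case.get('classname')}.{case.get('name')}: "
                  f"{failure.get('message')}")
```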
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]