[
https://issues.apache.org/jira/browse/BEAM-14112?focusedWorklogId=746031&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-746031
]
ASF GitHub Bot logged work on BEAM-14112:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 22/Mar/22 18:45
Start Date: 22/Mar/22 18:45
Worklog Time Spent: 10m
Work Description: chunyang commented on pull request #17100:
URL: https://github.com/apache/beam/pull/17100#issuecomment-1075505398
The error appears to be due to the temp dataset being deleted before the
table schema is read. I can move the table schema retrieval back into
`_export_files` so that it returns a `(schema, metadata_list)` tuple again, for
example:
```python
self.export_schema, self.export_metadata_list = self._export_files(bq)
```
The problem here is that `self.export_schema` is not picklable.
```
[...]
apache_beam/coders/coder_impl.py:272: in decode_from_stream
    return self._decoder(stream.read_all(nested))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = [], sequence = [<TableFieldSchema
 fields: []
 mode: 'NULLABLE'
 name: 'f0_'
 type: 'INTEGER'>]

    def extend(self, sequence):
        """Validate extension of list."""
>       self.__field.validate(sequence)
E       AttributeError: 'FieldList' object has no attribute '_FieldList__field'
```
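For illustration, a common workaround when a schema object fails to pickle is to reduce it to plain Python primitives before attaching it to the transform. This is only a hedged sketch: `FakeFieldSchema` and `schema_to_plain` are hypothetical stand-ins for illustration, not apitools or Beam APIs.

```python
import pickle

class FakeFieldSchema:
    """Hypothetical stand-in for a TableFieldSchema-like message object."""
    def __init__(self, name, type_, mode):
        self.name, self.type, self.mode = name, type_, mode

def schema_to_plain(fields):
    """Convert schema objects to plain dicts, which pickle reliably."""
    return [{"name": f.name, "type": f.type, "mode": f.mode} for f in fields]

fields = [FakeFieldSchema("f0_", "INTEGER", "NULLABLE")]
plain = schema_to_plain(fields)
# Plain dicts survive a pickle round trip without custom __reduce__ logic.
roundtripped = pickle.loads(pickle.dumps(plain))
assert roundtripped == [{"name": "f0_", "type": "INTEGER", "mode": "NULLABLE"}]
```

The idea is simply to store primitives on the pickled object and rebuild the rich schema object lazily where it is needed.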
@aaltay It looks like you ran into the same issue previously here:
https://github.com/google/apitools/pull/311. Did you find a workaround? As far
as I can tell, a newer version of apitools was never released.
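As a side note, the `TypeError: can't pickle generator objects` from the issue description can be reproduced in isolation. This is a minimal sketch, not Beam code, showing the failure and the usual fix of materializing the generator before pickling:

```python
import pickle

def rows():
    # Stand-in for any lazily produced sequence of records.
    yield {"f0_": 1}

gen = rows()
try:
    # Generators carry live frame state, so pickle rejects them.
    pickle.dumps(gen)
    raised = False
except TypeError:
    raised = True
assert raised

# Materializing into a list first makes the payload picklable.
data = pickle.loads(pickle.dumps(list(rows())))
assert data == [{"f0_": 1}]
```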
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 746031)
Time Spent: 5h 50m (was: 5h 40m)
> ReadFromBigQuery cannot be used with the interactive runner
> -----------------------------------------------------------
>
> Key: BEAM-14112
> URL: https://issues.apache.org/jira/browse/BEAM-14112
> Project: Beam
> Issue Type: Improvement
> Components: io-py-gcp, runner-py-interactive
> Affects Versions: 2.35.0, 2.36.0, 2.37.0
> Reporter: Chun Yang
> Assignee: Chun Yang
> Priority: P2
> Fix For: 2.38.0
>
> Time Spent: 5h 50m
> Remaining Estimate: 0h
>
> A change in Apache Beam 2.35.0 caused ReadFromBigQuery to no longer work with
> the Python interactive runner.
> The error can be reproduced with the following code:
> {code:python}#!/usr/bin/env python
> """Reproduce pickle issue when using RFBQ in interactive runner."""
> import apache_beam as beam
> from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
> import apache_beam.runners.interactive.interactive_beam as ib
>
> options = beam.options.pipeline_options.PipelineOptions(
>     project="...",
>     temp_location="...",
> )
>
> pipeline = beam.Pipeline(InteractiveRunner(), options=options)
> pcoll = pipeline | beam.io.ReadFromBigQuery(query="SELECT 1")
> print(ib.collect(pcoll)){code}
> {code:none}Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 214, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
>   File "apache_beam/runners/worker/opcounters.py", line 211, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
>   File "apache_beam/runners/worker/opcounters.py", line 250, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
>   File "apache_beam/coders/coder_impl.py", line 1425, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1436, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 207, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1514, in apache_beam.coders.coder_impl.LengthPrefixCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 246, in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 441, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
>   File "apache_beam/coders/coder_impl.py", line 268, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 802, in <lambda>
>     lambda x: dumps(x, protocol), pickle.loads)
> TypeError: can't pickle generator objects
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File "repro.py", line 16, in <module>
>     print(ib.collect(pcoll))
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/utils.py", line 270, in run_within_progress_indicator
>     return func(*args, **kwargs)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_beam.py", line 664, in collect
>     recording = recording_manager.record([pcoll], max_n=n, max_duration=duration)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/recording_manager.py", line 458, in record
>     self.user_pipeline.options).run()
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/pipeline_fragment.py", line 113, in run
>     return self.deduce_fragment().run()
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/pipeline.py", line 573, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_runner.py", line 195, in run_pipeline
>     pipeline_to_execute.run(), pipeline_instrument)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/pipeline.py", line 573, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 200, in run_pipeline
>     pipeline.to_runner_api(default_environment=self._default_environment))
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 210, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 396, in run_stages
>     runner_execution_context, bundle_context_manager)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 667, in _run_stage
>     bundle_manager))
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 784, in _run_bundle
>     data_input, data_output, input_timers, expected_timer_output)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1094, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 378, in push
>     response = self.worker.do_instruction(request)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 581, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 618, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 996, in process_bundle
>     element.data)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 346, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 348, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1265, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1265, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1281, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 214, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
>   File "apache_beam/runners/worker/opcounters.py", line 211, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
>   File "apache_beam/runners/worker/opcounters.py", line 250, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
>   File "apache_beam/coders/coder_impl.py", line 1425, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1436, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 207, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1514, in apache_beam.coders.coder_impl.LengthPrefixCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 246, in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 441, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
>   File "apache_beam/coders/coder_impl.py", line 268, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 802, in <lambda>
>     lambda x: dumps(x, protocol), pickle.loads)
> TypeError: can't pickle generator objects [while running 'ReadFromBigQuery/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']{code}
> I suspect the error is caused by this change that was first released in
> 2.35.0: https://github.com/apache/beam/pull/15610
--
This message was sent by Atlassian Jira
(v8.20.1#820001)