[
https://issues.apache.org/jira/browse/BEAM-14112?focusedWorklogId=746790&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-746790
]
ASF GitHub Bot logged work on BEAM-14112:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 23/Mar/22 18:29
Start Date: 23/Mar/22 18:29
Worklog Time Spent: 10m
Work Description: KevinGG commented on pull request #17153:
URL: https://github.com/apache/beam/pull/17153#issuecomment-1076678937
> Just to explore some options other than converting the schema to JSON:
> prior to #15610, there was no generator or TableSchema to cause pickling errors.
>
> Instead of storing the generator as an instance attribute, a list of
> [TextSource](https://github.com/apache/beam/blob/v2.34.0/sdks/python/apache_beam/io/gcp/bigquery.py#L793-L798)
> objects, and notably their `coder` attribute, was stored (we can assume
> `use_json_exports=True` for this discussion). The default `coder`,
`_JsonToDictCoder`, had a method
[`_convert_to_tuple`](https://github.com/apache/beam/blob/v2.34.0/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py#L401-L413)
to marshal the TableSchema into an object more amenable to pickling.
>
> Perhaps I can use the same `_convert_to_tuple` method to create a
picklable version of TableSchema and store that as an attribute rather than
going the JSON route?
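(For reference, the quoted `_convert_to_tuple` approach boils down to something like the sketch below. The names `FieldSchema` and `convert_to_tuple` are illustrative stand-ins, not Beam's actual code: the idea is that recursively marshalling schema fields into nested namedtuples yields plain tuples, which pickle cleanly with no proto or generator internals attached.)

```python
from collections import namedtuple

# Illustrative stand-in for the namedtuple used by the real _convert_to_tuple.
FieldSchema = namedtuple('FieldSchema', ['fields', 'mode', 'name', 'type'])

def convert_to_tuple(schema_fields):
  """Recursively replace schema field objects with picklable namedtuples."""
  return [
      FieldSchema(
          fields=convert_to_tuple(f.fields) if f.fields else [],
          mode=f.mode,
          name=f.name,
          type=f.type) for f in schema_fields
  ]
```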
How about we store the coder directly in the `_BigQueryExportResult` class?
We don't really need the TableSchema later, only a coder built from the
TableSchema.
I have already verified that the coder itself is picklable.
`_JsonToDictCoder` is the default `self.coder` used to build the coder from a
TableSchema, but it is not necessarily the only possible implementation.
So your `_BigQueryExportResult` class could be:
```python
@dataclass
class _BigQueryExportResult:
  coder: beam.coders.Coder
  paths: List[str]
```
And
```python
export_result = _BigQueryExportResult(
    coder=self.coder(table_schema),
    paths=[metadata.path for metadata in metadata_list])
```
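A quick self-contained sanity check that such a dataclass round-trips through pickle. The classes below are toy stand-ins (not the real `_JsonToDictCoder` or `_BigQueryExportResult`): the stand-in coder holds only plain attributes, which is what makes the whole result picklable.

```python
import pickle
from dataclasses import dataclass
from typing import Any, List

class FakeCoder:
  """Toy stand-in for _JsonToDictCoder: holds only plain attributes."""
  def __init__(self, table_schema):
    self.fields = [(f['name'], f['type']) for f in table_schema]

@dataclass
class ExportResult:
  """Toy stand-in for _BigQueryExportResult."""
  coder: Any
  paths: List[str]

result = ExportResult(
    coder=FakeCoder([{'name': 'f0_', 'type': 'INTEGER'}]),
    paths=['gs://bucket/tmp/000.json'])

# The dataclass, including the coder it holds, survives a pickle round trip.
restored = pickle.loads(pickle.dumps(result))
```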
--
This is an automated message from the Apache Git Service.
Issue Time Tracking
-------------------
Worklog Id: (was: 746790)
Time Spent: 8h 50m (was: 8h 40m)
> ReadFromBigQuery cannot be used with the interactive runner
> -----------------------------------------------------------
>
> Key: BEAM-14112
> URL: https://issues.apache.org/jira/browse/BEAM-14112
> Project: Beam
> Issue Type: Improvement
> Components: io-py-gcp, runner-py-interactive
> Affects Versions: 2.35.0, 2.36.0, 2.37.0
> Reporter: Chun Yang
> Assignee: Chun Yang
> Priority: P2
> Time Spent: 8h 50m
> Remaining Estimate: 0h
>
> A change in Apache Beam 2.35.0 caused ReadFromBigQuery to no longer work with
> the Python interactive runner.
> The error can be reproduced with the following code:
> {code:python}
> #!/usr/bin/env python
> """Reproduce pickle issue when using RFBQ in interactive runner."""
> import apache_beam as beam
> from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
> import apache_beam.runners.interactive.interactive_beam as ib
>
> options = beam.options.pipeline_options.PipelineOptions(
>     project="...",
>     temp_location="...",
> )
>
> pipeline = beam.Pipeline(InteractiveRunner(), options=options)
> pcoll = pipeline | beam.io.ReadFromBigQuery(query="SELECT 1")
> print(ib.collect(pcoll))
> {code}
> {code:none}
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 214, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
>   File "apache_beam/runners/worker/opcounters.py", line 211, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
>   File "apache_beam/runners/worker/opcounters.py", line 250, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
>   File "apache_beam/coders/coder_impl.py", line 1425, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1436, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 207, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1514, in apache_beam.coders.coder_impl.LengthPrefixCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 246, in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 441, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
>   File "apache_beam/coders/coder_impl.py", line 268, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 802, in <lambda>
>     lambda x: dumps(x, protocol), pickle.loads)
> TypeError: can't pickle generator objects
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File "repro.py", line 16, in <module>
>     print(ib.collect(pcoll))
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/utils.py", line 270, in run_within_progress_indicator
>     return func(*args, **kwargs)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_beam.py", line 664, in collect
>     recording = recording_manager.record([pcoll], max_n=n, max_duration=duration)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/recording_manager.py", line 458, in record
>     self.user_pipeline.options).run()
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/pipeline_fragment.py", line 113, in run
>     return self.deduce_fragment().run()
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/pipeline.py", line 573, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_runner.py", line 195, in run_pipeline
>     pipeline_to_execute.run(), pipeline_instrument)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/pipeline.py", line 573, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 200, in run_pipeline
>     pipeline.to_runner_api(default_environment=self._default_environment))
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 210, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 396, in run_stages
>     runner_execution_context, bundle_context_manager)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 667, in _run_stage
>     bundle_manager))
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 784, in _run_bundle
>     data_input, data_output, input_timers, expected_timer_output)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1094, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 378, in push
>     response = self.worker.do_instruction(request)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 581, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 618, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 996, in process_bundle
>     element.data)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 346, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 348, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1265, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1265, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1281, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 214, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
>   File "apache_beam/runners/worker/opcounters.py", line 211, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
>   File "apache_beam/runners/worker/opcounters.py", line 250, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
>   File "apache_beam/coders/coder_impl.py", line 1425, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1436, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 207, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1514, in apache_beam.coders.coder_impl.LengthPrefixCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 246, in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 441, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
>   File "apache_beam/coders/coder_impl.py", line 268, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 802, in <lambda>
>     lambda x: dumps(x, protocol), pickle.loads)
> TypeError: can't pickle generator objects [while running 'ReadFromBigQuery/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']
> {code}
> I suspect the error is caused by this change that was first released in
> 2.35.0: https://github.com/apache/beam/pull/15610
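The root failure is easy to reproduce in isolation, which matches the suspicion above. The class below is a toy illustration, not Beam's actual source: any object that stores a live generator as an instance attribute cannot be pickled.

```python
import pickle

class HoldsGenerator:
  """Toy class: storing a live generator makes the object unpicklable."""
  def __init__(self):
    # A generator expression kept as an instance attribute.
    self.schema_fields = (name for name in ('f0_',))

try:
  pickle.dumps(HoldsGenerator())
  raised = False
except TypeError:
  # Pickling a generator raises TypeError, as in the traceback above.
  raised = True
```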
--
This message was sent by Atlassian Jira
(v8.20.1#820001)