[ 
https://issues.apache.org/jira/browse/BEAM-14112?focusedWorklogId=741955&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-741955
 ]

ASF GitHub Bot logged work on BEAM-14112:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/Mar/22 00:05
            Start Date: 16/Mar/22 00:05
    Worklog Time Spent: 10m 
      Work Description: chunyang opened a new pull request #17100:
URL: https://github.com/apache/beam/pull/17100


   Avoid storing a generator in _CustomBigQuerySource so that it can be used 
with the Python interactive runner. See details in 
https://issues.apache.org/jira/browse/BEAM-14112 .
   
   Likely related to #15610
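
The general idea of not keeping a generator on an object that may be pickled can be sketched as follows. This is only an illustration, not the change made in the pull request: both classes, the `paths` argument, and the `state_is_picklable` helper are hypothetical and are not part of Beam. The sketch assumes the generator can be rebuilt on demand from plain data.

```python
import pickle


class GeneratorHoldingSource:
    """Hypothetical source that keeps a live generator as an attribute."""

    def __init__(self, paths):
        # The generator object itself becomes part of the instance state.
        self.splits = (p for p in paths)


class ListHoldingSource:
    """Hypothetical source that stores plain data and iterates on demand."""

    def __init__(self, paths):
        # Only picklable state is stored on the instance.
        self.paths = list(paths)

    def splits(self):
        # Build a fresh generator at call time instead of storing one.
        return (p for p in self.paths)


def state_is_picklable(obj):
    """Return True if the instance's attribute dict can be pickled."""
    try:
        pickle.dumps(vars(obj))
        return True
    except TypeError:
        return False


print(state_is_picklable(GeneratorHoldingSource(["a", "b"])))
print(state_is_picklable(ListHoldingSource(["a", "b"])))
```

The second class still yields its items lazily through `splits()`, but nothing unpicklable is held between calls.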
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.



Issue Time Tracking
-------------------

            Worklog Id:     (was: 741955)
    Remaining Estimate: 0h
            Time Spent: 10m

> ReadFromBigQuery cannot be used with the interactive runner
> -----------------------------------------------------------
>
>                 Key: BEAM-14112
>                 URL: https://issues.apache.org/jira/browse/BEAM-14112
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-py-gcp, runner-py-interactive
>    Affects Versions: 2.35.0, 2.36.0, 2.37.0
>            Reporter: Chun Yang
>            Priority: P2
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> A change in Apache Beam 2.35.0 caused ReadFromBigQuery to no longer work with 
> the Python interactive runner.
> The error can be reproduced with the following code:
> {code:python}#!/usr/bin/env python
> """Reproduce pickle issue when using RFBQ in interactive runner."""
> import apache_beam as beam
> from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
> import apache_beam.runners.interactive.interactive_beam as ib
>
>
> options = beam.options.pipeline_options.PipelineOptions(
>     project="...",
>     temp_location="...",
> )
>
> pipeline = beam.Pipeline(InteractiveRunner(), options=options)
> pcoll = pipeline | beam.io.ReadFromBigQuery(query="SELECT 1")
> print(ib.collect(pcoll)){code}
> {code:none}Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1198, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 214, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 178, in 
> apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
>   File "apache_beam/runners/worker/opcounters.py", line 211, in 
> apache_beam.runners.worker.opcounters.OperationCounters.update_from
>   File "apache_beam/runners/worker/opcounters.py", line 250, in 
> apache_beam.runners.worker.opcounters.OperationCounters.do_sample
>   File "apache_beam/coders/coder_impl.py", line 1425, in 
> apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1436, in 
> apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in 
> apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in 
> apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 207, in 
> apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1514, in 
> apache_beam.coders.coder_impl.LengthPrefixCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 246, in 
> apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 441, in 
> apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
>   File "apache_beam/coders/coder_impl.py", line 268, in 
> apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/coders/coders.py",
>  line 802, in <lambda>
>     lambda x: dumps(x, protocol), pickle.loads)
> TypeError: can't pickle generator objects
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File "repro.py", line 16, in <module>
>     print(ib.collect(pcoll))
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/utils.py",
>  line 270, in run_within_progress_indicator
>     return func(*args, **kwargs)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_beam.py",
>  line 664, in collect
>     recording = recording_manager.record([pcoll], max_n=n, 
> max_duration=duration)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/recording_manager.py",
>  line 458, in record
>     self.user_pipeline.options).run()
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/pipeline_fragment.py",
>  line 113, in run
>     return self.deduce_fragment().run()
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/pipeline.py",
>  line 573, in run
>     return self.runner.run_pipeline(self, self._options)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_runner.py",
>  line 195, in run_pipeline
>     pipeline_to_execute.run(), pipeline_instrument)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/pipeline.py",
>  line 573, in run
>     return self.runner.run_pipeline(self, self._options)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py",
>  line 131, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 200, in run_pipeline
>     pipeline.to_runner_api(default_environment=self._default_environment))
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 210, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 396, in run_stages
>     runner_execution_context, bundle_context_manager)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 667, in _run_stage
>     bundle_manager))
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 784, in _run_bundle
>     data_input, data_output, input_timers, expected_timer_output)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 1094, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
>  line 378, in push
>     response = self.worker.do_instruction(request)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 581, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 618, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 996, in process_bundle
>     element.data)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 221, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 346, in 
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 348, in 
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 215, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 707, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 708, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1200, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1265, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1198, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 215, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 707, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 708, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1200, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1265, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1198, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 215, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 707, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 708, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1200, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1281, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1198, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 214, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 178, in 
> apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
>   File "apache_beam/runners/worker/opcounters.py", line 211, in 
> apache_beam.runners.worker.opcounters.OperationCounters.update_from
>   File "apache_beam/runners/worker/opcounters.py", line 250, in 
> apache_beam.runners.worker.opcounters.OperationCounters.do_sample
>   File "apache_beam/coders/coder_impl.py", line 1425, in 
> apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1436, in 
> apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in 
> apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in 
> apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 207, in 
> apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1514, in 
> apache_beam.coders.coder_impl.LengthPrefixCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 246, in 
> apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 441, in 
> apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
>   File "apache_beam/coders/coder_impl.py", line 268, in 
> apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/coders/coders.py",
>  line 802, in <lambda>
>     lambda x: dumps(x, protocol), pickle.loads)
> TypeError: can't pickle generator objects [while running 
> 'ReadFromBigQuery/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']{code}
> I suspect the error is caused by this change that was first released in 
> 2.35.0: https://github.com/apache/beam/pull/15610
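
The final TypeError in the traceback above can be reproduced without Beam or BigQuery. The following is a minimal sketch using only the standard library; the `rows` generator and the `try_pickle` helper are hypothetical stand-ins and do not come from the Beam code base. The exact wording of the error message varies between Python versions.

```python
import pickle


def rows():
    """Hypothetical stand-in for a lazily produced sequence of rows."""
    yield {"f0_": 1}


def try_pickle(obj):
    """Return None if obj pickles, otherwise the TypeError message."""
    try:
        pickle.dumps(obj)
        return None
    except TypeError as exc:
        return str(exc)


# A generator object cannot be pickled.
generator_error = try_pickle(rows())
# The same data materialized into a list pickles without error.
list_error = try_pickle(list(rows()))

print(generator_error)
print(list_error)
```

Any element or attribute that reaches the pickle-based coder while still holding a generator fails in the same way.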



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
