[ 
https://issues.apache.org/jira/browse/BEAM-14112?focusedWorklogId=746184&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-746184
 ]

ASF GitHub Bot logged work on BEAM-14112:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Mar/22 21:47
            Start Date: 22/Mar/22 21:47
    Worklog Time Spent: 10m 
      Work Description: chunyang commented on pull request #17153:
URL: https://github.com/apache/beam/pull/17153#issuecomment-1075676149


   Python integration tests passed in postcommit AFAICT, but there was a 
failure in `:sdks:java:io:google-cloud-platform:expansion-service:shadowJar`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 746184)
    Time Spent: 7h 10m  (was: 7h)

> ReadFromBigQuery cannot be used with the interactive runner
> -----------------------------------------------------------
>
>                 Key: BEAM-14112
>                 URL: https://issues.apache.org/jira/browse/BEAM-14112
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-py-gcp, runner-py-interactive
>    Affects Versions: 2.35.0, 2.36.0, 2.37.0
>            Reporter: Chun Yang
>            Assignee: Chun Yang
>            Priority: P2
>             Fix For: 2.38.0
>
>          Time Spent: 7h 10m
>  Remaining Estimate: 0h
>
> A change in Apache Beam 2.35.0 caused ReadFromBigQuery to no longer work with 
> the Python interactive runner.
> The error can be reproduced with the following code:
> {code:python}#!/usr/bin/env python
> """Reproduce pickle issue when using RFBQ in interactive runner."""
> import apache_beam as beam
> from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
> import apache_beam.runners.interactive.interactive_beam as ib
>
>
> options = beam.options.pipeline_options.PipelineOptions(
>     project="...",
>     temp_location="...",
> )
>
> pipeline = beam.Pipeline(InteractiveRunner(), options=options)
> pcoll = pipeline | beam.io.ReadFromBigQuery(query="SELECT 1")
> print(ib.collect(pcoll)){code}
> {code:none}Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1198, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 214, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 178, in 
> apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
>   File "apache_beam/runners/worker/opcounters.py", line 211, in 
> apache_beam.runners.worker.opcounters.OperationCounters.update_from
>   File "apache_beam/runners/worker/opcounters.py", line 250, in 
> apache_beam.runners.worker.opcounters.OperationCounters.do_sample
>   File "apache_beam/coders/coder_impl.py", line 1425, in 
> apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1436, in 
> apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in 
> apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in 
> apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 207, in 
> apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1514, in 
> apache_beam.coders.coder_impl.LengthPrefixCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 246, in 
> apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 441, in 
> apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
>   File "apache_beam/coders/coder_impl.py", line 268, in 
> apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/coders/coders.py",
>  line 802, in <lambda>
>     lambda x: dumps(x, protocol), pickle.loads)
> TypeError: can't pickle generator objects
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File "repro.py", line 16, in <module>
>     print(ib.collect(pcoll))
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/utils.py",
>  line 270, in run_within_progress_indicator
>     return func(*args, **kwargs)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_beam.py",
>  line 664, in collect
>     recording = recording_manager.record([pcoll], max_n=n, 
> max_duration=duration)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/recording_manager.py",
>  line 458, in record
>     self.user_pipeline.options).run()
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/pipeline_fragment.py",
>  line 113, in run
>     return self.deduce_fragment().run()
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/pipeline.py",
>  line 573, in run
>     return self.runner.run_pipeline(self, self._options)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_runner.py",
>  line 195, in run_pipeline
>     pipeline_to_execute.run(), pipeline_instrument)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/pipeline.py",
>  line 573, in run
>     return self.runner.run_pipeline(self, self._options)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py",
>  line 131, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 200, in run_pipeline
>     pipeline.to_runner_api(default_environment=self._default_environment))
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 210, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 396, in run_stages
>     runner_execution_context, bundle_context_manager)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 667, in _run_stage
>     bundle_manager))
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 784, in _run_bundle
>     data_input, data_output, input_timers, expected_timer_output)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 1094, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
>  line 378, in push
>     response = self.worker.do_instruction(request)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 581, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 618, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 996, in process_bundle
>     element.data)
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 221, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 346, in 
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 348, in 
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 215, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 707, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 708, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1200, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1265, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1198, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 215, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 707, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 708, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1200, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1265, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1198, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 215, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 707, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 708, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1200, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1281, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1198, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 214, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 178, in 
> apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
>   File "apache_beam/runners/worker/opcounters.py", line 211, in 
> apache_beam.runners.worker.opcounters.OperationCounters.update_from
>   File "apache_beam/runners/worker/opcounters.py", line 250, in 
> apache_beam.runners.worker.opcounters.OperationCounters.do_sample
>   File "apache_beam/coders/coder_impl.py", line 1425, in 
> apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1436, in 
> apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in 
> apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in 
> apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 207, in 
> apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1514, in 
> apache_beam.coders.coder_impl.LengthPrefixCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 246, in 
> apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 441, in 
> apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
>   File "apache_beam/coders/coder_impl.py", line 268, in 
> apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
>   File 
> "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/coders/coders.py",
>  line 802, in <lambda>
>     lambda x: dumps(x, protocol), pickle.loads)
> TypeError: can't pickle generator objects [while running 
> 'ReadFromBigQuery/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']{code}
> I suspect the error is caused by this change that was first released in 
> 2.35.0: https://github.com/apache/beam/pull/15610
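
The last frames of the traceback show a pickle-based coder being handed a generator while the runner estimates element size. The following is a minimal, Beam-free sketch of that failure mode using only the standard library. The names `rows` and `try_pickle` are hypothetical, and the sketch does not claim to reproduce the code path touched by the linked pull request; it only illustrates why a generator object cannot be encoded this way while a materialized list can.

```python
import pickle


def rows():
    """Hypothetical stand-in for a lazily produced result set."""
    yield {"f0_": 1}


def try_pickle(value):
    """Return the pickled bytes, or the TypeError raised while pickling."""
    try:
        return pickle.dumps(value)
    except TypeError as exc:
        return exc


# A generator object holds live execution state, so pickle rejects it.
lazy = try_pickle(rows())

# Materializing the same data into a list makes it picklable.
eager = try_pickle(list(rows()))

print(type(lazy).__name__, type(eager).__name__)
```

The exact wording of the `TypeError` message differs between Python versions, so the sketch checks only the exception type.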



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
