Chun Yang created BEAM-14112:
--------------------------------

             Summary: ReadFromBigQuery cannot be used with the interactive runner
                 Key: BEAM-14112
                 URL: https://issues.apache.org/jira/browse/BEAM-14112
             Project: Beam
          Issue Type: Improvement
          Components: io-py-gcp, runner-py-interactive
    Affects Versions: 2.37.0, 2.36.0, 2.35.0
            Reporter: Chun Yang


A change introduced in Apache Beam 2.35.0 broke ReadFromBigQuery when it is used 
with the Python interactive runner (affected versions: 2.35.0, 2.36.0, and 2.37.0).

The error can be reproduced with the following script (replace the "..." 
placeholders with a real project ID and temp location):
{code:python}#!/usr/bin/env python
"""Reproduce pickle issue when using RFBQ in interactive runner."""

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib


options = beam.options.pipeline_options.PipelineOptions(
    project="...",
    temp_location="...",
)

pipeline = beam.Pipeline(InteractiveRunner(), options=options)
pcoll = pipeline | beam.io.ReadFromBigQuery(query="SELECT 1")
print(ib.collect(pcoll)){code}

{code:none}Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1198, in 
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 536, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1361, in 
apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 214, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 178, in 
apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
  File "apache_beam/runners/worker/opcounters.py", line 211, in 
apache_beam.runners.worker.opcounters.OperationCounters.update_from
  File "apache_beam/runners/worker/opcounters.py", line 250, in 
apache_beam.runners.worker.opcounters.OperationCounters.do_sample
  File "apache_beam/coders/coder_impl.py", line 1425, in 
apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1436, in 
apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 987, in 
apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 987, in 
apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 207, in 
apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1514, in 
apache_beam.coders.coder_impl.LengthPrefixCoderImpl.estimate_size
  File "apache_beam/coders/coder_impl.py", line 246, in 
apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
  File "apache_beam/coders/coder_impl.py", line 441, in 
apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
  File "apache_beam/coders/coder_impl.py", line 268, in 
apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
  File 
"/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/coders/coders.py",
 line 802, in <lambda>
    lambda x: dumps(x, protocol), pickle.loads)
TypeError: can't pickle generator objects

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "repro.py", line 16, in <module>
    print(ib.collect(pcoll))
  File 
"/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/utils.py",
 line 270, in run_within_progress_indicator
    return func(*args, **kwargs)
  File 
"/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_beam.py",
 line 664, in collect
    recording = recording_manager.record([pcoll], max_n=n, 
max_duration=duration)
  File 
"/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/recording_manager.py",
 line 458, in record
    self.user_pipeline.options).run()
  File 
"/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/pipeline_fragment.py",
 line 113, in run
    return self.deduce_fragment().run()
  File 
"/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/pipeline.py",
 line 573, in run
    return self.runner.run_pipeline(self, self._options)
  File 
"/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_runner.py",
 line 195, in run_pipeline
    pipeline_to_execute.run(), pipeline_instrument)
  File 
"/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/pipeline.py",
 line 573, in run
    return self.runner.run_pipeline(self, self._options)
  File 
"/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py",
 line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File 
"/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 200, in run_pipeline
    pipeline.to_runner_api(default_environment=self._default_environment))
  File 
"/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 210, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File 
"/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 396, in run_stages
    runner_execution_context, bundle_context_manager)
  File 
"/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 667, in _run_stage
    bundle_manager))
  File 
"/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 784, in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
  File 
"/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 1094, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File 
"/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
 line 378, in push
    response = self.worker.do_instruction(request)
  File 
"/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 581, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File 
"/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 618, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File 
"/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 996, in process_bundle
    element.data)
  File 
"/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 221, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 346, in 
apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 348, in 
apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 215, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 707, in 
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 708, in 
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1200, in 
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1265, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1198, in 
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 536, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1361, in 
apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 215, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 707, in 
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 708, in 
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1200, in 
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1265, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1198, in 
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 536, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1361, in 
apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 215, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 707, in 
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 708, in 
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1200, in 
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1281, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1198, in 
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 536, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1361, in 
apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 214, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 178, in 
apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
  File "apache_beam/runners/worker/opcounters.py", line 211, in 
apache_beam.runners.worker.opcounters.OperationCounters.update_from
  File "apache_beam/runners/worker/opcounters.py", line 250, in 
apache_beam.runners.worker.opcounters.OperationCounters.do_sample
  File "apache_beam/coders/coder_impl.py", line 1425, in 
apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1436, in 
apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 987, in 
apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 987, in 
apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 207, in 
apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1514, in 
apache_beam.coders.coder_impl.LengthPrefixCoderImpl.estimate_size
  File "apache_beam/coders/coder_impl.py", line 246, in 
apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
  File "apache_beam/coders/coder_impl.py", line 441, in 
apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
  File "apache_beam/coders/coder_impl.py", line 268, in 
apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
  File 
"/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/coders/coders.py",
 line 802, in <lambda>
    lambda x: dumps(x, protocol), pickle.loads)
TypeError: can't pickle generator objects [while running 
'ReadFromBigQuery/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']{code}

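The traceback shows where the failure happens. It occurs in the 
'ReadFromBigQuery/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction' 
step, while the operation counters sample element sizes. At that point 
FastPrimitivesCoder falls back to pickling the element, and the element contains a 
generator object, which cannot be pickled.

I suspect the error is caused by this change, which was first released in 2.35.0: 
https://github.com/apache/beam/pull/15610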



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to