[
https://issues.apache.org/jira/browse/BEAM-14112?focusedWorklogId=753740&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-753740
]
ASF GitHub Bot logged work on BEAM-14112:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 06/Apr/22 22:27
Start Date: 06/Apr/22 22:27
Worklog Time Spent: 10m
Work Description: KevinGG opened a new pull request, #17306:
URL: https://github.com/apache/beam/pull/17306
Converted the split_result attribute to a pickle-able dataclass that stores
only paths and a coder, instead of sources or table schemas, to further
reduce the size of the transform objects and avoid redundant coder creation.
Note: a transform is never guaranteed to be pickle-able; this change only
exists to work with the current Interactive Beam implementation.
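To illustrate the idea, a minimal sketch of such a pickle-able container
follows; the class and field names are hypothetical and not the actual Beam
code.
{code:python}
import pickle
from dataclasses import dataclass
from typing import Tuple

from apache_beam.coders.coders import BytesCoder, Coder


@dataclass(frozen=True)
class SplitResult:
    """Hypothetical container: keeps only file paths and one shared coder."""
    # Paths of the exported split files, instead of full source objects or
    # table schemas.
    paths: Tuple[str, ...]
    # A single coder shared by every split, created once instead of per split.
    coder: Coder


result = SplitResult(paths=('gs://my-bucket/tmp/split-0.avro',),
                     coder=BytesCoder())
# Unlike an attribute holding sources or generators, this round-trips cleanly.
restored = pickle.loads(pickle.dumps(result))
assert restored.paths == result.paths
{code}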
------------------------
Thank you for your contribution! Follow this checklist to help us
incorporate your contribution quickly and easily:
- [ ] [**Choose
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA
issue, if applicable. This will automatically link the pull request to the
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
See the [Contributor Guide](https://beam.apache.org/contribute) for more
tips on [how to make review process
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
To check the build health, please visit
[https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
GitHub Actions Tests Status (on master branch)
------------------------------------------------------------------------------------------------
- [Build python source distribution and wheels](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
- [Python Tests](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
- [Java Tests](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
Issue Time Tracking
-------------------
Worklog Id: (was: 753740)
Time Spent: 9.5h (was: 9h 20m)
> ReadFromBigQuery cannot be used with the interactive runner
> -----------------------------------------------------------
>
> Key: BEAM-14112
> URL: https://issues.apache.org/jira/browse/BEAM-14112
> Project: Beam
> Issue Type: Improvement
> Components: io-py-gcp, runner-py-interactive
> Affects Versions: 2.35.0, 2.36.0, 2.37.0
> Reporter: Chun Yang
> Assignee: Ning
> Priority: P2
> Time Spent: 9.5h
> Remaining Estimate: 0h
>
> A change in Apache Beam 2.35.0 caused ReadFromBigQuery to no longer work with
> the Python interactive runner.
> The error can be reproduced with the following code:
> {code:python}#!/usr/bin/env python
> """Reproduce pickle issue when using RFBQ in interactive runner."""
> import apache_beam as beam
>
> from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
> import apache_beam.runners.interactive.interactive_beam as ib
>
> options = beam.options.pipeline_options.PipelineOptions(
>     project="...",
>     temp_location="...",
> )
>
> pipeline = beam.Pipeline(InteractiveRunner(), options=options)
>
> pcoll = pipeline | beam.io.ReadFromBigQuery(query="SELECT 1")
> print(ib.collect(pcoll)){code}
> {code:none}Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 214, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
>   File "apache_beam/runners/worker/opcounters.py", line 211, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
>   File "apache_beam/runners/worker/opcounters.py", line 250, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
>   File "apache_beam/coders/coder_impl.py", line 1425, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1436, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 207, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1514, in apache_beam.coders.coder_impl.LengthPrefixCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 246, in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 441, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
>   File "apache_beam/coders/coder_impl.py", line 268, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 802, in <lambda>
>     lambda x: dumps(x, protocol), pickle.loads)
> TypeError: can't pickle generator objects
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File "repro.py", line 16, in <module>
>     print(ib.collect(pcoll))
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/utils.py", line 270, in run_within_progress_indicator
>     return func(*args, **kwargs)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_beam.py", line 664, in collect
>     recording = recording_manager.record([pcoll], max_n=n, max_duration=duration)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/recording_manager.py", line 458, in record
>     self.user_pipeline.options).run()
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/pipeline_fragment.py", line 113, in run
>     return self.deduce_fragment().run()
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/pipeline.py", line 573, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_runner.py", line 195, in run_pipeline
>     pipeline_to_execute.run(), pipeline_instrument)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/pipeline.py", line 573, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 200, in run_pipeline
>     pipeline.to_runner_api(default_environment=self._default_environment))
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 210, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 396, in run_stages
>     runner_execution_context, bundle_context_manager)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 667, in _run_stage
>     bundle_manager))
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 784, in _run_bundle
>     data_input, data_output, input_timers, expected_timer_output)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1094, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 378, in push
>     response = self.worker.do_instruction(request)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 581, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 618, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 996, in process_bundle
>     element.data)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 346, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 348, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1265, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1265, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1281, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 214, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
>   File "apache_beam/runners/worker/opcounters.py", line 211, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
>   File "apache_beam/runners/worker/opcounters.py", line 250, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
>   File "apache_beam/coders/coder_impl.py", line 1425, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1436, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 207, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1514, in apache_beam.coders.coder_impl.LengthPrefixCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 246, in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 441, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
>   File "apache_beam/coders/coder_impl.py", line 268, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 802, in <lambda>
>     lambda x: dumps(x, protocol), pickle.loads)
> TypeError: can't pickle generator objects [while running 'ReadFromBigQuery/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']{code}
> I suspect the error is caused by this change that was first released in
> 2.35.0: https://github.com/apache/beam/pull/15610
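> For context, the underlying failure can be reproduced outside Beam; a minimal
> standalone snippet showing that Python's pickler rejects generator objects:
> {code:python}
> import pickle
>
>
> def gen():
>     yield 1
>
>
> try:
>     # Generators carry live frame state and cannot be serialized by pickle.
>     pickle.dumps(gen())
> except TypeError as e:
>     # On Python 3.7 this prints "can't pickle generator objects";
>     # newer versions phrase it as "cannot pickle 'generator' object".
>     print(e)
> {code}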
--
This message was sent by Atlassian Jira
(v8.20.1#820001)