[ https://issues.apache.org/jira/browse/BEAM-11037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Stauffer updated BEAM-11037:
------------------------------------
Description:
I have a Beam workflow that runs fine on Dataflow. However, when I run it on my
local direct runner (Windows 10, Python SDK), I run into some strange behaviour
if I have a negative number in my PCollection (see below for an example).
Minimal code example to reproduce the exception:
{code:python}
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.pipeline import Pipeline
import apache_beam

if __name__ == '__main__':
    pipeline_options = PipelineOptions()
    with Pipeline(options=pipeline_options) as p:
        lines = (
            p
            | apache_beam.Create([
                {"Test": -1}
            ]))
        lines | apache_beam.io.WriteToText('./lines')
{code}
Exception:
{code:java}
  File "apache_beam\coders\coder_impl.py", line 222, in apache_beam.coders.coder_impl.StreamCoderImpl.decode
  File "apache_beam\coders\coder_impl.py", line 224, in apache_beam.coders.coder_impl.StreamCoderImpl.decode
  File "apache_beam\coders\coder_impl.py", line 470, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.decode_from_stream
  File "apache_beam\coders\coder_impl.py", line 450, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.decode_from_stream
  File "apache_beam\coders\stream.pyx", line 193, in apache_beam.coders.stream.InputStream.read_var_int64
RuntimeError: VarLong too long. [while running 'Create/Map(decode)']
{code}
Any help is highly appreciated.
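To illustrate why a negative value is special here, below is a minimal sketch of a base-128 varint encoder. It assumes (not verified against the 2.24.0 source) that FastPrimitivesCoder writes a Python int as a plain varint over its 64-bit two's-complement bit pattern, modeled on the pure-Python OutputStream.write_var_int64 in slow_stream.py; the function name encode_varint64 is hypothetical. Under that assumption, -1 occupies 10 bytes while small non-negative ints fit in a single byte, so only negative numbers exercise the longest decoding path.

```python
def encode_varint64(value: int) -> bytes:
    """Encode a signed 64-bit integer as a base-128 (LEB128-style) varint.

    Negative values are reinterpreted as their unsigned 64-bit
    two's-complement bit pattern first (no ZigZag mapping), mirroring the
    assumed behaviour of Beam's OutputStream.write_var_int64.
    """
    if not -(1 << 63) <= value < (1 << 63):
        raise ValueError('value does not fit in a signed 64-bit integer')
    if value < 0:
        value += 1 << 64  # e.g. -1 -> 0xFFFFFFFFFFFFFFFF
    out = bytearray()
    while True:
        bits = value & 0x7F  # low 7 payload bits
        value >>= 7
        if value:
            bits |= 0x80  # continuation flag: more bytes follow
        out.append(bits)
        if not value:
            return bytes(out)


if __name__ == '__main__':
    for n in (0, 1, 300, -1):
        encoded = encode_varint64(n)
        print(n, encoded.hex(), len(encoded))
```

For example, -1 encodes to ffffffffffffffffff01 (10 bytes). Because no ZigZag mapping is applied, every negative 64-bit value, however small in magnitude, has its top bit set and therefore encodes to exactly 10 bytes.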
was:
I have a Beam workflow that runs fine on Dataflow. However, when I run it on my
local direct runner (Windows 10, Python SDK), I run into memory (or some other)
issues unless I massively reduce the size of the input CSV files to a few KB.
I'm not quite sure whether it's just a memory issue, as the size of the worker
node is fixed at 100 MB (unlike in the Java SDK, this limit seems to be
hard-coded so far; see also
[here|https://stackoverflow.com/questions/58099163/dataflow-sideinputs-worker-cache-size-in-sdk-2-x]).
{code:java}
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 100
{code}
Or whether there is an issue in the implementation of InputStream (see also
below):
[https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/slow_stream.py#L178]
Any help is highly appreciated.
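The decoding side can be sketched the same way. The function below mirrors the shape of the overflow check in the pure-Python InputStream.read_var_int64 linked above, but takes the assumed integer width as a parameter (width_bits is a hypothetical knob, not a Beam option). One plausible but unconfirmed explanation for a Windows-only failure is that the compiled stream.pyx sizes this check by the C long type, which is 32 bits on 64-bit Windows (LLP64) but 64 bits on typical 64-bit Linux and macOS builds (LP64); platform_long_bits() reports that width for the running interpreter. With a 32-bit limit, the 10-byte encoding of -1 is rejected with "VarLong too long."

```python
import ctypes


def platform_long_bits() -> int:
    """Width in bits of the C `long` type for this interpreter.

    64-bit Windows uses the LLP64 model, so this returns 32 there, while
    typical 64-bit Linux and macOS builds (LP64) return 64.
    """
    return ctypes.sizeof(ctypes.c_long) * 8


def decode_varint64(data: bytes, width_bits: int = 64) -> int:
    """Decode a base-128 varint into a signed integer of `width_bits` bits.

    The overflow test has the same shape as the one in Beam's pure-Python
    slow_stream.py; `width_bits` is a hypothetical parameter used only to
    compare a 64-bit limit with a 32-bit one.
    """
    shift = 0
    result = 0
    for byte in data:
        bits = byte & 0x7F
        # Give up once the shift reaches the assumed width (or the top bit
        # would overflow), which is what produces "VarLong too long."
        if shift >= width_bits or (shift >= width_bits - 1 and bits > 1):
            raise RuntimeError('VarLong too long.')
        result |= bits << shift
        shift += 7
        if not byte & 0x80:  # high bit clear: this was the last byte
            break
    else:
        raise RuntimeError('VarLong not terminated.')
    # Reinterpret the accumulated bits as a two's-complement value.
    if result >= 1 << (width_bits - 1):
        result -= 1 << width_bits
    return result


# 10-byte varint of -1 (64-bit two's complement).
MINUS_ONE = b'\xff' * 9 + b'\x01'

if __name__ == '__main__':
    print('C long width here:', platform_long_bits())
    print(decode_varint64(MINUS_ONE))  # prints -1 with the 64-bit limit
    try:
        decode_varint64(MINUS_ONE, width_bits=32)
    except RuntimeError as err:
        print('32-bit limit:', err)  # prints "32-bit limit: VarLong too long."
```

On 64-bit Windows, ctypes.sizeof(ctypes.c_long) is 4, so platform_long_bits() returns 32 there. Whether stream.pyx actually uses that width is an assumption this sketch does not settle.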
Exception:
{code:java}
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((WriteInterviewQuestions/Write/WriteImpl/GroupByKey/Read)+(ref_AppliedPTransform_WriteInterviewQuestions/Write/WriteImpl/WriteBundles_51))+(ref_PCollection_PCollection_33/Write)
Traceback (most recent call last):
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\pipeline.py", line 555, in __exit__
    self.result = self.run()
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\pipeline.py", line 534, in run
    return self.runner.run_pipeline(self, self._options)
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 119, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 176, in run_pipeline
    pipeline.to_runner_api(default_environment=self._default_environment))
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 186, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 344, in run_stages
    bundle_context_manager,
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 523, in _run_stage
    bundle_manager)
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 561, in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 945, in process_bundle
    timer_inputs)):
  File "c:\users\c804249\appdata\local\programs\python\python37\lib\concurrent\futures\_base.py", line 598, in result_iterator
    yield fs.pop().result()
  File "c:\users\c804249\appdata\local\programs\python\python37\lib\concurrent\futures\_base.py", line 435, in result
    return self.__get_result()
  File "c:\users\c804249\appdata\local\programs\python\python37\lib\concurrent\futures\_base.py", line 384, in __get_result
    raise self._exception
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\utils\thread_pool_executor.py", line 44, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 941, in execute
    dry_run)
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 841, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 353, in push
    response = self.worker.do_instruction(request)
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 483, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 518, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 983, in process_bundle
    element.data)
  File "C:\Users\C804249\Projects\askia-processing\.tox\dev\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 218, in process_encoded
    input_stream, True)
  File "apache_beam\coders\coder_impl.py", line 1246, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.decode_from_stream
  File "apache_beam\coders\coder_impl.py", line 1265, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.decode_from_stream
  File "apache_beam\coders\coder_impl.py", line 858, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.decode_from_stream
  File "apache_beam\coders\coder_impl.py", line 1043, in apache_beam.coders.coder_impl.SequenceCoderImpl.decode_from_stream
  File "apache_beam\coders\coder_impl.py", line 1359, in apache_beam.coders.coder_impl.LengthPrefixCoderImpl.decode_from_stream
  File "apache_beam\coders\coder_impl.py", line 224, in apache_beam.coders.coder_impl.StreamCoderImpl.decode
  File "apache_beam\coders\coder_impl.py", line 470, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.decode_from_stream
  File "apache_beam\coders\coder_impl.py", line 450, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.decode_from_stream
  File "apache_beam\coders\stream.pyx", line 193, in apache_beam.coders.stream.InputStream.read_var_int64
RuntimeError: VarLong too long.
{code}
> Python DirectRunner InputStream Issues (RuntimeError: VarLong too long)
> -----------------------------------------------------------------------
>
> Key: BEAM-11037
> URL: https://issues.apache.org/jira/browse/BEAM-11037
> Project: Beam
> Issue Type: Bug
> Components: io-py-avro, io-py-files, runner-direct
> Affects Versions: 2.24.0
> Environment: Windows 10 64bit, Python 3.7.9
> Reporter: Michael Stauffer
> Priority: P0
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)