Valentyn Tymofieiev created BEAM-8547:
-----------------------------------------
Summary: Portable Wordcount fails on standalone Flink cluster
Key: BEAM-8547
URL: https://issues.apache.org/jira/browse/BEAM-8547
Project: Beam
Issue Type: Bug
Components: runner-flink, sdk-py-harness
Reporter: Valentyn Tymofieiev
Repro:
# git checkout origin/release-2.16.0
# ./flink-1.8.2/bin/start-cluster.sh
# ./gradlew :runners:flink:1.8:job-server:runShadow -PflinkMasterUrl=localhost:8081
# python -m apache_beam.examples.wordcount --input=/etc/profile --output=/tmp/py-wordcount-direct --runner=PortableRunner --experiments=worker_threads=100 --parallelism=1 --shutdown_sources_on_final_watermark --sdk_worker_parallelism=1 --environment_cache_millis=60000 --job_endpoint=localhost:8099
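To compare the failing configuration with the LOOPBACK variant mentioned in this report, the wordcount command from the repro steps can be assembled programmatically. This is a minimal standard-library sketch; the helper name wordcount_repro_args is hypothetical and not part of Beam. It copies the flags from the repro verbatim and only appends --environment_type=LOOPBACK when asked.

```python
import shlex
import sys
from typing import List

# Flags copied verbatim from the wordcount step of the repro.
REPRO_FLAGS = [
    "--input=/etc/profile",
    "--output=/tmp/py-wordcount-direct",
    "--runner=PortableRunner",
    "--experiments=worker_threads=100",
    "--parallelism=1",
    "--shutdown_sources_on_final_watermark",
    "--sdk_worker_parallelism=1",
    "--environment_cache_millis=60000",
    "--job_endpoint=localhost:8099",
]


def wordcount_repro_args(loopback: bool = False) -> List[str]:
    """Hypothetical helper: build the argv for the wordcount repro.

    With loopback=True, --environment_type=LOOPBACK is appended, which is
    the configuration reported not to fail.
    """
    args = [sys.executable, "-m", "apache_beam.examples.wordcount"] + REPRO_FLAGS
    if loopback:
        args.append("--environment_type=LOOPBACK")
    return args


if __name__ == "__main__":
    # Print the shell-quoted commands instead of running them; pass a list
    # to subprocess.run(...) to actually launch the job.
    for variant in (False, True):
        print(" ".join(shlex.quote(a) for a in wordcount_repro_args(variant)))
```

Keeping the flag list in one place makes it easy to toggle a single option (here the environment type) while holding everything else constant.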
The job then fails in the Python SDK harness with:
{noformat}
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 158, in _execute
    response = task()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 191, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 343, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 369, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 663, in process_bundle
    data.ptransform_id].process_encoded(data.data)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 143, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 255, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 256, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
  File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 849, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 660, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1042, in process
    self.writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/options/value_provider.py", line 137, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 186, in open_writer
    return FileBasedSinkWriter(self, writer_path)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 390, in __init__
    self.temp_handle = self.sink.open(temp_shard_path)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/textio.py", line 391, in open
    file_handle = super(_TextSink, self).open(temp_path)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/options/value_provider.py", line 137, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 129, in open
    return FileSystems.create(temp_path, self.mime_type, self.compression_type)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filesystems.py", line 203, in create
    return filesystem.create(path, mime_type, compression_type)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/localfilesystem.py", line 151, in create
    return self._path_open(path, 'wb', mime_type, compression_type)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/localfilesystem.py", line 134, in _path_open
    raw_file = open(path, mode)
RuntimeError: FileNotFoundError: [Errno 2] No such file or directory: '/tmp/beam-temp-py-wordcount-direct-ea951c18fd1211e9ac84a0c589d778c3/d39e13af-277b-437e-89f2-e00249287e1d.py-wordcount-direct' [while running 'write/Write/WriteImpl/WriteBundles']
{noformat}
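The innermost frame is a plain open(path, 'wb') call in LocalFileSystem._path_open. Python's open() raises FileNotFoundError whenever the parent directory of the target path does not exist from the point of view of the process doing the write. The standard-library sketch below reproduces only that behavior; it does not use Beam, and the directory and file names are illustrative stand-ins for the beam-temp-... directory in the traceback.

```python
import os
import tempfile


def try_open_for_write(path: str) -> str:
    """Mimic the final frame of the traceback: open(path, 'wb').

    Returns "ok" on success, or the exception class name if the parent
    directory is missing.
    """
    try:
        with open(path, "wb") as f:
            f.write(b"hello\n")
        return "ok"
    except FileNotFoundError as e:
        return type(e).__name__


with tempfile.TemporaryDirectory() as root:
    # Illustrative stand-in for '/tmp/beam-temp-py-wordcount-direct-<uuid>';
    # deliberately NOT created, so the write below has no parent directory.
    temp_dir = os.path.join(root, "beam-temp-example")
    shard = os.path.join(temp_dir, "shard.py-wordcount-direct")
    print(try_open_for_write(shard))  # FileNotFoundError

    # Once the directory exists on the writer's filesystem, the same call works.
    os.makedirs(temp_dir)
    print(try_open_for_write(shard))  # ok
```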
The error occurs with Flink 1.5 and Flink 1.8.
The error does not occur if the SDK harness runs in LOOPBACK mode
(--environment_type=LOOPBACK).
The error also does not occur if the Flink job server is launched without
pointing it at a Flink cluster, that is, if -PflinkMasterUrl=localhost:8081 is
omitted.
Similar error: https://issues.apache.org/jira/browse/BEAM-7859
Note that the default parallelism parameters set in portableWordCountBatch are
not compatible with the default configuration of a standalone Flink cluster,
which starts with only one available task slot.
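As a rough check of that incompatibility: with Flink's default slot sharing, a job needs as many task slots as its highest operator parallelism, while a standalone cluster offers (number of TaskManagers) x taskmanager.numberOfTaskSlots slots, which is 1 x 1 with the stock configuration. The sketch below encodes only that arithmetic; the function names are hypothetical, it assumes a single default slot sharing group, and the parallelism-2 case is illustrative rather than the actual portableWordCountBatch default.

```python
from typing import Sequence


def slots_required(operator_parallelisms: Sequence[int]) -> int:
    """With one (default) slot sharing group, Flink needs as many slots as
    the largest operator parallelism in the job."""
    return max(operator_parallelisms)


def fits_cluster(
    operator_parallelisms: Sequence[int],
    task_managers: int = 1,
    slots_per_task_manager: int = 1,
) -> bool:
    """Defaults mirror a standalone cluster started with the stock config:
    one TaskManager with taskmanager.numberOfTaskSlots: 1."""
    available = task_managers * slots_per_task_manager
    return slots_required(operator_parallelisms) <= available


# --parallelism=1, as in the repro, fits the single default slot...
print(fits_cluster([1, 1, 1]))  # True
# ...whereas an illustrative parallelism of 2 does not.
print(fits_cluster([2, 2, 2]))  # False
```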
cc: [~ibzib] [~goenka] [~robertwb]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)