Ștefan Istrate created BEAM-12657:
-------------------------------------

             Summary: Can't run Beam on separate Flink cluster. Official instructions don't work.
                 Key: BEAM-12657
                 URL: https://issues.apache.org/jira/browse/BEAM-12657
             Project: Beam
          Issue Type: Bug
          Components: runner-flink
    Affects Versions: 2.31.0
            Reporter: Ștefan Istrate


I am trying to run Beam on a separate Flink cluster, as described here: 
[https://beam.apache.org/documentation/runners/flink/#executing-a-beam-pipeline-on-a-flink-cluster]

??(1) Start a Flink cluster which exposes the Rest interface (e.g. {{localhost:8081}} by default).??

Flink 1.13.1 starts successfully on localhost:8081.
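
To rule out step (1), a quick check is whether the Flink REST interface answers HTTP requests from the machine that will submit the pipeline. The sketch below uses only the Python standard library and assumes Flink's REST API exposes an {{/overview}} endpoint that returns cluster information as JSON; the helper name {{check_flink_rest}} is hypothetical and not part of Beam or Flink.

```python
import json
import urllib.request


def check_flink_rest(base_url="http://localhost:8081", timeout=5.0):
    """Fetch and parse Flink's REST /overview endpoint (hypothetical helper).

    Returns the decoded JSON as a dict. Raises an OSError subclass
    (for example urllib.error.URLError) if nothing answers at base_url.
    """
    url = base_url.rstrip("/") + "/overview"
    # Ignore any HTTP proxy configured in the environment: the cluster is local.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    with opener.open(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

Calling {{check_flink_rest()}} either returns a dictionary describing the cluster or raises an exception, which separates "Flink is not reachable" from problems further down the chain.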

??(2) Start JobService with Flink Rest endpoint: {{docker run --net=host apache/beam_flink1.10_job_server:latest --flink-master=localhost:8081}}.??

Since my cluster runs Flink 1.13.1, I am running the matching Flink 1.13 job server image instead, which also starts successfully:

{{docker run --net=host apache/beam_flink1.13_job_server:latest --flink-master=localhost:8081}}
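
The submission command in step (3) talks to the job service at {{localhost:8099}}, so it can help to confirm that this port accepts TCP connections from the host before running the pipeline. A minimal standard-library sketch follows; the helper {{is_port_open}} is hypothetical, and a successful connect only shows that something is listening on the port, not that it is a working Beam job service.

```python
import socket


def is_port_open(host="localhost", port=8099, timeout=2.0):
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        # create_connection resolves the host and tries each address in turn.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, unreachable host, or connect timeout.
        return False
```

Running {{is_port_open("localhost", 8099)}} on the machine that executes the WordCount command shows whether anything is listening on the job port from that machine's point of view.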

??(3) Submit the pipeline as above.??

I'm running the WordCount pipeline as follows:

{{python -m apache_beam.examples.wordcount --input /Users/stefan/datastore/input.txt \}}
{{ --output /Users/stefan/datastore/output.txt \}}
{{ --runner=PortableRunner --job_endpoint=localhost:8099}}

After 1 minute, the pipeline fails with a {{grpc.FutureTimeoutError}}. This is the full output:

{{WARNING:root:Make sure that locally built Python SDK docker image has Python 3.8 interpreter.}}
{{INFO:root:Default Python SDK image for environment is apache/beam_python3.8_sdk:2.31.0}}
{{INFO:root:No image given, using default Python SDK image}}
{{WARNING:root:Make sure that locally built Python SDK docker image has Python 3.8 interpreter.}}
{{INFO:root:Default Python SDK image for environment is apache/beam_python3.8_sdk:2.31.0}}
{{INFO:root:Python SDK container image set to "apache/beam_python3.8_sdk:2.31.0" for Docker environment}}
{{INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function pack_combiners at 0x137d441f0> ====================}}
{{INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x137d44280> ====================}}
{{INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x137d449d0> ====================}}
{{Traceback (most recent call last):}}
{{ File "/usr/local/Cellar/python@3.8/3.8.11/Frameworks/Python.framework/Versions/3.8/lib/python3.8/runpy.py", line 194, in _run_module_as_main}}
{{ return _run_code(code, main_globals, None,}}
{{ File "/usr/local/Cellar/python@3.8/3.8.11/Frameworks/Python.framework/Versions/3.8/lib/python3.8/runpy.py", line 87, in _run_code}}
{{ exec(code, run_globals)}}
{{ File "/Users/stefan/workspace/biodiversity/.env/lib/python3.8/site-packages/apache_beam/examples/wordcount.py", line 94, in <module>}}
{{ run()}}
{{ File "/Users/stefan/workspace/biodiversity/.env/lib/python3.8/site-packages/apache_beam/examples/wordcount.py", line 89, in run}}
{{ output | 'Write' >> WriteToText(known_args.output)}}
{{ File "/Users/stefan/workspace/biodiversity/.env/lib/python3.8/site-packages/apache_beam/pipeline.py", line 585, in __exit__}}
{{ self.result = self.run()}}
{{ File "/Users/stefan/workspace/biodiversity/.env/lib/python3.8/site-packages/apache_beam/pipeline.py", line 564, in run}}
{{ return self.runner.run_pipeline(self, self._options)}}
{{ File "/Users/stefan/workspace/biodiversity/.env/lib/python3.8/site-packages/apache_beam/runners/portability/portable_runner.py", line 438, in run_pipeline}}
{{ job_service_handle = self.create_job_service(options)}}
{{ File "/Users/stefan/workspace/biodiversity/.env/lib/python3.8/site-packages/apache_beam/runners/portability/portable_runner.py", line 317, in create_job_service}}
{{ return self.create_job_service_handle(server.start(), options)}}
{{ File "/Users/stefan/workspace/biodiversity/.env/lib/python3.8/site-packages/apache_beam/runners/portability/job_server.py", line 54, in start}}
{{ grpc.channel_ready_future(channel).result(timeout=self._timeout)}}
{{ File "/Users/stefan/workspace/biodiversity/.env/lib/python3.8/site-packages/grpc/_utilities.py", line 140, in result}}
{{ self._block(timeout)}}
{{ File "/Users/stefan/workspace/biodiversity/.env/lib/python3.8/site-packages/grpc/_utilities.py", line 86, in _block}}
{{ raise grpc.FutureTimeoutError()}}
{{grpc.FutureTimeoutError}}

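The last frames show where the pipeline was waiting: {{job_server.py}} calls {{grpc.channel_ready_future(channel).result(timeout=self._timeout)}}, which waits for the gRPC channel to the job endpoint to become ready and raises {{grpc.FutureTimeoutError}} if it is not ready before the timeout expires. The sketch below illustrates that wait-until-ready-or-fail pattern with the standard library only. It is not Beam's code: a repeated TCP connect stands in for gRPC channel readiness, {{TimeoutError}} stands in for {{grpc.FutureTimeoutError}}, and the name {{wait_until_ready}} and its polling interval are hypothetical.

```python
import socket
import time


def wait_until_ready(host, port, timeout, poll_interval=0.5):
    """Block until (host, port) accepts TCP connections or `timeout` seconds pass.

    Loosely mirrors grpc.channel_ready_future(channel).result(timeout=...):
    returns normally once the endpoint is reachable, otherwise raises
    TimeoutError (standing in for grpc.FutureTimeoutError).
    """
    deadline = time.monotonic() + timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"{host}:{port} not ready after {timeout} s")
        try:
            # Keep each connection attempt within the overall deadline.
            with socket.create_connection(
                (host, port), timeout=min(remaining, poll_interval)
            ):
                return
        except OSError:
            # Not reachable yet: pause briefly, then retry until the deadline.
            time.sleep(min(poll_interval, max(deadline - time.monotonic(), 0)))
```

Called as {{wait_until_ready("localhost", 8099, timeout=60)}} against an endpoint that never accepts connections, this gives up with a {{TimeoutError}} after about a minute, the same shape of failure as the WordCount run.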
What's going on? Any help would be appreciated.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
