damccorm opened a new issue, #31190:
URL: https://github.com/apache/beam/issues/31190
### What happened?
Following the interactive Flink example in Dataflow notebooks fails when it should work. The example uses the following (abbreviated) steps:
```
import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.options.pipeline_options import FlinkRunnerOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import PortableOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners.portability.flink_runner import FlinkRunner
import logging
logging.getLogger().setLevel(logging.ERROR)
```
```
import google.auth
project = google.auth.default()[1]
```
```
ib.options.cache_root = 'gs://TEMP_BUCKET/flink'
```
```
interactive_flink_runner = InteractiveRunner(underlying_runner=FlinkRunner())
# Set up the Apache Beam pipeline options.
options = PipelineOptions()
options.view_as(GoogleCloudOptions).project = project
# Use cloudpickle to alleviate the burden of staging things in the main module.
options.view_as(SetupOptions).pickle_library = 'cloudpickle'
# As a rule of thumb, the Flink cluster has about vCPU * #TMs = 8 * 40 = 320 slots.
options.view_as(WorkerOptions).machine_type = 'n1-highmem-8'
options.view_as(WorkerOptions).num_workers = 40
```
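The slot estimate in the comment is plain arithmetic: vCPUs per worker times the number of workers, assuming one TaskManager per worker as the comment implies. A minimal sketch with a hypothetical helper, assuming an N1 predefined machine type whose name ends in its vCPU count (as `n1-highmem-8` does):

```python
def estimated_flink_slots(machine_type: str, num_workers: int) -> int:
    """Rule of thumb: one Flink slot per vCPU on each TaskManager (worker).

    Hypothetical helper. Assumes an N1 predefined type such as
    'n1-highmem-8', where the trailing number is the vCPU count.
    """
    vcpus = int(machine_type.rsplit('-', 1)[-1])  # 'n1-highmem-8' -> 8
    return vcpus * num_workers


print(estimated_flink_slots('n1-highmem-8', 40))  # 320
```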
```
import re
class ReadWordsFromText(beam.PTransform):
    def __init__(self, file_pattern):
        self._file_pattern = file_pattern

    def expand(self, pcoll):
        return (pcoll.pipeline
                | beam.io.ReadFromText(self._file_pattern)
                | beam.FlatMap(lambda line: re.findall(r'[\w\']+',
                                                       line.strip(), re.UNICODE)))
```
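To check the tokenization on its own, without a cluster, the regex from the `FlatMap` can be exercised on a sample line. This is a plain-Python sketch; the `words` helper is hypothetical and only mirrors `re.findall(r'[\w\']+', line.strip(), re.UNICODE)`:

```python
import re

# Same pattern as the FlatMap in ReadWordsFromText: runs of word
# characters and apostrophes.
WORD_RE = re.compile(r"[\w']+", re.UNICODE)


def words(line: str):
    """Hypothetical helper: tokenize one line the way the FlatMap does."""
    return WORD_RE.findall(line.strip())


print(words("  Nothing will come of nothing: speak again.  "))
# ['Nothing', 'will', 'come', 'of', 'nothing', 'speak', 'again']
```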
```
p_word_count = beam.Pipeline(interactive_flink_runner, options=options)
counts = (
    p_word_count
    | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')
    | 'count' >> beam.combiners.Count.PerElement())
```
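`beam.combiners.Count.PerElement()` emits one `(element, count)` pair per distinct element. As a runner-independent reference for what `counts` should contain on a small input, here is a plain-Python sketch (the helper name is hypothetical and not part of Beam):

```python
from collections import Counter


def count_per_element(elements):
    """Hypothetical local stand-in for Count.PerElement().

    Returns (element, count) pairs, sorted so results are easy to compare.
    """
    return sorted(Counter(elements).items())


print(count_per_element(['a', 'b', 'a']))  # [('a', 2), ('b', 1)]
```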
```
ib.show(counts)
```
This correctly creates the cluster, but then fails to connect with an error:
```
RuntimeError: Pipeline BeamApp-root-0506134545-19c5e6a9_fd14901c-fc18-4427-8ecf-8ab617c84bf4 failed in state FAILED: java.net.UnknownHostException: interactive-beam-488403594d174afda694c9aea54ca42d-w-27.us-central1-f.c.dataflow-eou-cep.internal
```
If we run `ib.clusters.describe(p_word_count)`, we see that
`master_url='interactive-beam-e6dceaea0c294871909a73a9a1142c98-w-0.us-central1-f.c.dataflow-eou-cep.internal:38951'`.
The host `interactive-beam-488403594d174afda694c9aea54ca42d-w-27` does exist, but it isn't the Flink master, so that is likely where the problem originates.
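One quick diagnostic is to compare the host in the `UnknownHostException` with the host in `master_url`, and to check whether the failing name resolves from the notebook. A minimal sketch, assuming hostnames follow the `interactive-beam-<id>-w-<n>.<zone>...` pattern seen in this issue; all helper names are hypothetical:

```python
import socket
from urllib.parse import urlsplit


def split_master_url(master_url: str):
    """Split a 'host:port' master_url into (host, port)."""
    parts = urlsplit('//' + master_url)  # '//' makes urlsplit parse a netloc
    return parts.hostname, parts.port


def cluster_prefix(host: str) -> str:
    """'interactive-beam-<id>-w-27.zone...' -> 'interactive-beam-<id>'.

    Assumes the short hostname ends in '-w-<n>' (naming seen in this issue).
    """
    short_name = host.split('.', 1)[0]  # drop the DNS suffix
    return short_name.rsplit('-', 2)[0]  # drop the trailing '-w-<n>'


def resolves(host: str) -> bool:
    """True if this machine's resolver can look up the hostname."""
    try:
        socket.gethostbyname(host)
        return True
    except OSError:  # socket.gaierror is a subclass of OSError
        return False


master_host, master_port = split_master_url(
    'interactive-beam-e6dceaea0c294871909a73a9a1142c98-w-0'
    '.us-central1-f.c.dataflow-eou-cep.internal:38951')
failing_host = ('interactive-beam-488403594d174afda694c9aea54ca42d-w-27'
                '.us-central1-f.c.dataflow-eou-cep.internal')

print(master_port)  # 38951
print(cluster_prefix(master_host) == cluster_prefix(failing_host))  # False
```

For the two hostnames quoted in this issue the prefix comparison returns `False`, because their `interactive-beam-<id>` parts differ. Calling `resolves(failing_host)` in the notebook only tests DNS from the notebook VM, which may not match what the Java process that raised the exception could resolve.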
### Issue Priority
Priority: 3 (minor)
### Issue Components
- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [X] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner