damccorm opened a new issue, #31190:
URL: https://github.com/apache/beam/issues/31190

   ### What happened?
   
   Following the interactive Flink example in Dataflow notebooks fails, even though it should work. The example uses the following (abbreviated) steps:
   
   ```
   import apache_beam as beam
   import apache_beam.runners.interactive.interactive_beam as ib
   from apache_beam.options.pipeline_options import FlinkRunnerOptions
   from apache_beam.options.pipeline_options import GoogleCloudOptions
   from apache_beam.options.pipeline_options import PipelineOptions
   from apache_beam.options.pipeline_options import PortableOptions
   from apache_beam.options.pipeline_options import SetupOptions
   from apache_beam.options.pipeline_options import WorkerOptions
   from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
   from apache_beam.runners.portability.flink_runner import FlinkRunner
   
   import logging
   logging.getLogger().setLevel(logging.ERROR)
   ```
   
   ```
   import google.auth
   project = google.auth.default()[1]
   ```
   
   ```
   ib.options.cache_root = 'gs://TEMP_BUCKET/flink'
   ```
   
   ```
   interactive_flink_runner = InteractiveRunner(underlying_runner=FlinkRunner())
   
   # Set up the Apache Beam pipeline options.
   options = PipelineOptions()
   options.view_as(GoogleCloudOptions).project = project
   # Use cloudpickle to alleviate the burden of staging things in the main module.
   options.view_as(SetupOptions).pickle_library = 'cloudpickle'
   # As a rule of thumb, the Flink cluster has about vCPU * #TMs = 8 * 40 = 320 slots.
   options.view_as(WorkerOptions).machine_type = 'n1-highmem-8'
   options.view_as(WorkerOptions).num_workers = 40
   ```
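The slot rule of thumb in the options comment is simple arithmetic. A minimal sketch, assuming one Flink slot per vCPU on each TaskManager and the `n1-<family>-<vCPUs>` machine-type naming; the helper names are hypothetical and not part of Beam:

```python
def vcpus_from_machine_type(machine_type: str) -> int:
    # Hypothetical helper: assumes the vCPU count is the last dash-separated
    # field of the machine type name, e.g. 'n1-highmem-8' -> 8.
    return int(machine_type.rsplit('-', 1)[-1])


def estimate_slots(machine_type: str, num_workers: int) -> int:
    # Rule of thumb from the notebook comment: slots ~ vCPU * #TaskManagers.
    return vcpus_from_machine_type(machine_type) * num_workers


print(estimate_slots('n1-highmem-8', 40))  # 320
```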
   
   ```
   import re
   
   
   class ReadWordsFromText(beam.PTransform):
       def __init__(self, file_pattern):
           self._file_pattern = file_pattern
       
       def expand(self, pcoll):
           return (pcoll.pipeline
                   | beam.io.ReadFromText(self._file_pattern)
                   | beam.FlatMap(lambda line: re.findall(r'[\w\']+', line.strip(), re.UNICODE)))
   ```
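The tokenization step in `ReadWordsFromText` can be checked without a pipeline. A small sketch that applies the same regular expression to a single line; `split_words` is a hypothetical name used only for illustration:

```python
import re
from typing import List

# Same pattern as ReadWordsFromText: runs of word characters and apostrophes.
WORD_RE = re.compile(r"[\w']+", re.UNICODE)


def split_words(line: str) -> List[str]:
    # Strip surrounding whitespace, then return every word-like token.
    return WORD_RE.findall(line.strip())


print(split_words("  Nothing will come of nothing: speak again.  "))
# ['Nothing', 'will', 'come', 'of', 'nothing', 'speak', 'again']
```

Apostrophes are kept inside tokens, so a word such as `Let's` stays as one element rather than being split.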
   
   ```
   p_word_count = beam.Pipeline(interactive_flink_runner, options=options)
   
   counts = (
       p_word_count
       | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')
       | 'count' >> beam.combiners.Count.PerElement())
   ```
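For reference when reading the output of `ib.show(counts)`: `beam.combiners.Count.PerElement()` pairs each distinct element with its number of occurrences. A plain-Python analogue using `collections.Counter`, sorted here only to make the result deterministic (Beam itself gives no ordering guarantee):

```python
from collections import Counter


def count_per_element(words):
    # Pair each distinct element with how many times it appears,
    # mirroring the (element, count) tuples Count.PerElement emits.
    return sorted(Counter(words).items())


print(count_per_element(['the', 'king', 'the', 'fool']))
# [('fool', 1), ('king', 1), ('the', 2)]
```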
   
   ```
   ib.show(counts)
   ```
   
   This correctly creates the cluster, but then fails to connect with an error:
   
   ```
   RuntimeError: Pipeline BeamApp-root-0506134545-19c5e6a9_fd14901c-fc18-4427-8ecf-8ab617c84bf4 failed in state FAILED: java.net.UnknownHostException: interactive-beam-488403594d174afda694c9aea54ca42d-w-27.us-central1-f.c.dataflow-eou-cep.internal
   ```
   
   If we run `ib.clusters.describe(p_word_count)`, we see that `master_url='interactive-beam-e6dceaea0c294871909a73a9a1142c98-w-0.us-central1-f.c.dataflow-eou-cep.internal:38951'`.
   
   `interactive-beam-488403594d174afda694c9aea54ca42d-w-27` does exist, but it isn't the Flink master, so that is likely where the problem originates.
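To make the mismatch concrete, a small diagnostic sketch that extracts the unresolved host from the error message and compares it with the host in `master_url`. The helper functions are hypothetical (not part of Beam's interactive API), and the strings are copied from the output quoted above:

```python
import re
from typing import Optional


def unresolved_host(error_message: str) -> Optional[str]:
    # Extract the host named in a 'java.net.UnknownHostException: <host>' message.
    match = re.search(r'UnknownHostException:\s*(\S+)', error_message)
    return match.group(1) if match else None


def master_host(master_url: str) -> str:
    # Drop the trailing ':<port>' from a 'host:port' master URL.
    return master_url.rsplit(':', 1)[0]


def short_name(host: str) -> str:
    # Keep only the first DNS label, i.e. the VM name.
    return host.split('.', 1)[0]


error = ('java.net.UnknownHostException: '
         'interactive-beam-488403594d174afda694c9aea54ca42d-w-27'
         '.us-central1-f.c.dataflow-eou-cep.internal')
master_url = ('interactive-beam-e6dceaea0c294871909a73a9a1142c98-w-0'
              '.us-central1-f.c.dataflow-eou-cep.internal:38951')

bad = unresolved_host(error)
print(short_name(bad))                      # ...-w-27
print(short_name(master_host(master_url)))  # ...-w-0
print(bad == master_host(master_url))       # False
```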
   
   ### Issue Priority
   
   Priority: 3 (minor)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [X] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.