Adam Pearce created BEAM-12864:
----------------------------------

             Summary: FlinkRunner (DOCKER) changes HDFS HOST to a name 
resolvable to host, rather than Docker
                 Key: BEAM-12864
                 URL: https://issues.apache.org/jira/browse/BEAM-12864
             Project: Beam
          Issue Type: Bug
          Components: runner-flink
    Affects Versions: 2.31.0
         Environment: macOS 11.5.2, Flink 1.13.2, Beam 2.31.0, Docker for Mac 
20.10.8
            Reporter: Adam Pearce


I'm attempting to create a simple example of reading from an HDFS FileSystem 
using the Python SDK. I am able to do this with the direct runner, and am even 
able to read the filesystem directly in a simple Python file outside of a Beam 
pipeline (but using the Beam IO FileSystem class).

When I create a Beam pipeline and submit it to Flink, the job is unable to 
resolve the hostname of the Docker Host, because it is set to 'localhost'. I've 
tried setting `hdfs_host` in the pipeline options to `host.docker.internal`, 
the typical way to reach the Host's network, and also to the IP address of my 
Docker Host (macOS), which is usually resolvable when testing with dummy 
containers. `host.docker.internal` fails because the Host itself cannot 
resolve it. This creates a situation where the Host _and_ the container both 
need to be able to resolve the `hdfs_host` hostname.
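
One way to confirm which side fails to resolve a given name is to run the same 
small check on the Docker Host and inside a container. The sketch below is only 
illustrative: `can_resolve` is a hypothetical helper, not part of Beam, and it 
uses nothing beyond the Python standard library.

```python
import socket


def can_resolve(hostname, port=None):
    """Return True if `hostname` resolves from the machine running this code."""
    try:
        socket.getaddrinfo(hostname, port)
        return True
    except socket.gaierror:
        # Raised when the local resolver cannot map the name to an address.
        return False


if __name__ == "__main__":
    # Candidate values for hdfs_host taken from this report.
    for name in ("localhost", "host.docker.internal", "192.168.1.11"):
        print(name, can_resolve(name))
```

Running it in both places should show whether a candidate `hdfs_host` value is 
resolvable on the Host, in the container, or in both.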

When using the IP, this is possible, but I believe that in preparation for the 
Flink run, Beam replaces the HDFS Host entry with "localhost" because that is 
what the IP resolves to on the Docker Host, which is then not resolvable by the 
Docker container.
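
To check the assumption that the IP maps back to "localhost" on the Docker 
Host, a reverse lookup can be run there. This is a minimal sketch using the 
standard library; `reverse_lookup` is a hypothetical helper, and it does not 
show what Beam itself does with the value.

```python
import socket


def reverse_lookup(ip_address):
    """Return the primary name the local resolver maps `ip_address` to, or None."""
    try:
        name, _aliases, _addresses = socket.gethostbyaddr(ip_address)
        return name
    except (socket.herror, socket.gaierror):
        # No reverse mapping is known for this address on this machine.
        return None


if __name__ == "__main__":
    # Docker Host IP address used in this report.
    print(reverse_lookup("192.168.1.11"))
```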

Users need to be able to explicitly set the HDFS Host parameter with respect to 
the Docker environment that the FlinkRunner executes the pipeline in, 
regardless of whether the Host can resolve that hostname. In some cases, this 
could be another Docker container on the Docker network that is resolvable 
from the Docker container, but not from the Docker Host. Setting "hdfs_host" 
to an IP address should not result in replacement with "localhost".

To summarize, a Beam pipeline run with the FlinkRunner (using the Docker 
environment) is unable to reach the Docker Host over the network, and therefore 
cannot connect to an HDFS filesystem located on the Docker Host.

Code example:

{code:python}
import logging

import apache_beam as beam
from apache_beam.io.hadoopfilesystem import HadoopFileSystem
from apache_beam.options.pipeline_options import HadoopFileSystemOptions
from apache_beam.options.pipeline_options import PipelineOptions

HDFS_HOSTNAME = '192.168.1.11'  # Docker Host IP Address (macOS)
HDFS_PORT = 9870
hdfs_client_options = HadoopFileSystemOptions(
    hdfs_host=HDFS_HOSTNAME, hdfs_port=HDFS_PORT, hdfs_user="apearce")
hdfs_filesystem = HadoopFileSystem(hdfs_client_options)
input_file_hdfs = "hdfs://user/apearce/testdata/001.csv"

# Reading the file directly through the Beam FileSystem class works:
# for x in hdfs_filesystem.open(input_file_hdfs).readlines():
#     print(x)


def run(argv=None, save_main_session=True):
    config = {
        "runner": "FlinkRunner",
        "flink_master": "localhost:8081",
        "environment_type": "DOCKER",
        "save_main_session": True,
        "hdfs_host": HDFS_HOSTNAME,
        "hdfs_port": HDFS_PORT,
        "hdfs_user": "apearce",
    }

    pipeline_options = PipelineOptions.from_dictionary(config)

    # Submitting to Flink is where the HDFS host can no longer be reached.
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | 'ReadFromHDFS' >> beam.io.ReadFromText(input_file_hdfs)
            | 'Print' >> beam.Map(print)
        )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
{code}
 

 



