[
https://issues.apache.org/jira/browse/BEAM-12864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Adam Pearce updated BEAM-12864:
-------------------------------
Resolution: Fixed
Status: Resolved (was: Triage Needed)
The root cause of this issue was the Hadoop HDFS core-site.xml configuration:
it was set to "localhost", so "localhost:9000" was appended to the access URL
when Beam made the connection.
Reconfiguring HDFS to advertise the IP of my Docker Host allowed Beam to access
HDFS from within the Pipeline on the FlinkRunner.
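For reference, a minimal core-site.xml sketch of the described fix. The IP address below is taken from the example in the issue description and is an assumption; substitute your own Docker Host address.
{code:xml}
<configuration>
  <!-- Advertise the Docker Host's IP instead of "localhost" so that the
       containers started by the FlinkRunner can resolve the NameNode.
       192.168.1.11 is a placeholder for your Docker Host IP. -->
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://192.168.1.11:9000</value>
  </property>
</configuration>
{code}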
> FlinkRunner (DOCKER) changes HDFS HOST to a name resolvable to host, rather
> than Docker
> ---------------------------------------------------------------------------------------
>
> Key: BEAM-12864
> URL: https://issues.apache.org/jira/browse/BEAM-12864
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.31.0
> Environment: macOS 11.5.2, Flink 1.13.2, Beam 2.31.0, Docker for Mac
> 20.10.8
> Reporter: Adam Pearce
> Priority: P2
>
> I'm attempting to create a simple example of reading from an HDFS FileSystem
> using the Python SDK. I am able to do this with the direct runner, and am
> even able to read the filesystem directly in a simple Python file outside of
> a Beam pipeline (but using the Beam IO FileSystem class).
> When I create a Beam pipeline and submit it to Flink, it is unable to resolve
> the hostname of the Docker Host, because it is set to 'localhost'. I've tried
> setting `hdfs_host` in the pipeline options with the typical value of
> `host.docker.internal` to reach the Host's network, and even the IP address
> of my Docker Host (macOS) (which usually works and is resolvable when testing
> with dummy containers). Using `host.docker.internal` fails because it is not
> resolvable by the Host itself, creating a situation where the Host _and_ the
> container both need to be able to resolve the `hdfs_host` hostname.
> When using the IP, this is possible, but I believe that in preparation for
> the Flink run, Beam replaces the HDFS Host entry with "localhost" because
> that is what the IP resolves to on the Docker Host, which is then not
> resolvable by the Docker container.
> Users need to be able to explicitly set the HDFS Host parameter with respect
> to the Docker environment in which the FlinkRunner executes the pipeline,
> regardless of whether the Host can resolve that hostname. In some cases, this
> could be another Docker container on the Docker network that is resolvable by
> the Docker container, but not by the Docker Host. Setting "hdfs_host" to an
> IP address should not result in replacement with "localhost".
> To summarize, a Beam pipeline run with the FlinkRunner (using the Docker
> environment) is unable to reach the Docker Host via networking, and therefore
> cannot connect to an HDFS Filesystem located on the Docker Host.
>
> Code example:
>
> {code:python}
> import logging
>
> import apache_beam as beam
> from apache_beam.io.hadoopfilesystem import HadoopFileSystem
> from apache_beam.options.pipeline_options import (
>     HadoopFileSystemOptions,
>     PipelineOptions,
> )
>
> HDFS_HOSTNAME = '192.168.1.11'  # Docker Host IP Address (macOS)
> HDFS_PORT = 9870
>
> hdfs_client_options = HadoopFileSystemOptions(
>     hdfs_host=HDFS_HOSTNAME, hdfs_port=HDFS_PORT, hdfs_user="apearce")
> hdfs_filesystem = HadoopFileSystem(hdfs_client_options)
>
> input_file_hdfs = "hdfs://user/apearce/testdata/001.csv"
>
> # This works (reading HDFS directly, outside a Beam pipeline):
> # for x in hdfs_filesystem.open(input_file_hdfs).readlines():
> #     print(x)
>
> def run(argv=None, save_main_session=True):
>     config = {
>         "runner": "FlinkRunner",
>         "flink_master": "localhost:8081",
>         "environment_type": "DOCKER",
>         "save_main_session": True,
>         "hdfs_host": HDFS_HOSTNAME,
>         "hdfs_port": HDFS_PORT,
>         "hdfs_user": "apearce",
>     }
>     pipeline_options = PipelineOptions.from_dictionary(config)
>     with beam.Pipeline(options=pipeline_options) as p:
>         (
>             p
>             | 'ReadFromHDFS' >> beam.io.ReadFromText(input_file_hdfs)
>             | 'Print' >> beam.Map(print)
>         )
>
> if __name__ == '__main__':
>     logging.getLogger().setLevel(logging.INFO)
>     run(){code}
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)