Adam Pearce created BEAM-12864:
----------------------------------
Summary: FlinkRunner (DOCKER) changes the HDFS host to a name resolvable by the Host, rather than by the Docker container
Key: BEAM-12864
URL: https://issues.apache.org/jira/browse/BEAM-12864
Project: Beam
Issue Type: Bug
Components: runner-flink
Affects Versions: 2.31.0
Environment: macOS 11.5.2, Flink 1.13.2, Beam 2.31.0, Docker for Mac
20.10.8
Reporter: Adam Pearce
I'm attempting to create a simple example of reading from an HDFS FileSystem
using the Python SDK. I am able to do this with the direct runner, and am even
able to read the filesystem directly in a simple Python file outside of a Beam
pipeline (but using the Beam IO FileSystem class).
When I create a Beam pipeline and submit it to Flink, the pipeline is unable to resolve
the hostname of the Docker Host, because the HDFS host ends up set to 'localhost'. I've
tried setting `hdfs_host` in the pipeline options to the typical value of
`host.docker.internal` to reach the Host's network, and even to the IP address of
my Docker Host (macOS), which usually works and is resolvable when testing with
dummy containers. The `host.docker.internal` value fails because it is not
resolvable by the Host itself. This creates a situation where the Host _and_ the
container both need to be able to resolve the `hdfs_host` hostname.
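For illustration, here is roughly how those options were set when trying `host.docker.internal` (a minimal sketch; the port and user are the same values used in the full example below):
{code:python}
from apache_beam.options.pipeline_options import PipelineOptions

# Same pipeline options as in the full example below, but pointing hdfs_host
# at the Docker Host's "magic" hostname instead of its IP address.
attempted_options = PipelineOptions.from_dictionary({
    "runner": "FlinkRunner",
    "flink_master": "localhost:8081",
    "environment_type": "DOCKER",
    "hdfs_host": "host.docker.internal",  # resolvable inside containers, but not on the macOS Host
    "hdfs_port": 9870,
    "hdfs_user": "apearce",
})
{code}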
Using the IP makes that possible, but I believe that, in preparation for the
Flink run, Beam replaces the HDFS Host entry with "localhost" because that is
what the IP resolves to on the Docker Host, and the Docker container then cannot
use that name to reach HDFS on the Host.
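To illustrate the replacement, a quick check (just a sketch, using the example IP from the code below) of what the Docker Host reverse-resolves that IP to:
{code:python}
import socket

# Run on the macOS Docker Host: reverse-resolve the IP that was passed as
# hdfs_host. If this reports 'localhost', it matches the substitution that
# shows up when the pipeline is submitted to Flink.
print(socket.gethostbyaddr('192.168.1.11'))
{code}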
Users need to be able to explicitly set the HDFS Host parameter with respect to
the Docker environment that the FlinkRunner executes the pipeline in,
regardless of whether the Host can resolve that hostname. In some cases this could
be another Docker container on the Docker network that is resolvable by the
Docker container, but not by the Docker Host. Setting "hdfs_host" to an IP
address should not result in it being replaced with "localhost".
To summarize, a Beam pipeline run with the FlinkRunner (using the DOCKER
environment) is unable to reach the Docker Host over the network, and therefore
cannot connect to an HDFS filesystem located on the Docker Host.
Code example:
{code:python}
import logging

import apache_beam as beam
from apache_beam.io.hadoopfilesystem import HadoopFileSystem
from apache_beam.options.pipeline_options import HadoopFileSystemOptions, PipelineOptions

HDFS_HOSTNAME = '192.168.1.11'  # Docker Host IP Address (macOS)
HDFS_PORT = 9870
hdfs_client_options = HadoopFileSystemOptions(
    hdfs_host=HDFS_HOSTNAME, hdfs_port=HDFS_PORT, hdfs_user="apearce")
hdfs_filesystem = HadoopFileSystem(hdfs_client_options)

input_file_hdfs = "hdfs://user/apearce/testdata/001.csv"

# This works (reading the file directly, outside of a pipeline):
# for x in hdfs_filesystem.open(input_file_hdfs).readlines():
#     print(x)


def run(argv=None, save_main_session=True):
    config = {
        "runner": "FlinkRunner",
        "flink_master": "localhost:8081",
        "environment_type": "DOCKER",
        "save_main_session": True,
        "hdfs_host": HDFS_HOSTNAME,
        "hdfs_port": HDFS_PORT,
        "hdfs_user": "apearce",
    }
    pipeline_options = PipelineOptions.from_dictionary(config)

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | 'ReadFromHDFS' >> beam.io.ReadFromText(input_file_hdfs)
            | 'Print' >> beam.Map(print)
        )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
{code}
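For completeness, a sketch of how the difference in name resolution between the Host and the SDK harness container can be checked (the SDK image tag is an assumption based on Beam 2.31.0 with Python 3.8; adjust for your SDK version):
{code:python}
import socket
import subprocess

NAMES = ["host.docker.internal", "192.168.1.11"]

# Resolution as seen by the macOS Docker Host.
for name in NAMES:
    try:
        print("host ->", name, socket.gethostbyname(name))
    except socket.gaierror as err:
        print("host ->", name, "unresolvable:", err)

# Resolution as seen from inside the Beam Python SDK harness image
# (the same image the DOCKER environment_type uses; tag assumed).
for name in NAMES:
    result = subprocess.run(
        ["docker", "run", "--rm", "apache/beam_python3.8_sdk:2.31.0",
         "python", "-c",
         "import socket; print(socket.gethostbyname('%s'))" % name],
        capture_output=True, text=True)
    print("container ->", name, (result.stdout or result.stderr).strip())
{code}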