[ 
https://issues.apache.org/jira/browse/BEAM-3965?focusedWorklogId=87773&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87773
 ]

ASF GitHub Bot logged work on BEAM-3965:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Apr/18 21:41
            Start Date: 04/Apr/18 21:41
    Worklog Time Spent: 10m 
      Work Description: chamikaramj closed pull request #4979: [BEAM-3965] HDFS 
Read fixes
URL: https://github.com/apache/beam/pull/4979
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/io/filesystem.py 
b/sdks/python/apache_beam/io/filesystem.py
index 28a0c434dc5..3f7e9aba847 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -437,7 +437,8 @@ class FileSystem(BeamPlugin):
   def __init__(self, pipeline_options):
     """
     Args:
-      pipeline_options: Instance of ``PipelineOptions``.
+      pipeline_options: Instance of ``PipelineOptions`` or dict of options and
+        values (like ``RuntimeValueProvider.runtime_options``).
     """
 
   @staticmethod
diff --git a/sdks/python/apache_beam/io/filesystems.py 
b/sdks/python/apache_beam/io/filesystems.py
index 17d8d37a061..5bc195bb0d9 100644
--- a/sdks/python/apache_beam/io/filesystems.py
+++ b/sdks/python/apache_beam/io/filesystems.py
@@ -24,6 +24,7 @@
 from apache_beam.io.filesystem import BeamIOError
 from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.filesystem import FileSystem
+from apache_beam.options.value_provider import RuntimeValueProvider
 
 # All filesystem implements should be added here as
 # best effort imports. We don't want to force loading
@@ -85,7 +86,11 @@ def get_filesystem(path):
       if len(systems) == 0:
         raise ValueError('Unable to get the Filesystem for path %s' % path)
       elif len(systems) == 1:
-        return systems[0](pipeline_options=FileSystems._pipeline_options)
+        # Pipeline options could come either from the Pipeline itself (using
+        # direct runner), or via RuntimeValueProvider (other runners).
+        options = (FileSystems._pipeline_options or
+                   RuntimeValueProvider.runtime_options)
+        return systems[0](pipeline_options=options)
       else:
         raise ValueError('Found more than one filesystem for path %s' % path)
     except ValueError:
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem.py 
b/sdks/python/apache_beam/io/hadoopfilesystem.py
index bff243aa555..7382c3c8ade 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem.py
@@ -35,6 +35,7 @@
 from apache_beam.io.filesystem import FileSystem
 from apache_beam.io.filesystem import MatchResult
 from apache_beam.options.pipeline_options import HadoopFileSystemOptions
+from apache_beam.options.pipeline_options import PipelineOptions
 
 __all__ = ['HadoopFileSystem']
 
@@ -48,12 +49,11 @@
 _FILE_CHECKSUM_BYTES = 'bytes'
 _FILE_CHECKSUM_LENGTH = 'length'
 # WebHDFS FileStatus property constants.
-_FILE_STATUS_NAME = 'name'
+_FILE_STATUS_LENGTH = 'length'
 _FILE_STATUS_PATH_SUFFIX = 'pathSuffix'
 _FILE_STATUS_TYPE = 'type'
 _FILE_STATUS_TYPE_DIRECTORY = 'DIRECTORY'
 _FILE_STATUS_TYPE_FILE = 'FILE'
-_FILE_STATUS_SIZE = 'size'
 
 
 class HdfsDownloader(filesystemio.Downloader):
@@ -61,7 +61,7 @@ class HdfsDownloader(filesystemio.Downloader):
   def __init__(self, hdfs_client, path):
     self._hdfs_client = hdfs_client
     self._path = path
-    self._size = self._hdfs_client.status(path)[_FILE_STATUS_SIZE]
+    self._size = self._hdfs_client.status(path)[_FILE_STATUS_LENGTH]
 
   @property
   def size(self):
@@ -106,20 +106,26 @@ def __init__(self, pipeline_options):
     """
     super(HadoopFileSystem, self).__init__(pipeline_options)
     logging.getLogger('hdfs.client').setLevel(logging.WARN)
-
     if pipeline_options is None:
       raise ValueError('pipeline_options is not set')
-    hdfs_options = pipeline_options.view_as(HadoopFileSystemOptions)
-    if hdfs_options.hdfs_host is None:
+    if isinstance(pipeline_options, PipelineOptions):
+      hdfs_options = pipeline_options.view_as(HadoopFileSystemOptions)
+      hdfs_host = hdfs_options.hdfs_host
+      hdfs_port = hdfs_options.hdfs_port
+      hdfs_user = hdfs_options.hdfs_user
+    else:
+      hdfs_host = pipeline_options.get('hdfs_host')
+      hdfs_port = pipeline_options.get('hdfs_port')
+      hdfs_user = pipeline_options.get('hdfs_user')
+
+    if hdfs_host is None:
       raise ValueError('hdfs_host is not set')
-    if hdfs_options.hdfs_port is None:
+    if hdfs_port is None:
       raise ValueError('hdfs_port is not set')
-    if hdfs_options.hdfs_user is None:
+    if hdfs_user is None:
       raise ValueError('hdfs_user is not set')
     self._hdfs_client = hdfs.InsecureClient(
-        'http://%s:%s' % (
-            hdfs_options.hdfs_host, str(hdfs_options.hdfs_port)),
-        user=hdfs_options.hdfs_user)
+        'http://%s:%s' % (hdfs_host, str(hdfs_port)), user=hdfs_user)
 
   @classmethod
   def scheme(cls):
@@ -188,13 +194,15 @@ def _match(path_pattern, limit):
       """Find all matching paths to the pattern provided."""
       fs = self._hdfs_client.status(path_pattern, strict=False)
       if fs and fs[_FILE_STATUS_TYPE] == _FILE_STATUS_TYPE_FILE:
-        file_statuses = [(fs[_FILE_STATUS_PATH_SUFFIX], fs)][:limit]
+        file_statuses = [(path_pattern, fs)][:limit]
       else:
-        file_statuses = self._hdfs_client.list(path_pattern,
-                                               status=True)[:limit]
-      metadata_list = [FileMetadata(file_status[1][_FILE_STATUS_NAME],
-                                    file_status[1][_FILE_STATUS_SIZE])
-                       for file_status in file_statuses]
+        file_statuses = [(self._join(path_pattern, fs[0]), fs[1])
+                         for fs in self._hdfs_client.list(path_pattern,
+                                                          status=True)[:limit]]
+      metadata_list = [
+          FileMetadata(_HDFS_PREFIX + file_status[0],
+                       file_status[1][_FILE_STATUS_LENGTH])
+          for file_status in file_statuses]
       return MatchResult(path_pattern, metadata_list)
 
     exceptions = {}
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem_test.py 
b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
index f6616b38dc4..6302831ebca 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem_test.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
@@ -67,9 +67,8 @@ def size(self):
   def get_file_status(self):
     """Returns a partial WebHDFS FileStatus object."""
     return {
-        hdfs._FILE_STATUS_NAME: self.stat['path'],
         hdfs._FILE_STATUS_PATH_SUFFIX: posixpath.basename(self.stat['path']),
-        hdfs._FILE_STATUS_SIZE: self.size,
+        hdfs._FILE_STATUS_LENGTH: self.size,
         hdfs._FILE_STATUS_TYPE: self.stat['type'],
     }
 
@@ -237,11 +236,6 @@ def test_url_split(self):
   def test_mkdirs(self):
     url = self.fs.join(self.tmpdir, 't1/t2')
     self.fs.mkdirs(url)
-    match_results = self.fs.match([url])
-    self.assertEqual(1, len(match_results))
-    self.assertEqual(1, len(match_results[0].metadata_list))
-    metadata = match_results[0].metadata_list[0]
-    self.assertEqual(metadata.path, self.fs._parse_url(url))
     self.assertTrue(self.fs.exists(url))
 
   def test_mkdirs_failed(self):
@@ -252,17 +246,17 @@ def test_mkdirs_failed(self):
       self.fs.mkdirs(url)
 
   def test_match_file(self):
-    files = [self.fs.join(self.tmpdir, filename)
-             for filename in ['old_file1', 'old_file2']]
-    expected_files = [self.fs._parse_url(file) for file in files]
-    result = self.fs.match(files)
+    expected_files = [self.fs.join(self.tmpdir, filename)
+                      for filename in ['old_file1', 'old_file2']]
+    match_patterns = expected_files
+    result = self.fs.match(match_patterns)
     returned_files = [f.path
                       for match_result in result
                       for f in match_result.metadata_list]
     self.assertItemsEqual(expected_files, returned_files)
 
   def test_match_file_with_limits(self):
-    expected_files = [self.fs._parse_url(self.fs.join(self.tmpdir, filename))
+    expected_files = [self.fs.join(self.tmpdir, filename)
                       for filename in ['old_file1', 'old_file2']]
     result = self.fs.match([self.tmpdir], [1])[0]
     files = [f.path for f in result.metadata_list]
@@ -293,7 +287,7 @@ def test_match_file_error(self):
       self.assertEqual(files, [self.fs._parse_url(url)])
 
   def test_match_directory(self):
-    expected_files = [self.fs._parse_url(self.fs.join(self.tmpdir, filename))
+    expected_files = [self.fs.join(self.tmpdir, filename)
                       for filename in ['old_file1', 'old_file2']]
 
     result = self.fs.match([self.tmpdir])[0]
@@ -301,7 +295,7 @@ def test_match_directory(self):
     self.assertItemsEqual(files, expected_files)
 
   def test_match_directory_trailing_slash(self):
-    expected_files = [self.fs._parse_url(self.fs.join(self.tmpdir, filename))
+    expected_files = [self.fs.join(self.tmpdir, filename)
                       for filename in ['old_file1', 'old_file2']]
 
     result = self.fs.match([self.tmpdir + '/'])[0]
@@ -517,6 +511,52 @@ def test_delete_error(self):
     self.assertFalse(self.fs.exists(url2))
 
 
+class HadoopFileSystemRuntimeValueProviderTest(unittest.TestCase):
+  """Tests pipeline_options, in the form of a
+  RuntimeValueProvider.runtime_options object."""
+
+  def test_dict_options(self):
+    self._fake_hdfs = FakeHdfs()
+    hdfs.hdfs.InsecureClient = (
+        lambda *args, **kwargs: self._fake_hdfs)
+    pipeline_options = {
+        'hdfs_host': '',
+        'hdfs_port': 0,
+        'hdfs_user': '',
+    }
+
+    self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)
+
+  def test_dict_options_missing(self):
+    self._fake_hdfs = FakeHdfs()
+    hdfs.hdfs.InsecureClient = (
+        lambda *args, **kwargs: self._fake_hdfs)
+
+    with self.assertRaisesRegexp(ValueError, r'hdfs_host'):
+      self.fs = hdfs.HadoopFileSystem(
+          pipeline_options={
+              'hdfs_port': 0,
+              'hdfs_user': '',
+          }
+      )
+
+    with self.assertRaisesRegexp(ValueError, r'hdfs_port'):
+      self.fs = hdfs.HadoopFileSystem(
+          pipeline_options={
+              'hdfs_host': '',
+              'hdfs_user': '',
+          }
+      )
+
+    with self.assertRaisesRegexp(ValueError, r'hdfs_user'):
+      self.fs = hdfs.HadoopFileSystem(
+          pipeline_options={
+              'hdfs_host': '',
+              'hdfs_port': 0,
+          }
+      )
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()
diff --git a/sdks/python/apache_beam/io/hdfs_integration_test/Dockerfile 
b/sdks/python/apache_beam/io/hdfs_integration_test/Dockerfile
index b5a5724e755..b7955e4c0f9 100644
--- a/sdks/python/apache_beam/io/hdfs_integration_test/Dockerfile
+++ b/sdks/python/apache_beam/io/hdfs_integration_test/Dockerfile
@@ -22,7 +22,9 @@
 
 FROM python:2
 WORKDIR /app
-RUN pip install --no-cache-dir holdup
+ENV HDFSCLI_CONFIG 
/app/sdks/python/apache_beam/io/hdfs_integration_test/hdfscli.cfg
+RUN pip install --no-cache-dir holdup gsutil
+RUN gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt .
 
 # Install Beam and dependencies.
 ADD sdks/python /app/sdks/python
@@ -32,7 +34,11 @@ RUN cd sdks/python && \
     pip install --no-cache-dir $(ls dist/apache-beam-*.tar.gz | tail -n1)[gcp]
 
 # Run wordcount, and write results to HDFS.
-CMD holdup -t 45 http://namenode:50070 && \
+CMD holdup -t 45 http://namenode:50070 http://datanode:50075 && \
+    echo "Waiting for safe mode to end." && \
+    sleep 45 && \
+    hdfscli -v -v -v upload -f kinglear.txt / && \
     python -m apache_beam.examples.wordcount \
-        --output hdfs://tmp/py-wordcount-direct \
+        --input hdfs://kinglear.txt \
+        --output hdfs://py-wordcount-integration \
         --hdfs_host namenode --hdfs_port 50070 --hdfs_user root
diff --git a/sdks/python/apache_beam/io/hdfs_integration_test/hdfscli.cfg 
b/sdks/python/apache_beam/io/hdfs_integration_test/hdfscli.cfg
new file mode 100644
index 00000000000..cb9a5a43cb8
--- /dev/null
+++ b/sdks/python/apache_beam/io/hdfs_integration_test/hdfscli.cfg
@@ -0,0 +1,22 @@
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+[global]
+default.alias = test
+
+[test.alias]
+url = http://namenode:50070
+user = root
diff --git a/sdks/python/run_postcommit.sh b/sdks/python/run_postcommit.sh
index 041ae035e9d..951579f4759 100755
--- a/sdks/python/run_postcommit.sh
+++ b/sdks/python/run_postcommit.sh
@@ -39,7 +39,7 @@ rm -rf sdks/python/target/.tox
 pip install --user --upgrade virtualenv tox
 
 # Tox runs unit tests in a virtual environment
-${LOCAL_PATH}/tox -e ALL -c sdks/python/tox.ini -v -v
+${LOCAL_PATH}/tox -e ALL -c sdks/python/tox.ini
 
 # Virtualenv for the rest of the script to run setup & e2e tests
 ${LOCAL_PATH}/virtualenv sdks/python


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 87773)
    Time Spent: 3h 40m  (was: 3.5h)

> HDFS read broken in python
> --------------------------
>
>                 Key: BEAM-3965
>                 URL: https://issues.apache.org/jira/browse/BEAM-3965
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Udi Meiri
>            Assignee: Udi Meiri
>            Priority: Major
>          Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> When running a command like:
> {noformat}
> python setup.py sdist > /dev/null && python -m apache_beam.examples.wordcount 
> --output gs://.../py-wordcount-output \
>   --hdfs_host ... --hdfs_port 50070 --hdfs_user ehudm --runner DataflowRunner 
> --project ... \
>   --temp_location gs://.../temp-hdfs-int --staging_location 
> gs://.../staging-hdfs-int \
>   --sdk_location dist/apache-beam-2.5.0.dev0.tar.gz --input 
> hdfs://kinglear.txt
> {noformat}
> I get:
> {noformat}
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
>     "__main__", fname, loader, pkg_name)
>   File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
>     exec code in run_globals
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/examples/wordcount.py",
>  line 136, in <module>
>     run()
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/examples/wordcount.py",
>  line 90, in run
>     lines = p | 'read' >> ReadFromText(known_args.input)
>   File "apache_beam/io/textio.py", line 522, in __init__
>     skip_header_lines=skip_header_lines)
>   File "apache_beam/io/textio.py", line 117, in __init__
>     validate=validate)
>   File "apache_beam/io/filebasedsource.py", line 119, in __init__
>     self._validate()
>   File "apache_beam/options/value_provider.py", line 124, in _f
>     return fnc(self, *args, **kwargs)
>   File "apache_beam/io/filebasedsource.py", line 176, in _validate
>     match_result = FileSystems.match([pattern], limits=[1])[0]
>   File "apache_beam/io/filesystems.py", line 159, in match
>     return filesystem.match(patterns, limits)
>   File "apache_beam/io/hadoopfilesystem.py", line 221, in match
>     raise BeamIOError('Match operation failed', exceptions)
> apache_beam.io.filesystem.BeamIOError: Match operation failed with exceptions 
> {'hdfs://kinglear.txt': KeyError('name',)}
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to