[ 
https://issues.apache.org/jira/browse/BEAM-3099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299437#comment-16299437
 ] 

ASF GitHub Bot commented on BEAM-3099:
--------------------------------------

chamikaramj closed pull request #4233: [BEAM-3099] Initial implementation of 
HadoopFileSystem.
URL: https://github.com/apache/beam/pull/4233
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index 69049ae67a4..0efdb0ef751 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -425,7 +425,7 @@ class FileSystem(BeamPlugin):
   """A class that defines the functions that can be performed on a filesystem.
 
   All methods are abstract and they are for file system providers to
-  implement. Clients should use the FileSystemUtil class to interact with
+  implement. Clients should use the FileSystems class to interact with
   the correct file system based on the provided file pattern scheme.
   """
   __metaclass__ = abc.ABCMeta
@@ -522,7 +522,7 @@ def open(self, path, mime_type='application/octet-stream',
     """Returns a read channel for the given file path.
 
     Args:
-      path: string path of the file object to be written to the system
+      path: string path of the file object to be read
       mime_type: MIME type to specify the type of content in the file object
       compression_type: Type of compression to be used for this object
 
diff --git a/sdks/python/apache_beam/io/filesystems.py b/sdks/python/apache_beam/io/filesystems.py
index f9ce5539653..0c82a7e25dc 100644
--- a/sdks/python/apache_beam/io/filesystems.py
+++ b/sdks/python/apache_beam/io/filesystems.py
@@ -24,6 +24,7 @@
 from apache_beam.io.filesystem import FileSystem
 # All filesystem implements should be added here
 # pylint: disable=wrong-import-position, unused-import
+from apache_beam.io.hadoopfilesystem import HadoopFileSystem
 from apache_beam.io.localfilesystem import LocalFileSystem
 
 try:
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem.py b/sdks/python/apache_beam/io/hadoopfilesystem.py
new file mode 100644
index 00000000000..9aad7f09dd1
--- /dev/null
+++ b/sdks/python/apache_beam/io/hadoopfilesystem.py
@@ -0,0 +1,285 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""":class:`~apache_beam.io.filesystem.FileSystem` implementation for accessing
+Hadoop Distributed File System files."""
+
+from __future__ import absolute_import
+
+import logging
+import posixpath
+import re
+
+from hdfs3 import HDFileSystem
+
+from apache_beam.io.filesystem import BeamIOError
+from apache_beam.io.filesystem import CompressedFile
+from apache_beam.io.filesystem import CompressionTypes
+from apache_beam.io.filesystem import FileMetadata
+from apache_beam.io.filesystem import FileSystem
+from apache_beam.io.filesystem import MatchResult
+
+__all__ = ['HadoopFileSystem']
+
+_HDFS_PREFIX = 'hdfs:/'
+_URL_RE = re.compile(r'^' + _HDFS_PREFIX + r'(/.*)')
+_COPY_BUFFER_SIZE = 2 ** 16
+
+
+# TODO(udim): Add @retry.with_exponential_backoff to some functions, like in
+# gcsio.py.
+
+
+class HadoopFileSystem(FileSystem):
+  """``FileSystem`` implementation that supports HDFS.
+
+  URL arguments to methods expect strings starting with ``hdfs://``.
+
+  Uses client library :class:`hdfs3.core.HDFileSystem`.
+  """
+
+  def __init__(self):
+    """Initializes a connection to HDFS.
+
+    Connection configuration is done using :doc:`hdfs`.
+    """
+    super(HadoopFileSystem, self).__init__()
+    self._hdfs_client = HDFileSystem()
+
+  @classmethod
+  def scheme(cls):
+    return 'hdfs'
+
+  @staticmethod
+  def _parse_url(url):
+    """Verifies that url begins with hdfs:// prefix, strips it and adds a
+    leading /.
+
+    Raises:
+      ValueError if url doesn't begin with hdfs://.
+
+    Args:
+      url: A URL in the form hdfs://path/...
+
+    Returns:
+      For an input of 'hdfs://path/...', will return '/path/...'.
+    """
+    m = _URL_RE.match(url)
+    if m is None:
+      raise ValueError('Could not parse url: %s' % url)
+    return m.group(1)
+
+  def join(self, base_url, *paths):
+    """Join two or more pathname components.
+
+    Args:
+      base_url: string path of the first component of the path.
+        Must start with hdfs://.
+      paths: path components to be added
+
+    Returns:
+      Full url after combining all the passed components.
+    """
+    basepath = self._parse_url(base_url)
+    return _HDFS_PREFIX + self._join(basepath, *paths)
+
+  def _join(self, basepath, *paths):
+    return posixpath.join(basepath, *paths)
+
+  def split(self, url):
+    rel_path = self._parse_url(url)
+    head, tail = posixpath.split(rel_path)
+    return _HDFS_PREFIX + head, tail
+
+  def mkdirs(self, url):
+    path = self._parse_url(url)
+    if self._exists(path):
+      raise IOError('Path already exists: %s' % path)
+    return self._mkdirs(path)
+
+  def _mkdirs(self, path):
+    self._hdfs_client.makedirs(path)
+
+  def match(self, url_patterns, limits=None):
+    if limits is None:
+      limits = [None] * len(url_patterns)
+
+    if len(url_patterns) != len(limits):
+      raise BeamIOError(
+          'Patterns and limits should be equal in length: %d != %d' % (
+              len(url_patterns), len(limits)))
+
+    # TODO(udim): Update client to allow batched results.
+    def _match(path_pattern, limit):
+      """Find all matching paths to the pattern provided."""
+      file_infos = self._hdfs_client.ls(path_pattern, detail=True)[:limit]
+      metadata_list = [FileMetadata(file_info['name'], file_info['size'])
+                       for file_info in file_infos]
+      return MatchResult(path_pattern, metadata_list)
+
+    exceptions = {}
+    result = []
+    for url_pattern, limit in zip(url_patterns, limits):
+      try:
+        path_pattern = self._parse_url(url_pattern)
+        result.append(_match(path_pattern, limit))
+      except Exception as e:  # pylint: disable=broad-except
+        exceptions[url_pattern] = e
+
+    if exceptions:
+      raise BeamIOError('Match operation failed', exceptions)
+    return result
+
+  def _open_hdfs(self, path, mode, mime_type, compression_type):
+    if mime_type != 'application/octet-stream':
+      logging.warning('Mime types are not supported. Got non-default mime_type:'
+                      ' %s', mime_type)
+    if compression_type == CompressionTypes.AUTO:
+      compression_type = CompressionTypes.detect_compression_type(path)
+    res = self._hdfs_client.open(path, mode)
+    if compression_type != CompressionTypes.UNCOMPRESSED:
+      res = CompressedFile(res)
+    return res
+
+  def create(self, url, mime_type='application/octet-stream',
+             compression_type=CompressionTypes.AUTO):
+    """
+    Returns:
+      *hdfs3.core.HDFile*: An Python File-like object.
+    """
+    path = self._parse_url(url)
+    return self._create(path, mime_type, compression_type)
+
+  def _create(self, path, mime_type='application/octet-stream',
+              compression_type=CompressionTypes.AUTO):
+    return self._open_hdfs(path, 'wb', mime_type, compression_type)
+
+  def open(self, url, mime_type='application/octet-stream',
+           compression_type=CompressionTypes.AUTO):
+    """
+    Returns:
+      *hdfs3.core.HDFile*: An Python File-like object.
+    """
+    path = self._parse_url(url)
+    return self._open(path, mime_type, compression_type)
+
+  def _open(self, path, mime_type='application/octet-stream',
+            compression_type=CompressionTypes.AUTO):
+    return self._open_hdfs(path, 'rb', mime_type, compression_type)
+
+  def copy(self, source_file_names, destination_file_names):
+    """
+    Will overwrite files and directories in destination_file_names.
+
+    Raises ``BeamIOError`` if any error occurred.
+
+    Args:
+      source_file_names: iterable of URLs.
+      destination_file_names: iterable of URLs.
+    """
+    if len(source_file_names) != len(destination_file_names):
+      raise BeamIOError(
+          'source_file_names and destination_file_names should '
+          'be equal in length: %d != %d' % (
+              len(source_file_names), len(destination_file_names)))
+
+    def _copy_file(source, destination):
+      with self._open(source) as f1:
+        with self._create(destination) as f2:
+          while True:
+            buf = f1.read(_COPY_BUFFER_SIZE)
+            if not buf:
+              break
+            f2.write(buf)
+
+    def _copy_path(source, destination):
+      """Recursively copy the file tree from the source to the destination."""
+      if not self._hdfs_client.isdir(source):
+        _copy_file(source, destination)
+        return
+
+      for path, dirs, files in self._hdfs_client.walk(source):
+        for dir in dirs:
+          new_dir = self._join(destination, dir)
+          if not self._exists(new_dir):
+            self._mkdirs(new_dir)
+
+        rel_path = posixpath.relpath(path, source)
+        if rel_path == '.':
+          rel_path = ''
+        for file in files:
+          _copy_file(self._join(path, file),
+                     self._join(destination, rel_path, file))
+
+    exceptions = {}
+    for source, destination in zip(source_file_names, destination_file_names):
+      try:
+        rel_source = self._parse_url(source)
+        rel_destination = self._parse_url(destination)
+        _copy_path(rel_source, rel_destination)
+      except Exception as e:  # pylint: disable=broad-except
+        exceptions[(source, destination)] = e
+
+    if exceptions:
+      raise BeamIOError('Copy operation failed', exceptions)
+
+  def rename(self, source_file_names, destination_file_names):
+    exceptions = {}
+    for source, destination in zip(source_file_names, destination_file_names):
+      try:
+        rel_source = self._parse_url(source)
+        rel_destination = self._parse_url(destination)
+        if not self._hdfs_client.mv(rel_source, rel_destination):
+          raise BeamIOError(
+              'libhdfs error in renaming %s to %s' % (source, destination))
+      except Exception as e:  # pylint: disable=broad-except
+        exceptions[(source, destination)] = e
+
+    if exceptions:
+      raise BeamIOError('Rename operation failed', exceptions)
+
+  def exists(self, url):
+    """Checks existence of url in HDFS.
+
+    Args:
+      url: String in the form hdfs://...
+
+    Returns:
+      True if url exists as a file or directory in HDFS.
+    """
+    path = self._parse_url(url)
+    return self._exists(path)
+
+  def _exists(self, path):
+    """Returns True if path exists as a file or directory in HDFS.
+
+    Args:
+      path: String in the form /...
+    """
+    return self._hdfs_client.exists(path)
+
+  def delete(self, urls):
+    exceptions = {}
+    for url in urls:
+      try:
+        path = self._parse_url(url)
+        self._hdfs_client.rm(path, recursive=True)
+      except Exception as e:  # pylint: disable=broad-except
+        exceptions[url] = e
+
+    if exceptions:
+      raise BeamIOError("Delete operation failed", exceptions)
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem_test.py b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
new file mode 100644
index 00000000000..af5bca86344
--- /dev/null
+++ b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
@@ -0,0 +1,472 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for :class:`HadoopFileSystem`."""
+
+from __future__ import absolute_import
+
+import posixpath
+import StringIO
+import unittest
+
+from apache_beam.io import hadoopfilesystem
+from apache_beam.io.filesystem import BeamIOError
+
+
+class FakeFile(StringIO.StringIO):
+  """File object for FakeHdfs"""
+
+  def __init__(self, path, mode):
+    StringIO.StringIO.__init__(self)
+    self.stat = {
+        'path': path,
+        'mode': mode,
+    }
+    self.saved_data = None
+
+  def __eq__(self, other):
+    return self.stat == other.stat and self.getvalue() == self.getvalue()
+
+  def close(self):
+    self.saved_data = self.getvalue()
+    StringIO.StringIO.close(self)
+
+  def __enter__(self):
+    return self
+
+  def __exit__(self, exc_type, exc_val, exc_tb):
+    self.close()
+
+  @property
+  def size(self):
+    if self.closed:
+      if self.saved_data is None:
+        return 0
+      return len(self.saved_data)
+    return len(self.getvalue())
+
+
+class FakeHdfsError(Exception):
+  """Generic error for FakeHdfs methods."""
+
+
+class FakeHdfs(object):
+  """Fake implementation of hdfs3.HadoopFileSystem."""
+
+  def __init__(self):
+    self.files = {}
+
+  def open(self, path, mode='rb'):
+    if mode == 'rb' and not self.exists(path):
+      raise FakeHdfsError('Path not found: %s' % path)
+
+    if mode in ['rb', 'wb']:
+      new_file = FakeFile(path, mode)
+      # Required to support read and write operations with CompressedFile.
+      new_file.mode = 'rw'
+
+      if mode == 'rb':
+        old_file = self.files.get(path, None)
+        if old_file is not None:
+          if old_file.stat['mode'] == 'dir':
+            raise FakeHdfsError('Cannot open a directory: %s' % path)
+          if old_file.saved_data:
+            old_file = self.files[path]
+            new_file.write(old_file.saved_data)
+            new_file.seek(0)
+
+      self.files[path] = new_file
+      return new_file
+    else:
+      raise FakeHdfsError('Unknown mode: %s' % mode)
+
+  def ls(self, path, detail=False):
+    result = []
+    for file in self.files.itervalues():
+      if file.stat['path'].startswith(path):
+        result.append({
+            'name': file.stat['path'],
+            'size': file.size,
+        })
+    return result
+
+  def makedirs(self, path):
+    self.files[path] = FakeFile(path, 'dir')
+
+  def exists(self, path):
+    return path in self.files
+
+  def rm(self, path, recursive=True):
+    if not recursive:
+      raise FakeHdfsError('Non-recursive mode not implemented')
+
+    if not self.exists(path):
+      raise FakeHdfsError('Path not found: %s' % path)
+
+    for filepath in self.files.keys():  # pylint: disable=consider-iterating-dictionary
+      if filepath.startswith(path):
+        del self.files[filepath]
+
+  def isdir(self, path):
+    if not self.exists(path):
+      raise FakeHdfsError('Path not found: %s' % path)
+
+    return self.files[path].stat['mode'] == 'dir'
+
+  def walk(self, path):
+    paths = [path]
+    while paths:
+      path = paths.pop()
+      files = []
+      dirs = []
+      for full_path in self.files:
+        if not full_path.startswith(path):
+          continue
+        short_path = posixpath.relpath(full_path, path)
+        if '/' not in short_path:
+          if self.isdir(full_path):
+            if short_path != '.':
+              dirs.append(short_path)
+          else:
+            files.append(short_path)
+
+      yield path, dirs, files
+      paths = [posixpath.join(path, dir) for dir in dirs]
+
+  def mv(self, path1, path2):
+    if not self.exists(path1):
+      raise FakeHdfsError('Path1 not found: %s' % path1)
+
+    for fullpath in self.files.keys():  # pylint: disable=consider-iterating-dictionary
+      if fullpath == path1 or fullpath.startswith(path1 + '/'):
+        f = self.files.pop(fullpath)
+        newpath = path2 + fullpath[len(path1):]
+        f.stat['path'] = newpath
+        self.files[newpath] = f
+
+    return True
+
+
+class HadoopFileSystemTest(unittest.TestCase):
+
+  def setUp(self):
+    self._fake_hdfs = FakeHdfs()
+    hadoopfilesystem.HDFileSystem = lambda *args, **kwargs: self._fake_hdfs
+    self.fs = hadoopfilesystem.HadoopFileSystem()
+    self.tmpdir = 'hdfs://test_dir'
+
+    for filename in ['old_file1', 'old_file2']:
+      url = self.fs.join(self.tmpdir, filename)
+      self.fs.create(url).close()
+
+  def test_scheme(self):
+    self.assertEqual(self.fs.scheme(), 'hdfs')
+    self.assertEqual(hadoopfilesystem.HadoopFileSystem.scheme(), 'hdfs')
+
+  def test_url_join(self):
+    self.assertEqual('hdfs://tmp/path/to/file',
+                     self.fs.join('hdfs://tmp/path', 'to', 'file'))
+    self.assertEqual('hdfs://tmp/path/to/file',
+                     self.fs.join('hdfs://tmp/path', 'to/file'))
+    self.assertEqual('hdfs://tmp/path/',
+                     self.fs.join('hdfs://tmp/path/', ''))
+    self.assertEqual('hdfs://bar',
+                     self.fs.join('hdfs://foo', '/bar'))
+    with self.assertRaises(ValueError):
+      self.fs.join('/no/scheme', 'file')
+
+  def test_url_split(self):
+    self.assertEqual(('hdfs://tmp/path/to', 'file'),
+                     self.fs.split('hdfs://tmp/path/to/file'))
+    self.assertEqual(('hdfs://', 'tmp'), self.fs.split('hdfs://tmp'))
+    self.assertEqual(('hdfs://tmp', ''), self.fs.split('hdfs://tmp/'))
+    with self.assertRaisesRegexp(ValueError, r'parse'):
+      self.fs.split('tmp')
+
+  def test_mkdirs(self):
+    url = self.fs.join(self.tmpdir, 't1/t2')
+    self.fs.mkdirs(url)
+    match_results = self.fs.match([url])
+    self.assertEqual(1, len(match_results))
+    self.assertEqual(1, len(match_results[0].metadata_list))
+    metadata = match_results[0].metadata_list[0]
+    self.assertEqual(metadata.path, self.fs._parse_url(url))
+    self.assertTrue(self.fs.exists(url))
+
+  def test_mkdirs_failed(self):
+    url = self.fs.join(self.tmpdir, 't1/t2')
+    self.fs.mkdirs(url)
+
+    with self.assertRaises(IOError):
+      self.fs.mkdirs(url)
+
+  def test_match_file(self):
+    files = [self.fs.join(self.tmpdir, filename)
+             for filename in ['old_file1', 'old_file2']]
+    expected_files = [self.fs._parse_url(file) for file in files]
+    result = self.fs.match(files)
+    returned_files = [f.path
+                      for match_result in result
+                      for f in match_result.metadata_list]
+    self.assertItemsEqual(expected_files, returned_files)
+
+  def test_match_file_with_limits(self):
+    expected_files = [self.fs._parse_url(self.fs.join(self.tmpdir, filename))
+                      for filename in ['old_file1', 'old_file2']]
+    result = self.fs.match([self.tmpdir], [1])[0]
+    files = [f.path for f in result.metadata_list]
+    self.assertEquals(len(files), 1)
+    self.assertIn(files[0], expected_files)
+
+  def test_match_file_with_zero_limit(self):
+    result = self.fs.match([self.tmpdir], [0])[0]
+    self.assertEquals(len(result.metadata_list), 0)
+
+  def test_match_file_with_bad_limit(self):
+    with self.assertRaisesRegexp(BeamIOError, r'TypeError'):
+      _ = self.fs.match([self.tmpdir], ['a'])[0]
+
+  def test_match_file_empty(self):
+    url = self.fs.join(self.tmpdir, 'nonexistent_file')
+    result = self.fs.match([url])[0]
+    files = [f.path for f in result.metadata_list]
+    self.assertEqual(files, [])
+
+  def test_match_file_error(self):
+    url = self.fs.join(self.tmpdir, 'old_file1')
+    bad_url = 'bad_url'
+    with self.assertRaisesRegexp(BeamIOError,
+                                 r'^Match operation failed .* %s' % bad_url):
+      result = self.fs.match([bad_url, url])[0]
+      files = [f.path for f in result.metadata_list]
+      self.assertEqual(files, [self.fs._parse_url(url)])
+
+  def test_match_directory(self):
+    expected_files = [self.fs._parse_url(self.fs.join(self.tmpdir, filename))
+                      for filename in ['old_file1', 'old_file2']]
+
+    result = self.fs.match([self.tmpdir])[0]
+    files = [f.path for f in result.metadata_list]
+    self.assertItemsEqual(files, expected_files)
+
+  def test_match_directory_trailing_slash(self):
+    expected_files = [self.fs._parse_url(self.fs.join(self.tmpdir, filename))
+                      for filename in ['old_file1', 'old_file2']]
+
+    result = self.fs.match([self.tmpdir + '/'])[0]
+    files = [f.path for f in result.metadata_list]
+    self.assertItemsEqual(files, expected_files)
+
+  def test_create_success(self):
+    url = self.fs.join(self.tmpdir, 'new_file')
+    handle = self.fs.create(url)
+    self.assertIsNotNone(handle)
+    url = self.fs._parse_url(url)
+    expected_file = FakeFile(url, 'wb')
+    self.assertEqual(self._fake_hdfs.files[url], expected_file)
+
+  def test_create_write_read_compressed(self):
+    url = self.fs.join(self.tmpdir, 'new_file.gz')
+
+    handle = self.fs.create(url)
+    self.assertIsNotNone(handle)
+    path = self.fs._parse_url(url)
+    expected_file = FakeFile(path, 'wb')
+    self.assertEqual(self._fake_hdfs.files[path], expected_file)
+    data = 'abc' * 10
+    handle.write(data)
+    # Compressed data != original data
+    self.assertNotEquals(data, self._fake_hdfs.files[path].getvalue())
+    handle.close()
+
+    handle = self.fs.open(url)
+    read_data = handle.read(len(data))
+    self.assertEqual(data, read_data)
+    handle.close()
+
+  def test_open(self):
+    url = self.fs.join(self.tmpdir, 'old_file1')
+    handle = self.fs.open(url)
+    self.assertEqual(handle, self._fake_hdfs.files[self.fs._parse_url(url)])
+
+  def test_open_bad_path(self):
+    with self.assertRaises(FakeHdfsError):
+      self.fs.open(self.fs.join(self.tmpdir, 'nonexistent/path'))
+
+  def _cmpfiles(self, url1, url2):
+    with self.fs.open(url1) as f1:
+      with self.fs.open(url2) as f2:
+        data1 = f1.read()
+        data2 = f2.read()
+        return data1 == data2
+
+  def test_copy_file(self):
+    url1 = self.fs.join(self.tmpdir, 'new_file1')
+    url2 = self.fs.join(self.tmpdir, 'new_file2')
+    url3 = self.fs.join(self.tmpdir, 'new_file3')
+    with self.fs.create(url1) as f1:
+      f1.write('Hello')
+    self.fs.copy([url1, url1], [url2, url3])
+    self.assertTrue(self._cmpfiles(url1, url2))
+    self.assertTrue(self._cmpfiles(url1, url3))
+
+  def test_copy_file_overwrite(self):
+    url1 = self.fs.join(self.tmpdir, 'new_file1')
+    url2 = self.fs.join(self.tmpdir, 'new_file2')
+    with self.fs.create(url1) as f1:
+      f1.write('Hello')
+    with self.fs.create(url2) as f2:
+      f2.write('nope')
+    self.fs.copy([url1], [url2])
+    self.assertTrue(self._cmpfiles(url1, url2))
+
+  def test_copy_file_error(self):
+    url1 = self.fs.join(self.tmpdir, 'new_file1')
+    url2 = self.fs.join(self.tmpdir, 'new_file2')
+    url3 = self.fs.join(self.tmpdir, 'new_file3')
+    url4 = self.fs.join(self.tmpdir, 'new_file4')
+    with self.fs.create(url3) as f:
+      f.write('Hello')
+    with self.assertRaisesRegexp(
+        BeamIOError, r'^Copy operation failed .*%s.*%s.* not found' % (
+            url1, url2)):
+      self.fs.copy([url1, url3], [url2, url4])
+    self.assertTrue(self._cmpfiles(url3, url4))
+
+  def test_copy_directory(self):
+    url_t1 = self.fs.join(self.tmpdir, 't1')
+    url_t1_inner = self.fs.join(self.tmpdir, 't1/inner')
+    url_t2 = self.fs.join(self.tmpdir, 't2')
+    url_t2_inner = self.fs.join(self.tmpdir, 't2/inner')
+    self.fs.mkdirs(url_t1)
+    self.fs.mkdirs(url_t1_inner)
+    self.fs.mkdirs(url_t2)
+
+    url1 = self.fs.join(url_t1_inner, 'f1')
+    url2 = self.fs.join(url_t2_inner, 'f1')
+    with self.fs.create(url1) as f:
+      f.write('Hello')
+
+    self.fs.copy([url_t1], [url_t2])
+    self.assertTrue(self._cmpfiles(url1, url2))
+
+  def test_copy_directory_overwrite(self):
+    url_t1 = self.fs.join(self.tmpdir, 't1')
+    url_t1_inner = self.fs.join(self.tmpdir, 't1/inner')
+    url_t2 = self.fs.join(self.tmpdir, 't2')
+    url_t2_inner = self.fs.join(self.tmpdir, 't2/inner')
+    self.fs.mkdirs(url_t1)
+    self.fs.mkdirs(url_t1_inner)
+    self.fs.mkdirs(url_t2)
+    self.fs.mkdirs(url_t2_inner)
+
+    url1 = self.fs.join(url_t1, 'f1')
+    url1_inner = self.fs.join(url_t1_inner, 'f2')
+    url2 = self.fs.join(url_t2, 'f1')
+    url2_inner = self.fs.join(url_t2_inner, 'f2')
+    url3_inner = self.fs.join(url_t2_inner, 'f3')
+    for url in [url1, url1_inner, url3_inner]:
+      with self.fs.create(url) as f:
+        f.write('Hello')
+    with self.fs.create(url2) as f:
+      f.write('nope')
+
+    self.fs.copy([url_t1], [url_t2])
+    self.assertTrue(self._cmpfiles(url1, url2))
+    self.assertTrue(self._cmpfiles(url1_inner, url2_inner))
+    self.assertTrue(self.fs.exists(url3_inner))
+
+  def test_rename_file(self):
+    url1 = self.fs.join(self.tmpdir, 'f1')
+    url2 = self.fs.join(self.tmpdir, 'f2')
+    with self.fs.create(url1) as f:
+      f.write('Hello')
+
+    self.fs.rename([url1], [url2])
+    self.assertFalse(self.fs.exists(url1))
+    self.assertTrue(self.fs.exists(url2))
+
+  def test_rename_file_error(self):
+    url1 = self.fs.join(self.tmpdir, 'f1')
+    url2 = self.fs.join(self.tmpdir, 'f2')
+    url3 = self.fs.join(self.tmpdir, 'f3')
+    url4 = self.fs.join(self.tmpdir, 'f4')
+    with self.fs.create(url3) as f:
+      f.write('Hello')
+
+    with self.assertRaisesRegexp(
+        BeamIOError, r'^Rename operation failed .*%s.*%s' % (url1, url2)):
+      self.fs.rename([url1, url3], [url2, url4])
+    self.assertFalse(self.fs.exists(url3))
+    self.assertTrue(self.fs.exists(url4))
+
+  def test_rename_directory(self):
+    url_t1 = self.fs.join(self.tmpdir, 't1')
+    url_t2 = self.fs.join(self.tmpdir, 't2')
+    self.fs.mkdirs(url_t1)
+    url1 = self.fs.join(url_t1, 'f1')
+    url2 = self.fs.join(url_t2, 'f1')
+    with self.fs.create(url1) as f:
+      f.write('Hello')
+
+    self.fs.rename([url_t1], [url_t2])
+    self.assertFalse(self.fs.exists(url_t1))
+    self.assertTrue(self.fs.exists(url_t2))
+    self.assertFalse(self.fs.exists(url1))
+    self.assertTrue(self.fs.exists(url2))
+
+  def test_exists(self):
+    url1 = self.fs.join(self.tmpdir, 'old_file1')
+    url2 = self.fs.join(self.tmpdir, 'nonexistent')
+    self.assertTrue(self.fs.exists(url1))
+    self.assertFalse(self.fs.exists(url2))
+
+  def test_delete_file(self):
+    url = self.fs.join(self.tmpdir, 'old_file1')
+
+    self.assertTrue(self.fs.exists(url))
+    self.fs.delete([url])
+    self.assertFalse(self.fs.exists(url))
+
+  def test_delete_dir(self):
+    url_t1 = self.fs.join(self.tmpdir, 'new_dir1')
+    url_t2 = self.fs.join(url_t1, 'new_dir2')
+    url1 = self.fs.join(url_t2, 'new_file1')
+    url2 = self.fs.join(url_t2, 'new_file2')
+    self.fs.mkdirs(url_t1)
+    self.fs.mkdirs(url_t2)
+    self.fs.create(url1).close()
+    self.fs.create(url2).close()
+
+    self.assertTrue(self.fs.exists(url1))
+    self.assertTrue(self.fs.exists(url2))
+    self.fs.delete([url_t1])
+    self.assertFalse(self.fs.exists(url_t1))
+    self.assertFalse(self.fs.exists(url_t2))
+    self.assertFalse(self.fs.exists(url2))
+    self.assertFalse(self.fs.exists(url1))
+
+  def test_delete_error(self):
+    url1 = self.fs.join(self.tmpdir, 'nonexistent')
+    url2 = self.fs.join(self.tmpdir, 'old_file1')
+
+    self.assertTrue(self.fs.exists(url2))
+    path1 = self.fs._parse_url(url1)
+    with self.assertRaisesRegexp(BeamIOError,
+                                 r'^Delete operation failed .* %s' % path1):
+      self.fs.delete([url1, url2])
+    self.assertFalse(self.fs.exists(url2))
diff --git a/sdks/python/generate_pydoc.sh b/sdks/python/generate_pydoc.sh
index 9ae019c2cfe..a79741320fe 100755
--- a/sdks/python/generate_pydoc.sh
+++ b/sdks/python/generate_pydoc.sh
@@ -100,6 +100,7 @@ import apache_beam as beam
 intersphinx_mapping = {
   'python': ('https://docs.python.org/2', None),
   'hamcrest': ('https://pyhamcrest.readthedocs.io/en/latest/', None),
+  'hdfs3': ('https://hdfs3.readthedocs.io/en/latest/', None),
 }
 
 # Since private classes are skipped by sphinx, if there is any cross reference
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 75bd62b8774..26b46892bba 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -111,6 +111,7 @@ def get_version():
     'six>=1.9,<1.11',
     'typing>=3.6.0,<3.7.0',
     'futures>=3.1.1,<4.0.0',
+    'hdfs3>=0.3.0,<0.4.0',
     ]
 
 REQUIRED_SETUP_PACKAGES = [


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Implement HDFS FileSystem for Python SDK
> ----------------------------------------
>
>                 Key: BEAM-3099
>                 URL: https://issues.apache.org/jira/browse/BEAM-3099
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Chamikara Jayalath
>            Assignee: Udi Meiri
>
> Currently the Java SDK has HDFS support but the Python SDK does not. With the
> current portability efforts, other runners may soon be able to use the Python
> SDK. Having HDFS support will allow these runners to execute large-scale jobs
> without using GCS.
> The following post suggests some libraries that can be used to connect to
> HDFS from Python:
> http://wesmckinney.com/blog/python-hdfs-interfaces/



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to