Repository: beam
Updated Branches:
  refs/heads/master 7c7bb8209 -> dd0f8d984


http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/localfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem.py 
b/sdks/python/apache_beam/io/localfilesystem.py
new file mode 100644
index 0000000..46589b0
--- /dev/null
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -0,0 +1,236 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Local File system implementation for accessing files on disk."""
+
+from __future__ import absolute_import
+
+import glob
+import os
+import shutil
+
+from apache_beam.io.filesystem import BeamIOError
+from apache_beam.io.filesystem import CompressedFile
+from apache_beam.io.filesystem import CompressionTypes
+from apache_beam.io.filesystem import FileMetadata
+from apache_beam.io.filesystem import FileSystem
+from apache_beam.io.filesystem import MatchResult
+
+
+class LocalFileSystem(FileSystem):
+  """A Local ``FileSystem`` implementation for accessing files on disk.
+  """
+
+  def mkdirs(self, path):
+    """Recursively create directories for the provided path.
+
+    Args:
+      path: string path of the directory structure that should be created
+
+    Raises:
+      IOError if leaf directory already exists or cannot be created.
+    """
+    try:
+      os.makedirs(path)
+    except OSError as err:
+      raise IOError(err)
+
+  def match(self, patterns, limits=None):
+    """Find all matching paths to the pattern provided.
+
+    Args:
+      patterns: list of string for the file path pattern to match against
+      limits: list of maximum number of responses that need to be fetched
+
+    Returns: list of ``MatchResult`` objects.
+
+    Raises:
+      ``BeamIOError`` if any of the pattern match operations fail
+    """
+    if limits is None:
+      limits = [None] * len(patterns)
+    else:
+      err_msg = "Patterns and limits should be equal in length"
+      assert len(patterns) == len(limits), err_msg
+
+    def _match(pattern, limit):
+      """Find all matching paths to the pattern provided.
+      """
+      files = glob.glob(pattern)
+      metadata = [FileMetadata(f, os.path.getsize(f)) for f in files[:limit]]
+      return MatchResult(pattern, metadata)
+
+    exceptions = {}
+    result = []
+    for pattern, limit in zip(patterns, limits):
+      try:
+        result.append(_match(pattern, limit))
+      except Exception as e:  # pylint: disable=broad-except
+        exceptions[pattern] = e
+
+    if exceptions:
+      raise BeamIOError("Match operation failed", exceptions)
+    return result
+
+  def _path_open(self, path, mode, mime_type='application/octet-stream',
+                 compression_type=CompressionTypes.AUTO):
+    """Helper functions to open a file in the provided mode.
+    """
+    compression_type = FileSystem._get_compression_type(path, compression_type)
+    raw_file = open(path, mode)
+    if compression_type == CompressionTypes.UNCOMPRESSED:
+      return raw_file
+    else:
+      return CompressedFile(raw_file, compression_type=compression_type)
+
+  def create(self, path, mime_type='application/octet-stream',
+             compression_type=CompressionTypes.AUTO):
+    """Returns a write channel for the given file path.
+
+    Args:
+      path: string path of the file object to be written to the system
+      mime_type: MIME type to specify the type of content in the file object
+      compression_type: Type of compression to be used for this object
+
+    Returns: file handle with a close function for the user to use
+    """
+    return self._path_open(path, 'wb', mime_type, compression_type)
+
+  def open(self, path, mime_type='application/octet-stream',
+           compression_type=CompressionTypes.AUTO):
+    """Returns a read channel for the given file path.
+
+    Args:
+      path: string path of the file object to be written to the system
+      mime_type: MIME type to specify the type of content in the file object
+      compression_type: Type of compression to be used for this object
+
+    Returns: file handle with a close function for the user to use
+    """
+    return self._path_open(path, 'rb', mime_type, compression_type)
+
+  def copy(self, source_file_names, destination_file_names):
+    """Recursively copy the file tree from the source to the destination
+
+    Args:
+      source_file_names: list of source file objects that needs to be copied
+      destination_file_names: list of destination of the new object
+
+    Raises:
+      ``BeamIOError`` if any of the copy operations fail
+    """
+    err_msg = ("source_file_names and destination_file_names should "
+               "be equal in length")
+    assert len(source_file_names) == len(destination_file_names), err_msg
+
+    def _copy_path(source, destination):
+      """Recursively copy the file tree from the source to the destination
+      """
+      try:
+        if os.path.exists(destination):
+          if os.path.isdir(destination):
+            shutil.rmtree(destination)
+          else:
+            os.remove(destination)
+        if os.path.isdir(source):
+          shutil.copytree(source, destination)
+        else:
+          shutil.copy2(source, destination)
+      except OSError as err:
+        raise IOError(err)
+
+    exceptions = {}
+    for source, destination in zip(source_file_names, destination_file_names):
+      try:
+        _copy_path(source, destination)
+      except Exception as e:  # pylint: disable=broad-except
+        exceptions[(source, destination)] = e
+
+    if exceptions:
+      raise BeamIOError("Copy operation failed", exceptions)
+
+  def rename(self, source_file_names, destination_file_names):
+    """Rename the files at the source list to the destination list.
+    Source and destination lists should be of the same size.
+
+    Args:
+      source_file_names: List of file paths that need to be moved
+      destination_file_names: List of destination_file_names for the files
+
+    Raises:
+      ``BeamIOError`` if any of the rename operations fail
+    """
+    err_msg = ("source_file_names and destination_file_names should "
+               "be equal in length")
+    assert len(source_file_names) == len(destination_file_names), err_msg
+
+    def _rename_file(source, destination):
+      """Rename a single file object"""
+      try:
+        os.rename(source, destination)
+      except OSError as err:
+        raise IOError(err)
+
+    exceptions = {}
+    for source, destination in zip(source_file_names, destination_file_names):
+      try:
+        _rename_file(source, destination)
+      except Exception as e:  # pylint: disable=broad-except
+        exceptions[(source, destination)] = e
+
+    if exceptions:
+      raise BeamIOError("Rename operation failed", exceptions)
+
+  def exists(self, path):
+    """Check if the provided path exists on the FileSystem.
+
+    Args:
+      path: string path that needs to be checked.
+
+    Returns: boolean flag indicating if path exists
+    """
+    return os.path.exists(path)
+
+  def delete(self, paths):
+    """Deletes files or directories at the provided paths.
+    Directories will be deleted recursively.
+
+    Args:
+      paths: list of paths that give the file objects to be deleted
+
+    Raises:
+      ``BeamIOError`` if any of the delete operations fail
+    """
+    def _delete_path(path):
+      """Recursively delete the file or directory at the provided path.
+      """
+      try:
+        if os.path.isdir(path):
+          shutil.rmtree(path)
+        else:
+          os.remove(path)
+      except OSError as err:
+        raise IOError(err)
+
+    exceptions = {}
+    for path in paths:
+      try:
+        _delete_path(path)
+      except Exception as e:  # pylint: disable=broad-except
+        exceptions[path] = e
+
+    if exceptions:
+      raise BeamIOError("Delete operation failed", exceptions)

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/localfilesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py 
b/sdks/python/apache_beam/io/localfilesystem_test.py
new file mode 100644
index 0000000..00059ef
--- /dev/null
+++ b/sdks/python/apache_beam/io/localfilesystem_test.py
@@ -0,0 +1,185 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for LocalFileSystem."""
+
+import unittest
+
+import filecmp
+import os
+import shutil
+import tempfile
+from apache_beam.io.filesystem import BeamIOError
+from apache_beam.io.localfilesystem import LocalFileSystem
+
+
+class LocalFileSystemTest(unittest.TestCase):
+
+  def setUp(self):
+    self.tmpdir = tempfile.mkdtemp()
+    self.fs = LocalFileSystem()
+
+  def tearDown(self):
+    shutil.rmtree(self.tmpdir)
+
+  def test_mkdirs(self):
+    path = os.path.join(self.tmpdir, 't1/t2')
+    self.fs.mkdirs(path)
+    self.assertTrue(os.path.isdir(path))
+
+  def test_mkdirs_failed(self):
+    path = os.path.join(self.tmpdir, 't1/t2')
+    self.fs.mkdirs(path)
+
+    # Check IOError if existing directory is created
+    with self.assertRaises(IOError):
+      self.fs.mkdirs(path)
+
+    with self.assertRaises(IOError):
+      self.fs.mkdirs(os.path.join(self.tmpdir, 't1'))
+
+  def test_match_file(self):
+    path = os.path.join(self.tmpdir, 'f1')
+    open(path, 'a').close()
+
+    # Match files in the temp directory
+    result = self.fs.match([path])[0]
+    files = [f.path for f in result.metadata_list]
+    self.assertEqual(files, [path])
+
+  def test_match_file_empty(self):
+    path = os.path.join(self.tmpdir, 'f2')  # Does not exist
+
+    # Match files in the temp directory
+    result = self.fs.match([path])[0]
+    files = [f.path for f in result.metadata_list]
+    self.assertEqual(files, [])
+
+  def test_match_file_exception(self):
+    # Match files with None so that it throws an exception
+    with self.assertRaises(BeamIOError) as error:
+      self.fs.match([None])
+    self.assertEqual(error.exception.message, 'Match operation failed')
+    self.assertEqual(error.exception.exception_details.keys(), [None])
+
+  def test_match_directory(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    path2 = os.path.join(self.tmpdir, 'f2')
+    open(path1, 'a').close()
+    open(path2, 'a').close()
+
+    # Match both the files in the directory
+    path = os.path.join(self.tmpdir, '*')
+    result = self.fs.match([path])[0]
+    files = [f.path for f in result.metadata_list]
+    self.assertEqual(files, [path1, path2])
+
+  def test_match_directory_self(self):
+    result = self.fs.match([self.tmpdir])[0]
+    files = [f.path for f in result.metadata_list]
+    self.assertEqual(files, [self.tmpdir])
+
+  def test_copy(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    path2 = os.path.join(self.tmpdir, 'f2')
+    with open(path1, 'a') as f:
+      f.write('Hello')
+
+    self.fs.copy([path1], [path2])
+    self.assertTrue(filecmp.cmp(path1, path2))
+
+  def test_copy_error(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    path2 = os.path.join(self.tmpdir, 'f2')
+    with self.assertRaises(BeamIOError) as error:
+      self.fs.copy([path1], [path2])
+    self.assertEqual(error.exception.message, 'Copy operation failed')
+    self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)])
+
+  def test_copy_directory(self):
+    path_t1 = os.path.join(self.tmpdir, 't1')
+    path_t2 = os.path.join(self.tmpdir, 't2')
+    self.fs.mkdirs(path_t1)
+    self.fs.mkdirs(path_t2)
+
+    path1 = os.path.join(path_t1, 'f1')
+    path2 = os.path.join(path_t2, 'f1')
+    with open(path1, 'a') as f:
+      f.write('Hello')
+
+    self.fs.copy([path_t1], [path_t2])
+    self.assertTrue(filecmp.cmp(path1, path2))
+
+  def test_rename(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    path2 = os.path.join(self.tmpdir, 'f2')
+    with open(path1, 'a') as f:
+      f.write('Hello')
+
+    self.fs.rename([path1], [path2])
+    self.assertTrue(self.fs.exists(path2))
+    self.assertFalse(self.fs.exists(path1))
+
+  def test_rename_error(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    path2 = os.path.join(self.tmpdir, 'f2')
+    with self.assertRaises(BeamIOError) as error:
+      self.fs.rename([path1], [path2])
+    self.assertEqual(error.exception.message, 'Rename operation failed')
+    self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)])
+
+  def test_rename_directory(self):
+    path_t1 = os.path.join(self.tmpdir, 't1')
+    path_t2 = os.path.join(self.tmpdir, 't2')
+    self.fs.mkdirs(path_t1)
+
+    path1 = os.path.join(path_t1, 'f1')
+    path2 = os.path.join(path_t2, 'f1')
+    with open(path1, 'a') as f:
+      f.write('Hello')
+
+    self.fs.rename([path_t1], [path_t2])
+    self.assertTrue(self.fs.exists(path_t2))
+    self.assertFalse(self.fs.exists(path_t1))
+    self.assertTrue(self.fs.exists(path2))
+    self.assertFalse(self.fs.exists(path1))
+
+  def test_exists(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    path2 = os.path.join(self.tmpdir, 'f2')
+    with open(path1, 'a') as f:
+      f.write('Hello')
+    self.assertTrue(self.fs.exists(path1))
+    self.assertFalse(self.fs.exists(path2))
+
+  def test_delete(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+
+    with open(path1, 'a') as f:
+      f.write('Hello')
+
+    self.assertTrue(self.fs.exists(path1))
+    self.fs.delete([path1])
+    self.assertFalse(self.fs.exists(path1))
+
+  def test_delete_error(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    with self.assertRaises(BeamIOError) as error:
+      self.fs.delete([path1])
+    self.assertEqual(error.exception.message, 'Delete operation failed')
+    self.assertEqual(error.exception.exception_details.keys(), [path1])

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py 
b/sdks/python/apache_beam/io/textio.py
index 5bb1a9d..8122fae 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -25,6 +25,7 @@ from apache_beam import coders
 from apache_beam.io import filebasedsource
 from apache_beam.io import fileio
 from apache_beam.io import iobase
+from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.iobase import Read
 from apache_beam.io.iobase import Write
 from apache_beam.transforms import PTransform
@@ -271,7 +272,7 @@ class _TextSink(fileio.FileSink):
                num_shards=0,
                shard_name_template=None,
                coder=coders.ToStringCoder(),
-               compression_type=fileio.CompressionTypes.AUTO,
+               compression_type=CompressionTypes.AUTO,
                header=None):
     """Initialize a _TextSink.
 
@@ -355,7 +356,7 @@ class ReadFromText(PTransform):
       self,
       file_pattern=None,
       min_bundle_size=0,
-      compression_type=fileio.CompressionTypes.AUTO,
+      compression_type=CompressionTypes.AUTO,
       strip_trailing_newlines=True,
       coder=coders.StrUtf8Coder(),
       validate=True,
@@ -404,7 +405,7 @@ class WriteToText(PTransform):
                num_shards=0,
                shard_name_template=None,
                coder=coders.ToStringCoder(),
-               compression_type=fileio.CompressionTypes.AUTO,
+               compression_type=CompressionTypes.AUTO,
                header=None):
     """Initialize a WriteToText PTransform.
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py 
b/sdks/python/apache_beam/io/textio_test.py
index 04cf44c..b3f4391 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -41,7 +41,7 @@ from apache_beam import coders
 from apache_beam.io.filebasedsource_test import EOL
 from apache_beam.io.filebasedsource_test import write_data
 from apache_beam.io.filebasedsource_test import write_pattern
-from apache_beam.io.fileio import CompressionTypes
+from apache_beam.io.filesystem import CompressionTypes
 
 from apache_beam.test_pipeline import TestPipeline
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/tfrecordio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/tfrecordio.py 
b/sdks/python/apache_beam/io/tfrecordio.py
index 05c0a13..8b9d9ea 100644
--- a/sdks/python/apache_beam/io/tfrecordio.py
+++ b/sdks/python/apache_beam/io/tfrecordio.py
@@ -24,6 +24,7 @@ import struct
 from apache_beam import coders
 from apache_beam.io import filebasedsource
 from apache_beam.io import fileio
+from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.iobase import Read
 from apache_beam.io.iobase import Write
 from apache_beam.transforms import PTransform
@@ -180,7 +181,7 @@ class ReadFromTFRecord(PTransform):
   def __init__(self,
                file_pattern,
                coder=coders.BytesCoder(),
-               compression_type=fileio.CompressionTypes.AUTO,
+               compression_type=CompressionTypes.AUTO,
                validate=True,
                **kwargs):
     """Initialize a ReadFromTFRecord transform.
@@ -239,7 +240,7 @@ class WriteToTFRecord(PTransform):
                file_name_suffix='',
                num_shards=0,
                shard_name_template=fileio.DEFAULT_SHARD_NAME_TEMPLATE,
-               compression_type=fileio.CompressionTypes.AUTO,
+               compression_type=CompressionTypes.AUTO,
                **kwargs):
     """Initialize WriteToTFRecord transform.
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/tfrecordio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py 
b/sdks/python/apache_beam/io/tfrecordio_test.py
index df33fcb..49f9639 100644
--- a/sdks/python/apache_beam/io/tfrecordio_test.py
+++ b/sdks/python/apache_beam/io/tfrecordio_test.py
@@ -29,7 +29,7 @@ import unittest
 
 import apache_beam as beam
 from apache_beam import coders
-from apache_beam.io import fileio
+from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.tfrecordio import _TFRecordSink
 from apache_beam.io.tfrecordio import _TFRecordSource
 from apache_beam.io.tfrecordio import _TFRecordUtil
@@ -175,7 +175,7 @@ class TestTFRecordSink(_TestCaseWithTempDirCleanUp):
         file_name_suffix='',
         num_shards=0,
         shard_name_template=None,
-        compression_type=fileio.CompressionTypes.UNCOMPRESSED)
+        compression_type=CompressionTypes.UNCOMPRESSED)
     self._write_lines(sink, path, ['foo'])
 
     with open(path, 'r') as f:
@@ -190,7 +190,7 @@ class TestTFRecordSink(_TestCaseWithTempDirCleanUp):
         file_name_suffix='',
         num_shards=0,
         shard_name_template=None,
-        compression_type=fileio.CompressionTypes.UNCOMPRESSED)
+        compression_type=CompressionTypes.UNCOMPRESSED)
     self._write_lines(sink, path, ['foo', 'bar'])
 
     with open(path, 'r') as f:
@@ -205,7 +205,7 @@ class TestWriteToTFRecord(TestTFRecordSink):
     with TestPipeline() as p:
       input_data = ['foo', 'bar']
       _ = p | beam.Create(input_data) | WriteToTFRecord(
-          file_path_prefix, compression_type=fileio.CompressionTypes.GZIP)
+          file_path_prefix, compression_type=CompressionTypes.GZIP)
 
     actual = []
     file_name = glob.glob(file_path_prefix + '-*')[0]
@@ -252,7 +252,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
                     _TFRecordSource(
                         path,
                         coder=coders.BytesCoder(),
-                        compression_type=fileio.CompressionTypes.AUTO,
+                        compression_type=CompressionTypes.AUTO,
                         validate=True)))
       beam.assert_that(result, beam.equal_to(['foo']))
 
@@ -265,7 +265,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
                     _TFRecordSource(
                         path,
                         coder=coders.BytesCoder(),
-                        compression_type=fileio.CompressionTypes.AUTO,
+                        compression_type=CompressionTypes.AUTO,
                         validate=True)))
       beam.assert_that(result, beam.equal_to(['foo', 'bar']))
 
@@ -278,7 +278,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
                     _TFRecordSource(
                         path,
                         coder=coders.BytesCoder(),
-                        compression_type=fileio.CompressionTypes.GZIP,
+                        compression_type=CompressionTypes.GZIP,
                         validate=True)))
       beam.assert_that(result, beam.equal_to(['foo', 'bar']))
 
@@ -291,7 +291,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
                     _TFRecordSource(
                         path,
                         coder=coders.BytesCoder(),
-                        compression_type=fileio.CompressionTypes.AUTO,
+                        compression_type=CompressionTypes.AUTO,
                         validate=True)))
       beam.assert_that(result, beam.equal_to(['foo', 'bar']))
 
@@ -304,7 +304,7 @@ class TestReadFromTFRecordSource(TestTFRecordSource):
     with TestPipeline() as p:
       result = (p
                 | ReadFromTFRecord(
-                    path, compression_type=fileio.CompressionTypes.GZIP))
+                    path, compression_type=CompressionTypes.GZIP))
       beam.assert_that(result, beam.equal_to(['foo', 'bar']))
 
   def test_process_gzip_auto(self):
@@ -313,7 +313,7 @@ class TestReadFromTFRecordSource(TestTFRecordSource):
     with TestPipeline() as p:
       result = (p
                 | ReadFromTFRecord(
-                    path, compression_type=fileio.CompressionTypes.AUTO))
+                    path, compression_type=CompressionTypes.AUTO))
       beam.assert_that(result, beam.equal_to(['foo', 'bar']))
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py 
b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index 379a96f..0d6814e 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -26,7 +26,7 @@ import logging
 
 from hamcrest.core.base_matcher import BaseMatcher
 
-from apache_beam.io.fileio import ChannelFactory
+from apache_beam.io.filesystems_util import get_filesystem
 from apache_beam.runners.runner import PipelineState
 from apache_beam.tests import test_utils as utils
 from apache_beam.utils import retry
@@ -81,6 +81,7 @@ class FileChecksumMatcher(BaseMatcher):
 
   def __init__(self, file_path, expected_checksum):
     self.file_path = file_path
+    self.file_system = get_filesystem(self.file_path)
     self.expected_checksum = expected_checksum
 
   @retry.with_exponential_backoff(
@@ -89,11 +90,12 @@ class FileChecksumMatcher(BaseMatcher):
   def _read_with_retry(self):
     """Read path with retry if I/O failed"""
     read_lines = []
-    matched_path = ChannelFactory.glob(self.file_path)
+    match_result = self.file_system.match([self.file_path])[0]
+    matched_path = [f.path for f in match_result.metadata_list]
     if not matched_path:
       raise IOError('No such file or directory: %s' % self.file_path)
     for path in matched_path:
-      with ChannelFactory.open(path, 'r') as f:
+      with self.file_system.open(path, 'r') as f:
         for line in f:
           read_lines.append(line)
     return read_lines

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py 
b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
index 586af82..af8f441 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
@@ -24,16 +24,20 @@ import unittest
 from hamcrest import assert_that as hc_assert_that
 from mock import Mock, patch
 
-from apache_beam.io.fileio import ChannelFactory
+from apache_beam.io.localfilesystem import LocalFileSystem
 from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.runner import PipelineResult
 from apache_beam.tests import pipeline_verifiers as verifiers
 from apache_beam.tests.test_utils import patch_retry
 
 try:
+  # pylint: disable=wrong-import-order, wrong-import-position
+  # pylint: disable=ungrouped-imports
   from apitools.base.py.exceptions import HttpError
+  from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
 except ImportError:
   HttpError = None
+  GCSFileSystem = None
 
 
 class PipelineVerifiersTest(unittest.TestCase):
@@ -96,26 +100,26 @@ class PipelineVerifiersTest(unittest.TestCase):
                                               case['expected_checksum'])
       hc_assert_that(self._mock_result, matcher)
 
-  @patch.object(ChannelFactory, 'glob')
-  def test_file_checksum_matcher_read_failed(self, mock_glob):
-    mock_glob.side_effect = IOError('No file found.')
+  @patch.object(LocalFileSystem, 'match')
+  def test_file_checksum_matcher_read_failed(self, mock_match):
+    mock_match.side_effect = IOError('No file found.')
     matcher = verifiers.FileChecksumMatcher('dummy/path', Mock())
     with self.assertRaises(IOError):
       hc_assert_that(self._mock_result, matcher)
-    self.assertTrue(mock_glob.called)
-    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_glob.call_count)
+    self.assertTrue(mock_match.called)
+    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count)
 
-  @patch.object(ChannelFactory, 'glob')
+  @patch.object(GCSFileSystem, 'match')
   @unittest.skipIf(HttpError is None, 'google-apitools is not installed')
-  def test_file_checksum_matcher_service_error(self, mock_glob):
-    mock_glob.side_effect = HttpError(
+  def test_file_checksum_matcher_service_error(self, mock_match):
+    mock_match.side_effect = HttpError(
         response={'status': '404'}, url='', content='Not Found',
     )
     matcher = verifiers.FileChecksumMatcher('gs://dummy/path', Mock())
     with self.assertRaises(HttpError):
       hc_assert_that(self._mock_result, matcher)
-    self.assertTrue(mock_glob.called)
-    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_glob.call_count)
+    self.assertTrue(mock_match.called)
+    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count)
 
 
 if __name__ == '__main__':

Reply via email to