Repository: beam
Updated Branches:
  refs/heads/master 7c7bb8209 -> dd0f8d984
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/localfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
new file mode 100644
index 0000000..46589b0
--- /dev/null
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -0,0 +1,236 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Local File system implementation for accessing files on disk."""
+
+from __future__ import absolute_import
+
+import glob
+import os
+import shutil
+
+from apache_beam.io.filesystem import BeamIOError
+from apache_beam.io.filesystem import CompressedFile
+from apache_beam.io.filesystem import CompressionTypes
+from apache_beam.io.filesystem import FileMetadata
+from apache_beam.io.filesystem import FileSystem
+from apache_beam.io.filesystem import MatchResult
+
+
+class LocalFileSystem(FileSystem):
+  """A Local ``FileSystem`` implementation for accessing files on disk.
+  """
+
+  def mkdirs(self, path):
+    """Recursively create directories for the provided path.
+
+    Args:
+      path: string path of the directory structure that should be created
+
+    Raises:
+      IOError if the leaf directory already exists.
+    """
+    try:
+      os.makedirs(path)
+    except OSError as err:
+      raise IOError(err)
+
+  def match(self, patterns, limits=None):
+    """Find all matching paths for each of the provided patterns.
+
+    Args:
+      patterns: list of string file path patterns to match against
+      limits: list of maximum numbers of responses to fetch, one per pattern
+
+    Returns: list of ``MatchResult`` objects.
+
+    Raises:
+      ``BeamIOError`` if any of the pattern match operations fail
+    """
+    if limits is None:
+      limits = [None] * len(patterns)
+    else:
+      err_msg = "Patterns and limits should be equal in length"
+      assert len(patterns) == len(limits), err_msg
+
+    def _match(pattern, limit):
+      """Find all matching paths for the provided pattern.
+      """
+      files = glob.glob(pattern)
+      metadata = [FileMetadata(f, os.path.getsize(f)) for f in files[:limit]]
+      return MatchResult(pattern, metadata)
+
+    exceptions = {}
+    result = []
+    for pattern, limit in zip(patterns, limits):
+      try:
+        result.append(_match(pattern, limit))
+      except Exception as e:  # pylint: disable=broad-except
+        exceptions[pattern] = e
+
+    if exceptions:
+      raise BeamIOError("Match operation failed", exceptions)
+    return result
+
+  def _path_open(self, path, mode, mime_type='application/octet-stream',
+                 compression_type=CompressionTypes.AUTO):
+    """Helper function to open a file in the provided mode.
+ """ + compression_type = FileSystem._get_compression_type(path, compression_type) + raw_file = open(path, mode) + if compression_type == CompressionTypes.UNCOMPRESSED: + return raw_file + else: + return CompressedFile(raw_file, compression_type=compression_type) + + def create(self, path, mime_type='application/octet-stream', + compression_type=CompressionTypes.AUTO): + """Returns a write channel for the given file path. + + Args: + path: string path of the file object to be written to the system + mime_type: MIME type to specify the type of content in the file object + compression_type: Type of compression to be used for this object + + Returns: file handle with a close function for the user to use + """ + return self._path_open(path, 'wb', mime_type, compression_type) + + def open(self, path, mime_type='application/octet-stream', + compression_type=CompressionTypes.AUTO): + """Returns a read channel for the given file path. + + Args: + path: string path of the file object to be written to the system + mime_type: MIME type to specify the type of content in the file object + compression_type: Type of compression to be used for this object + + Returns: file handle with a close function for the user to use + """ + return self._path_open(path, 'rb', mime_type, compression_type) + + def copy(self, source_file_names, destination_file_names): + """Recursively copy the file tree from the source to the destination + + Args: + source_file_names: list of source file objects that needs to be copied + destination_file_names: list of destination of the new object + + Raises: + ``BeamIOError`` if any of the copy operations fail + """ + err_msg = ("source_file_names and destination_file_names should " + "be equal in length") + assert len(source_file_names) == len(destination_file_names), err_msg + + def _copy_path(source, destination): + """Recursively copy the file tree from the source to the destination + """ + try: + if os.path.exists(destination): + if os.path.isdir(destination): + shutil.rmtree(destination) + else: + os.remove(destination) + if os.path.isdir(source): + shutil.copytree(source, destination) + else: + shutil.copy2(source, destination) + except OSError as err: + raise IOError(err) + + exceptions = {} + for source, destination in zip(source_file_names, destination_file_names): + try: + _copy_path(source, destination) + except Exception as e: # pylint: disable=broad-except + exceptions[(source, destination)] = e + + if exceptions: + raise BeamIOError("Copy operation failed", exceptions) + + def rename(self, source_file_names, destination_file_names): + """Rename the files at the source list to the destination list. + Source and destination lists should be of the same size. 
+
+    Args:
+      source_file_names: List of file paths that need to be moved
+      destination_file_names: List of destination paths for the files
+
+    Raises:
+      ``BeamIOError`` if any of the rename operations fail
+    """
+    err_msg = ("source_file_names and destination_file_names should "
+               "be equal in length")
+    assert len(source_file_names) == len(destination_file_names), err_msg
+
+    def _rename_file(source, destination):
+      """Rename a single file object"""
+      try:
+        os.rename(source, destination)
+      except OSError as err:
+        raise IOError(err)
+
+    exceptions = {}
+    for source, destination in zip(source_file_names, destination_file_names):
+      try:
+        _rename_file(source, destination)
+      except Exception as e:  # pylint: disable=broad-except
+        exceptions[(source, destination)] = e
+
+    if exceptions:
+      raise BeamIOError("Rename operation failed", exceptions)
+
+  def exists(self, path):
+    """Check if the provided path exists on the FileSystem.
+
+    Args:
+      path: string path that needs to be checked.
+
+    Returns: boolean flag indicating if path exists
+    """
+    return os.path.exists(path)
+
+  def delete(self, paths):
+    """Deletes files or directories at the provided paths.
+    Directories will be deleted recursively.
+
+    Args:
+      paths: list of paths that give the file objects to be deleted
+
+    Raises:
+      ``BeamIOError`` if any of the delete operations fail
+    """
+    def _delete_path(path):
+      """Recursively delete the file or directory at the provided path.
+      """
+      try:
+        if os.path.isdir(path):
+          shutil.rmtree(path)
+        else:
+          os.remove(path)
+      except OSError as err:
+        raise IOError(err)
+
+    exceptions = {}
+    for path in paths:
+      try:
+        _delete_path(path)
+      except Exception as e:  # pylint: disable=broad-except
+        exceptions[path] = e
+
+    if exceptions:
+      raise BeamIOError("Delete operation failed", exceptions)
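A minimal sketch of how the new LocalFileSystem is driven end to end (not part
of the patch; the temp-directory layout below is invented for illustration):

    import os
    import tempfile

    from apache_beam.io.localfilesystem import LocalFileSystem

    fs = LocalFileSystem()
    tmpdir = tempfile.mkdtemp()
    path = os.path.join(tmpdir, 'out', 'data.txt')

    fs.mkdirs(os.path.dirname(path))    # recursive; IOError if it exists
    with fs.create(path) as f:          # write channel; compression is
      f.write('hello\n')                # inferred from the suffix (none here)

    result = fs.match([os.path.join(tmpdir, 'out', '*')])[0]
    print([m.path for m in result.metadata_list])  # one FileMetadata per match

    fs.rename([path], [path + '.bak'])
    fs.delete([tmpdir])                 # directories are deleted recursively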
+# + +"""Unit tests for LocalFileSystem.""" + +import unittest + +import filecmp +import os +import shutil +import tempfile +from apache_beam.io.filesystem import BeamIOError +from apache_beam.io.localfilesystem import LocalFileSystem + + +class LocalFileSystemTest(unittest.TestCase): + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.fs = LocalFileSystem() + + def tearDown(self): + shutil.rmtree(self.tmpdir) + + def test_mkdirs(self): + path = os.path.join(self.tmpdir, 't1/t2') + self.fs.mkdirs(path) + self.assertTrue(os.path.isdir(path)) + + def test_mkdirs_failed(self): + path = os.path.join(self.tmpdir, 't1/t2') + self.fs.mkdirs(path) + + # Check IOError if existing directory is created + with self.assertRaises(IOError): + self.fs.mkdirs(path) + + with self.assertRaises(IOError): + self.fs.mkdirs(os.path.join(self.tmpdir, 't1')) + + def test_match_file(self): + path = os.path.join(self.tmpdir, 'f1') + open(path, 'a').close() + + # Match files in the temp directory + result = self.fs.match([path])[0] + files = [f.path for f in result.metadata_list] + self.assertEqual(files, [path]) + + def test_match_file_empty(self): + path = os.path.join(self.tmpdir, 'f2') # Does not exist + + # Match files in the temp directory + result = self.fs.match([path])[0] + files = [f.path for f in result.metadata_list] + self.assertEqual(files, []) + + def test_match_file_exception(self): + # Match files with None so that it throws an exception + with self.assertRaises(BeamIOError) as error: + self.fs.match([None]) + self.assertEqual(error.exception.message, 'Match operation failed') + self.assertEqual(error.exception.exception_details.keys(), [None]) + + def test_match_directory(self): + path1 = os.path.join(self.tmpdir, 'f1') + path2 = os.path.join(self.tmpdir, 'f2') + open(path1, 'a').close() + open(path2, 'a').close() + + # Match both the files in the directory + path = os.path.join(self.tmpdir, '*') + result = self.fs.match([path])[0] + files = [f.path for f in result.metadata_list] + self.assertEqual(files, [path1, path2]) + + def test_match_directory(self): + result = self.fs.match([self.tmpdir])[0] + files = [f.path for f in result.metadata_list] + self.assertEqual(files, [self.tmpdir]) + + def test_copy(self): + path1 = os.path.join(self.tmpdir, 'f1') + path2 = os.path.join(self.tmpdir, 'f2') + with open(path1, 'a') as f: + f.write('Hello') + + self.fs.copy([path1], [path2]) + self.assertTrue(filecmp.cmp(path1, path2)) + + def test_copy_error(self): + path1 = os.path.join(self.tmpdir, 'f1') + path2 = os.path.join(self.tmpdir, 'f2') + with self.assertRaises(BeamIOError) as error: + self.fs.copy([path1], [path2]) + self.assertEqual(error.exception.message, 'Copy operation failed') + self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)]) + + def test_copy_directory(self): + path_t1 = os.path.join(self.tmpdir, 't1') + path_t2 = os.path.join(self.tmpdir, 't2') + self.fs.mkdirs(path_t1) + self.fs.mkdirs(path_t2) + + path1 = os.path.join(path_t1, 'f1') + path2 = os.path.join(path_t2, 'f1') + with open(path1, 'a') as f: + f.write('Hello') + + self.fs.copy([path_t1], [path_t2]) + self.assertTrue(filecmp.cmp(path1, path2)) + + def test_rename(self): + path1 = os.path.join(self.tmpdir, 'f1') + path2 = os.path.join(self.tmpdir, 'f2') + with open(path1, 'a') as f: + f.write('Hello') + + self.fs.rename([path1], [path2]) + self.assertTrue(self.fs.exists(path2)) + self.assertFalse(self.fs.exists(path1)) + + def test_rename_error(self): + path1 = os.path.join(self.tmpdir, 'f1') + 
+    path2 = os.path.join(self.tmpdir, 'f2')
+    with self.assertRaises(BeamIOError) as error:
+      self.fs.rename([path1], [path2])
+    self.assertEqual(error.exception.message, 'Rename operation failed')
+    self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)])
+
+  def test_rename_directory(self):
+    path_t1 = os.path.join(self.tmpdir, 't1')
+    path_t2 = os.path.join(self.tmpdir, 't2')
+    self.fs.mkdirs(path_t1)
+
+    path1 = os.path.join(path_t1, 'f1')
+    path2 = os.path.join(path_t2, 'f1')
+    with open(path1, 'a') as f:
+      f.write('Hello')
+
+    self.fs.rename([path_t1], [path_t2])
+    self.assertTrue(self.fs.exists(path_t2))
+    self.assertFalse(self.fs.exists(path_t1))
+    self.assertTrue(self.fs.exists(path2))
+    self.assertFalse(self.fs.exists(path1))
+
+  def test_exists(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    path2 = os.path.join(self.tmpdir, 'f2')
+    with open(path1, 'a') as f:
+      f.write('Hello')
+    self.assertTrue(self.fs.exists(path1))
+    self.assertFalse(self.fs.exists(path2))
+
+  def test_delete(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+
+    with open(path1, 'a') as f:
+      f.write('Hello')
+
+    self.assertTrue(self.fs.exists(path1))
+    self.fs.delete([path1])
+    self.assertFalse(self.fs.exists(path1))
+
+  def test_delete_error(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    with self.assertRaises(BeamIOError) as error:
+      self.fs.delete([path1])
+    self.assertEqual(error.exception.message, 'Delete operation failed')
+    self.assertEqual(error.exception.exception_details.keys(), [path1])

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 5bb1a9d..8122fae 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -25,6 +25,7 @@ from apache_beam import coders
 from apache_beam.io import filebasedsource
 from apache_beam.io import fileio
 from apache_beam.io import iobase
+from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.iobase import Read
 from apache_beam.io.iobase import Write
 from apache_beam.transforms import PTransform
@@ -271,7 +272,7 @@ class _TextSink(fileio.FileSink):
                num_shards=0,
                shard_name_template=None,
                coder=coders.ToStringCoder(),
-               compression_type=fileio.CompressionTypes.AUTO,
+               compression_type=CompressionTypes.AUTO,
                header=None):
     """Initialize a _TextSink.
@@ -355,7 +356,7 @@ class ReadFromText(PTransform):
       self,
       file_pattern=None,
       min_bundle_size=0,
-      compression_type=fileio.CompressionTypes.AUTO,
+      compression_type=CompressionTypes.AUTO,
       strip_trailing_newlines=True,
       coder=coders.StrUtf8Coder(),
       validate=True,
@@ -404,7 +405,7 @@ class WriteToText(PTransform):
                num_shards=0,
                shard_name_template=None,
                coder=coders.ToStringCoder(),
-               compression_type=fileio.CompressionTypes.AUTO,
+               compression_type=CompressionTypes.AUTO,
                header=None):
     """Initialize a WriteToText PTransform.
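The textio change is mechanical: CompressionTypes now lives in
apache_beam.io.filesystem rather than fileio. A before/after sketch for call
sites (the pipeline object and input path are placeholders, not from the patch):

    # before: from apache_beam.io import fileio
    #         ... compression_type=fileio.CompressionTypes.GZIP
    from apache_beam.io.filesystem import CompressionTypes
    from apache_beam.io.textio import ReadFromText

    lines = pipeline | ReadFromText('/tmp/input-*.gz',
                                    compression_type=CompressionTypes.GZIP)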
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index 04cf44c..b3f4391 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -41,7 +41,7 @@ from apache_beam import coders
 from apache_beam.io.filebasedsource_test import EOL
 from apache_beam.io.filebasedsource_test import write_data
 from apache_beam.io.filebasedsource_test import write_pattern
-from apache_beam.io.fileio import CompressionTypes
+from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.test_pipeline import TestPipeline

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/tfrecordio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/tfrecordio.py b/sdks/python/apache_beam/io/tfrecordio.py
index 05c0a13..8b9d9ea 100644
--- a/sdks/python/apache_beam/io/tfrecordio.py
+++ b/sdks/python/apache_beam/io/tfrecordio.py
@@ -24,6 +24,7 @@ import struct
 from apache_beam import coders
 from apache_beam.io import filebasedsource
 from apache_beam.io import fileio
+from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.iobase import Read
 from apache_beam.io.iobase import Write
 from apache_beam.transforms import PTransform
@@ -180,7 +181,7 @@ class ReadFromTFRecord(PTransform):
   def __init__(self,
                file_pattern,
                coder=coders.BytesCoder(),
-               compression_type=fileio.CompressionTypes.AUTO,
+               compression_type=CompressionTypes.AUTO,
                validate=True,
                **kwargs):
     """Initialize a ReadFromTFRecord transform.
@@ -239,7 +240,7 @@ class WriteToTFRecord(PTransform):
                file_name_suffix='',
                num_shards=0,
                shard_name_template=fileio.DEFAULT_SHARD_NAME_TEMPLATE,
-               compression_type=fileio.CompressionTypes.AUTO,
+               compression_type=CompressionTypes.AUTO,
                **kwargs):
     """Initialize WriteToTFRecord transform.
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/tfrecordio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py b/sdks/python/apache_beam/io/tfrecordio_test.py
index df33fcb..49f9639 100644
--- a/sdks/python/apache_beam/io/tfrecordio_test.py
+++ b/sdks/python/apache_beam/io/tfrecordio_test.py
@@ -29,7 +29,7 @@ import unittest
 
 import apache_beam as beam
 from apache_beam import coders
-from apache_beam.io import fileio
+from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.tfrecordio import _TFRecordSink
 from apache_beam.io.tfrecordio import _TFRecordSource
 from apache_beam.io.tfrecordio import _TFRecordUtil
@@ -175,7 +175,7 @@ class TestTFRecordSink(_TestCaseWithTempDirCleanUp):
         file_name_suffix='',
         num_shards=0,
         shard_name_template=None,
-        compression_type=fileio.CompressionTypes.UNCOMPRESSED)
+        compression_type=CompressionTypes.UNCOMPRESSED)
     self._write_lines(sink, path, ['foo'])
 
     with open(path, 'r') as f:
@@ -190,7 +190,7 @@ class TestTFRecordSink(_TestCaseWithTempDirCleanUp):
         file_name_suffix='',
         num_shards=0,
         shard_name_template=None,
-        compression_type=fileio.CompressionTypes.UNCOMPRESSED)
+        compression_type=CompressionTypes.UNCOMPRESSED)
     self._write_lines(sink, path, ['foo', 'bar'])
 
     with open(path, 'r') as f:
@@ -205,7 +205,7 @@ class TestWriteToTFRecord(TestTFRecordSink):
     with TestPipeline() as p:
       input_data = ['foo', 'bar']
      _ = p | beam.Create(input_data) | WriteToTFRecord(
-          file_path_prefix, compression_type=fileio.CompressionTypes.GZIP)
+          file_path_prefix, compression_type=CompressionTypes.GZIP)
 
     actual = []
     file_name = glob.glob(file_path_prefix + '-*')[0]
@@ -252,7 +252,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
               _TFRecordSource(
                   path,
                   coder=coders.BytesCoder(),
-                  compression_type=fileio.CompressionTypes.AUTO,
+                  compression_type=CompressionTypes.AUTO,
                   validate=True)))
       beam.assert_that(result, beam.equal_to(['foo']))
 
@@ -265,7 +265,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
               _TFRecordSource(
                   path,
                   coder=coders.BytesCoder(),
-                  compression_type=fileio.CompressionTypes.AUTO,
+                  compression_type=CompressionTypes.AUTO,
                   validate=True)))
       beam.assert_that(result, beam.equal_to(['foo', 'bar']))
 
@@ -278,7 +278,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
               _TFRecordSource(
                  path,
                   coder=coders.BytesCoder(),
-                  compression_type=fileio.CompressionTypes.GZIP,
+                  compression_type=CompressionTypes.GZIP,
                   validate=True)))
       beam.assert_that(result, beam.equal_to(['foo', 'bar']))
 
@@ -291,7 +291,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
               _TFRecordSource(
                   path,
                   coder=coders.BytesCoder(),
-                  compression_type=fileio.CompressionTypes.AUTO,
+                  compression_type=CompressionTypes.AUTO,
                   validate=True)))
       beam.assert_that(result, beam.equal_to(['foo', 'bar']))
 
@@ -304,7 +304,7 @@ class TestReadFromTFRecordSource(TestTFRecordSource):
     with TestPipeline() as p:
       result = (p
                 | ReadFromTFRecord(
-                    path, compression_type=fileio.CompressionTypes.GZIP))
+                    path, compression_type=CompressionTypes.GZIP))
       beam.assert_that(result, beam.equal_to(['foo', 'bar']))
 
   def test_process_gzip_auto(self):
@@ -313,7 +313,7 @@ class TestReadFromTFRecordSource(TestTFRecordSource):
     with TestPipeline() as p:
       result = (p
                 | ReadFromTFRecord(
-                    path, compression_type=fileio.CompressionTypes.AUTO))
+                    path, compression_type=CompressionTypes.AUTO))
      beam.assert_that(result, beam.equal_to(['foo', 'bar']))
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index 379a96f..0d6814e 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -26,7 +26,7 @@ import logging
 
 from hamcrest.core.base_matcher import BaseMatcher
 
-from apache_beam.io.fileio import ChannelFactory
+from apache_beam.io.filesystems_util import get_filesystem
 from apache_beam.runners.runner import PipelineState
 from apache_beam.tests import test_utils as utils
 from apache_beam.utils import retry
@@ -81,6 +81,7 @@ class FileChecksumMatcher(BaseMatcher):
 
   def __init__(self, file_path, expected_checksum):
     self.file_path = file_path
+    self.file_system = get_filesystem(self.file_path)
     self.expected_checksum = expected_checksum
 
   @retry.with_exponential_backoff(
@@ -89,11 +90,12 @@ class FileChecksumMatcher(BaseMatcher):
   def _read_with_retry(self):
     """Read path with retry if I/O failed"""
     read_lines = []
-    matched_path = ChannelFactory.glob(self.file_path)
+    match_result = self.file_system.match([self.file_path])[0]
+    matched_path = [f.path for f in match_result.metadata_list]
     if not matched_path:
       raise IOError('No such file or directory: %s' % self.file_path)
     for path in matched_path:
-      with ChannelFactory.open(path, 'r') as f:
+      with self.file_system.open(path) as f:
         for line in f:
           read_lines.append(line)
     return read_lines

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
index 586af82..af8f441 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
@@ -24,16 +24,20 @@ import unittest
 from hamcrest import assert_that as hc_assert_that
 from mock import Mock, patch
 
-from apache_beam.io.fileio import ChannelFactory
+from apache_beam.io.localfilesystem import LocalFileSystem
 from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.runner import PipelineResult
 from apache_beam.tests import pipeline_verifiers as verifiers
 from apache_beam.tests.test_utils import patch_retry
 
 try:
+  # pylint: disable=wrong-import-order, wrong-import-position
+  # pylint: disable=ungrouped-imports
   from apitools.base.py.exceptions import HttpError
+  from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
 except ImportError:
   HttpError = None
+  GCSFileSystem = None
 
 
 class PipelineVerifiersTest(unittest.TestCase):
@@ -96,26 +100,26 @@ class PipelineVerifiersTest(unittest.TestCase):
                                            case['expected_checksum'])
       hc_assert_that(self._mock_result, matcher)
 
-  @patch.object(ChannelFactory, 'glob')
-  def test_file_checksum_matcher_read_failed(self, mock_glob):
-    mock_glob.side_effect = IOError('No file found.')
+  @patch.object(LocalFileSystem, 'match')
+  def test_file_checksum_matcher_read_failed(self, mock_match):
+    mock_match.side_effect = IOError('No file found.')
     matcher = verifiers.FileChecksumMatcher('dummy/path', Mock())
     with self.assertRaises(IOError):
       hc_assert_that(self._mock_result, matcher)
-    self.assertTrue(mock_glob.called)
-    self.assertEqual(verifiers.MAX_RETRIES + 1,
-                     mock_glob.call_count)
+    self.assertTrue(mock_match.called)
+    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count)
 
-  @patch.object(ChannelFactory, 'glob')
+  @patch.object(GCSFileSystem, 'match')
   @unittest.skipIf(HttpError is None, 'google-apitools is not installed')
-  def test_file_checksum_matcher_service_error(self, mock_glob):
-    mock_glob.side_effect = HttpError(
+  def test_file_checksum_matcher_service_error(self, mock_match):
+    mock_match.side_effect = HttpError(
         response={'status': '404'},
         url='',
         content='Not Found',
     )
     matcher = verifiers.FileChecksumMatcher('gs://dummy/path', Mock())
     with self.assertRaises(HttpError):
       hc_assert_that(self._mock_result, matcher)
-    self.assertTrue(mock_glob.called)
-    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_glob.call_count)
+    self.assertTrue(mock_match.called)
+    self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count)
 
 
 if __name__ == '__main__':
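The net effect of the verifier change is that globbing and reads now go through
the FileSystem abstraction instead of ChannelFactory. Roughly, mirroring
_read_with_retry above (a sketch; 'dummy/path' is a placeholder pattern):

    from apache_beam.io.filesystems_util import get_filesystem

    fs = get_filesystem('dummy/path')           # LocalFileSystem here;
                                                # GCSFileSystem for gs:// paths
    match_result = fs.match(['dummy/path'])[0]  # one MatchResult per pattern
    for metadata in match_result.metadata_list:
      with fs.open(metadata.path) as f:         # replaces ChannelFactory.open
        for line in f:
          pass                                  # e.g. accumulate a checksum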
