Repository: beam Updated Branches: refs/heads/master 0d69611e2 -> b8c568f29
[BEAM-1988] Add FileSystems Interface for accessing underlying FS correctly Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ad6dcf4d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ad6dcf4d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ad6dcf4d Branch: refs/heads/master Commit: ad6dcf4d1d22b7e6e349db9027ef639a5410b494 Parents: 0d69611 Author: Sourabh Bajaj <[email protected]> Authored: Tue Apr 25 12:01:21 2017 -0700 Committer: Chamikara Jayalath <[email protected]> Committed: Tue Apr 25 16:21:30 2017 -0700 ---------------------------------------------------------------------- sdks/python/.pylintrc | 1 + sdks/python/apache_beam/io/filebasedsource.py | 20 +- sdks/python/apache_beam/io/fileio.py | 25 +-- sdks/python/apache_beam/io/filesystem.py | 6 +- sdks/python/apache_beam/io/filesystems.py | 186 +++++++++++++++ sdks/python/apache_beam/io/filesystems_test.py | 224 +++++++++++++++++++ .../apache_beam/io/localfilesystem_test.py | 4 +- .../runners/dataflow/internal/apiclient.py | 7 +- .../runners/dataflow/internal/dependency.py | 32 ++- .../dataflow/internal/dependency_test.py | 7 +- .../apache_beam/tests/pipeline_verifiers.py | 7 +- 11 files changed, 452 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ad6dcf4d/sdks/python/.pylintrc ---------------------------------------------------------------------- diff --git a/sdks/python/.pylintrc b/sdks/python/.pylintrc index 429ebdb..6418249 100644 --- a/sdks/python/.pylintrc +++ b/sdks/python/.pylintrc @@ -95,6 +95,7 @@ disable = import-self, invalid-name, invalid-unary-operand-type, + len-as-condition, locally-disabled, locally-enabled, misplaced-bare-raise, http://git-wip-us.apache.org/repos/asf/beam/blob/ad6dcf4d/sdks/python/apache_beam/io/filebasedsource.py ---------------------------------------------------------------------- diff --git 
a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index ef44b3e..e25f92e 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -30,7 +30,7 @@ from apache_beam.io import concat_source from apache_beam.io import iobase from apache_beam.io import range_trackers from apache_beam.io.filesystem import CompressionTypes -from apache_beam.io.filesystems_util import get_filesystem +from apache_beam.io.filesystems import FileSystems from apache_beam.transforms.display import DisplayDataItem from apache_beam.utils.value_provider import ValueProvider from apache_beam.utils.value_provider import StaticValueProvider @@ -86,10 +86,6 @@ class FileBasedSource(iobase.BoundedSource): if isinstance(file_pattern, basestring): file_pattern = StaticValueProvider(str, file_pattern) self._pattern = file_pattern - if file_pattern.is_accessible(): - self._file_system = get_filesystem(file_pattern.get()) - else: - self._file_system = None self._concat_source = None self._min_bundle_size = min_bundle_size @@ -118,9 +114,7 @@ class FileBasedSource(iobase.BoundedSource): pattern = self._pattern.get() single_file_sources = [] - if self._file_system is None: - self._file_system = get_filesystem(pattern) - match_result = self._file_system.match([pattern])[0] + match_result = FileSystems.match([pattern])[0] files_metadata = match_result.metadata_list # We create a reference for FileBasedSource that will be serialized along @@ -155,7 +149,7 @@ class FileBasedSource(iobase.BoundedSource): return self._concat_source def open_file(self, file_name): - return get_filesystem(file_name).open( + return FileSystems.open( file_name, 'application/octet-stream', compression_type=self._compression_type) @@ -164,11 +158,9 @@ class FileBasedSource(iobase.BoundedSource): """Validate if there are actual files in the specified glob pattern """ pattern = self._pattern.get() - if self._file_system is None: - 
self._file_system = get_filesystem(pattern) # Limit the responses as we only want to check if something exists - match_result = self._file_system.match([pattern], limits=[1])[0] + match_result = FileSystems.match([pattern], limits=[1])[0] if len(match_result.metadata_list) <= 0: raise IOError( 'No files found based on the file pattern %s' % pattern) @@ -183,9 +175,7 @@ class FileBasedSource(iobase.BoundedSource): @check_accessible(['_pattern']) def estimate_size(self): pattern = self._pattern.get() - if self._file_system is None: - self._file_system = get_filesystem(pattern) - match_result = self._file_system.match([pattern])[0] + match_result = FileSystems.match([pattern])[0] return sum([f.size_in_bytes for f in match_result.metadata_list]) def read(self, range_tracker): http://git-wip-us.apache.org/repos/asf/beam/blob/ad6dcf4d/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index f61289e..bb77bfe 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -28,7 +28,7 @@ from apache_beam.internal import util from apache_beam.io import iobase from apache_beam.io.filesystem import BeamIOError from apache_beam.io.filesystem import CompressionTypes -from apache_beam.io.filesystems_util import get_filesystem +from apache_beam.io.filesystems import FileSystems from apache_beam.transforms.display import DisplayDataItem from apache_beam.utils.value_provider import ValueProvider from apache_beam.utils.value_provider import StaticValueProvider @@ -91,10 +91,6 @@ class FileSink(iobase.Sink): self.shard_name_format = self._template_to_format(shard_name_template) self.compression_type = compression_type self.mime_type = mime_type - if file_path_prefix.is_accessible(): - self._file_system = get_filesystem(file_path_prefix.get()) - else: - self._file_system = None def display_data(self): return 
{'shards': @@ -115,10 +111,7 @@ class FileSink(iobase.Sink): The returned file handle is passed to ``write_[encoded_]record`` and ``close``. """ - if self._file_system is None: - self._file_system = get_filesystem(self.file_path_prefix.get()) - return self._file_system.create(temp_path, self.mime_type, - self.compression_type) + return FileSystems.create(temp_path, self.mime_type, self.compression_type) def write_record(self, file_handle, value): """Writes a single record go the file handle returned by ``open()``. @@ -149,9 +142,7 @@ class FileSink(iobase.Sink): file_name_suffix = self.file_name_suffix.get() tmp_dir = file_path_prefix + file_name_suffix + time.strftime( '-temp-%Y-%m-%d_%H-%M-%S') - if self._file_system is None: - self._file_system = get_filesystem(file_path_prefix) - self._file_system.mkdirs(tmp_dir) + FileSystems.mkdirs(tmp_dir) return tmp_dir @check_accessible(['file_path_prefix', 'file_name_suffix']) @@ -177,7 +168,7 @@ class FileSink(iobase.Sink): source_files = [] destination_files = [] - chunk_size = self._file_system.CHUNK_SIZE + chunk_size = FileSystems.get_chunk_size(file_path_prefix) for shard_num, shard in enumerate(writer_results): final_name = ''.join([ file_path_prefix, self.shard_name_format % dict( @@ -204,10 +195,8 @@ class FileSink(iobase.Sink): """_rename_batch executes batch rename operations.""" source_files, destination_files = batch exceptions = [] - if self._file_system is None: - self._file_system = get_filesystem(file_path_prefix) try: - self._file_system.rename(source_files, destination_files) + FileSystems.rename(source_files, destination_files) return exceptions except BeamIOError as exp: if exp.exception_details is None: @@ -220,7 +209,7 @@ class FileSink(iobase.Sink): if isinstance(exception, IOError): # May have already been copied. 
try: - if self._file_system.exists(dest): + if FileSystems.exists(dest): should_report = False except Exception as exists_e: # pylint: disable=broad-except logging.warning('Exception when checking if file %s exists: ' @@ -250,7 +239,7 @@ class FileSink(iobase.Sink): time.time() - start_time) try: - self._file_system.delete([init_result]) + FileSystems.delete([init_result]) except IOError: # May have already been removed. pass http://git-wip-us.apache.org/repos/asf/beam/blob/ad6dcf4d/sdks/python/apache_beam/io/filesystem.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index 591d0b0..db38858 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -465,7 +465,8 @@ class FileSystem(object): raise NotImplementedError @abc.abstractmethod - def create(self, path, mime_type, compression_type): + def create(self, path, mime_type='application/octet-stream', + compression_type=CompressionTypes.AUTO): """Returns a write channel for the given file path. Args: @@ -478,7 +479,8 @@ class FileSystem(object): raise NotImplementedError @abc.abstractmethod - def open(self, path, mime_type, compression_type): + def open(self, path, mime_type='application/octet-stream', + compression_type=CompressionTypes.AUTO): """Returns a read channel for the given file path. Args: http://git-wip-us.apache.org/repos/asf/beam/blob/ad6dcf4d/sdks/python/apache_beam/io/filesystems.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filesystems.py b/sdks/python/apache_beam/io/filesystems.py new file mode 100644 index 0000000..07fc684 --- /dev/null +++ b/sdks/python/apache_beam/io/filesystems.py @@ -0,0 +1,186 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""FileSystems interface class for accessing the correct filesystem""" + +from apache_beam.io.filesystem import BeamIOError +from apache_beam.io.filesystem import CompressionTypes +from apache_beam.io.filesystems_util import get_filesystem + + +class FileSystems(object): + """A class that defines the functions that can be performed on a filesystem. + All methods are static and access the underlying registered filesystems. + """ + + @staticmethod + def get_filesystem(path): + """Get the correct filesystem for the specified path + """ + try: + return get_filesystem(path) + except Exception as e: + raise BeamIOError('Enable to get the Filesystem', {path: e}) + + @staticmethod + def join(basepath, *paths): + """Join two or more pathname components for the filesystem + + Args: + basepath: string path of the first component of the path + paths: path components to be added + + Returns: full path after combining all the passed components + """ + filesystem = FileSystems.get_filesystem(basepath) + return filesystem.join(basepath, *paths) + + @staticmethod + def mkdirs(path): + """Recursively create directories for the provided path. + + Args: + path: string path of the directory structure that should be created + + Raises: + IOError if leaf directory already exists. 
+ """ + filesystem = FileSystems.get_filesystem(path) + return filesystem.mkdirs(path) + + @staticmethod + def match(patterns, limits=None): + """Find all matching paths to the patterns provided. + + Args: + patterns: list of string for the file path pattern to match against + limits: list of maximum number of responses that need to be fetched + + Returns: list of ``MatchResult`` objects. + + Raises: + ``BeamIOError`` if any of the pattern match operations fail + """ + if len(patterns) == 0: + return [] + filesystem = FileSystems.get_filesystem(patterns[0]) + return filesystem.match(patterns, limits) + + @staticmethod + def create(path, mime_type='application/octet-stream', + compression_type=CompressionTypes.AUTO): + """Returns a write channel for the given file path. + + Args: + path: string path of the file object to be written to the system + mime_type: MIME type to specify the type of content in the file object + compression_type: Type of compression to be used for this object. See + ``CompressionTypes`` for possible values. + + Returns: file handle with a ``close`` function for the user to use. + """ + filesystem = FileSystems.get_filesystem(path) + return filesystem.create(path, mime_type, compression_type) + + @staticmethod + def open(path, mime_type='application/octet-stream', + compression_type=CompressionTypes.AUTO): + """Returns a read channel for the given file path. + + Args: + path: string path of the file object to be read from the system + mime_type: MIME type to specify the type of content in the file object + compression_type: Type of compression to be used for this object. See + ``CompressionTypes`` for possible values. + + Returns: file handle with a ``close`` function for the user to use.
+ """ + filesystem = FileSystems.get_filesystem(path) + return filesystem.open(path, mime_type, compression_type) + + @staticmethod + def copy(source_file_names, destination_file_names): + """Recursively copy the file list from the source to the destination + + Args: + source_file_names: list of source file objects that needs to be copied + destination_file_names: list of destination of the new object + + Raises: + ``BeamIOError`` if any of the copy operations fail + """ + if len(source_file_names) == 0: + return + filesystem = FileSystems.get_filesystem(source_file_names[0]) + return filesystem.copy(source_file_names, destination_file_names) + + @staticmethod + def rename(source_file_names, destination_file_names): + """Rename the files at the source list to the destination list. + Source and destination lists should be of the same size. + + Args: + source_file_names: List of file paths that need to be moved + destination_file_names: List of destination_file_names for the files + + Raises: + ``BeamIOError`` if any of the rename operations fail + """ + if len(source_file_names) == 0: + return + filesystem = FileSystems.get_filesystem(source_file_names[0]) + return filesystem.rename(source_file_names, destination_file_names) + + @staticmethod + def exists(path): + """Check if the provided path exists on the FileSystem. + + Args: + path: string path that needs to be checked. + + Returns: boolean flag indicating if path exists + """ + filesystem = FileSystems.get_filesystem(path) + return filesystem.exists(path) + + @staticmethod + def delete(paths): + """Deletes files or directories at the provided paths. + Directories will be deleted recursively. 
+ + Args: + paths: list of paths that give the file objects to be deleted + + Raises: + ``BeamIOError`` if any of the delete operations fail + """ + if len(paths) == 0: + return + filesystem = FileSystems.get_filesystem(paths[0]) + return filesystem.delete(paths) + + @staticmethod + def get_chunk_size(path): + """Get the correct chunk size for the FileSystem. + + Args: + path: string path that needs to be checked. + + Returns: integer size for parallelization in the FS operations. + """ + filesystem = FileSystems.get_filesystem(path) + return filesystem.CHUNK_SIZE http://git-wip-us.apache.org/repos/asf/beam/blob/ad6dcf4d/sdks/python/apache_beam/io/filesystems_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filesystems_test.py b/sdks/python/apache_beam/io/filesystems_test.py new file mode 100644 index 0000000..9165586 --- /dev/null +++ b/sdks/python/apache_beam/io/filesystems_test.py @@ -0,0 +1,224 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Unit tests for FileSystems.""" + +import unittest + +import filecmp +import os +import shutil +import tempfile +import mock + +from apache_beam.io import localfilesystem +from apache_beam.io.filesystem import BeamIOError +from apache_beam.io.filesystems import FileSystems + + +def _gen_fake_join(separator): + """Returns a callable that joins paths with the given separator.""" + + def _join(first_path, *paths): + return separator.join((first_path.rstrip(separator),) + paths) + + return _join + + +class FileSystemsTest(unittest.TestCase): + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.tmpdir) + + @mock.patch('apache_beam.io.localfilesystem.os') + def test_unix_path_join(self, *unused_mocks): + # Test joining of Unix paths. + localfilesystem.os.path.join.side_effect = _gen_fake_join('/') + self.assertEqual('/tmp/path/to/file', + FileSystems.join('/tmp/path', 'to', 'file')) + self.assertEqual('/tmp/path/to/file', + FileSystems.join('/tmp/path', 'to/file')) + self.assertEqual('/tmp/path/to/file', + FileSystems.join('/', 'tmp/path', 'to/file')) + self.assertEqual('/tmp/path/to/file', + FileSystems.join('/tmp/', 'path', 'to/file')) + + @mock.patch('apache_beam.io.localfilesystem.os') + def test_windows_path_join(self, *unused_mocks): + # Test joining of Windows paths.
+ localfilesystem.os.path.join.side_effect = _gen_fake_join('\\') + self.assertEqual(r'C:\tmp\path\to\file', + FileSystems.join(r'C:\tmp\path', 'to', 'file')) + self.assertEqual(r'C:\tmp\path\to\file', + FileSystems.join(r'C:\tmp\path', r'to\file')) + self.assertEqual(r'C:\tmp\path\to\file', + FileSystems.join(r'C:\tmp\path\\', 'to', 'file')) + + def test_mkdirs(self): + path = os.path.join(self.tmpdir, 't1/t2') + FileSystems.mkdirs(path) + self.assertTrue(os.path.isdir(path)) + + def test_mkdirs_failed(self): + path = os.path.join(self.tmpdir, 't1/t2') + FileSystems.mkdirs(path) + + # Check IOError if existing directory is created + with self.assertRaises(IOError): + FileSystems.mkdirs(path) + + with self.assertRaises(IOError): + FileSystems.mkdirs(os.path.join(self.tmpdir, 't1')) + + def test_match_file(self): + path = os.path.join(self.tmpdir, 'f1') + open(path, 'a').close() + + # Match files in the temp directory + result = FileSystems.match([path])[0] + files = [f.path for f in result.metadata_list] + self.assertEqual(files, [path]) + + def test_match_file_empty(self): + path = os.path.join(self.tmpdir, 'f2') # Does not exist + + # Match files in the temp directory + result = FileSystems.match([path])[0] + files = [f.path for f in result.metadata_list] + self.assertEqual(files, []) + + def test_match_file_exception(self): + # Match files with None so that it throws an exception + with self.assertRaises(BeamIOError) as error: + FileSystems.match([None]) + self.assertTrue( + error.exception.message.startswith('Enable to get the Filesystem')) + self.assertEqual(error.exception.exception_details.keys(), [None]) + + def test_match_directory_with_files(self): + # Distinct name so this glob test is not shadowed by the later + # test_match_directory definition in the same class body. + path1 = os.path.join(self.tmpdir, 'f1') + path2 = os.path.join(self.tmpdir, 'f2') + open(path1, 'a').close() + open(path2, 'a').close() + + # Match both the files in the directory + path = os.path.join(self.tmpdir, '*') + result = FileSystems.match([path])[0] + files = [f.path for f in result.metadata_list] +
self.assertEqual(files, [path1, path2]) + + def test_match_directory(self): + result = FileSystems.match([self.tmpdir])[0] + files = [f.path for f in result.metadata_list] + self.assertEqual(files, [self.tmpdir]) + + def test_copy(self): + path1 = os.path.join(self.tmpdir, 'f1') + path2 = os.path.join(self.tmpdir, 'f2') + with open(path1, 'a') as f: + f.write('Hello') + + FileSystems.copy([path1], [path2]) + self.assertTrue(filecmp.cmp(path1, path2)) + + def test_copy_error(self): + path1 = os.path.join(self.tmpdir, 'f1') + path2 = os.path.join(self.tmpdir, 'f2') + with self.assertRaises(BeamIOError) as error: + FileSystems.copy([path1], [path2]) + self.assertTrue( + error.exception.message.startswith('Copy operation failed')) + self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)]) + + def test_copy_directory(self): + path_t1 = os.path.join(self.tmpdir, 't1') + path_t2 = os.path.join(self.tmpdir, 't2') + FileSystems.mkdirs(path_t1) + FileSystems.mkdirs(path_t2) + + path1 = os.path.join(path_t1, 'f1') + path2 = os.path.join(path_t2, 'f1') + with open(path1, 'a') as f: + f.write('Hello') + + FileSystems.copy([path_t1], [path_t2]) + self.assertTrue(filecmp.cmp(path1, path2)) + + def test_rename(self): + path1 = os.path.join(self.tmpdir, 'f1') + path2 = os.path.join(self.tmpdir, 'f2') + with open(path1, 'a') as f: + f.write('Hello') + + FileSystems.rename([path1], [path2]) + self.assertTrue(FileSystems.exists(path2)) + self.assertFalse(FileSystems.exists(path1)) + + def test_rename_error(self): + path1 = os.path.join(self.tmpdir, 'f1') + path2 = os.path.join(self.tmpdir, 'f2') + with self.assertRaises(BeamIOError) as error: + FileSystems.rename([path1], [path2]) + self.assertTrue( + error.exception.message.startswith('Rename operation failed')) + self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)]) + + def test_rename_directory(self): + path_t1 = os.path.join(self.tmpdir, 't1') + path_t2 = os.path.join(self.tmpdir, 't2') + 
FileSystems.mkdirs(path_t1) + + path1 = os.path.join(path_t1, 'f1') + path2 = os.path.join(path_t2, 'f1') + with open(path1, 'a') as f: + f.write('Hello') + + FileSystems.rename([path_t1], [path_t2]) + self.assertTrue(FileSystems.exists(path_t2)) + self.assertFalse(FileSystems.exists(path_t1)) + self.assertTrue(FileSystems.exists(path2)) + self.assertFalse(FileSystems.exists(path1)) + + def test_exists(self): + path1 = os.path.join(self.tmpdir, 'f1') + path2 = os.path.join(self.tmpdir, 'f2') + with open(path1, 'a') as f: + f.write('Hello') + self.assertTrue(FileSystems.exists(path1)) + self.assertFalse(FileSystems.exists(path2)) + + def test_delete(self): + path1 = os.path.join(self.tmpdir, 'f1') + + with open(path1, 'a') as f: + f.write('Hello') + + self.assertTrue(FileSystems.exists(path1)) + FileSystems.delete([path1]) + self.assertFalse(FileSystems.exists(path1)) + + def test_delete_error(self): + path1 = os.path.join(self.tmpdir, 'f1') + with self.assertRaises(BeamIOError) as error: + FileSystems.delete([path1]) + self.assertTrue( + error.exception.message.startswith('Delete operation failed')) + self.assertEqual(error.exception.exception_details.keys(), [path1]) http://git-wip-us.apache.org/repos/asf/beam/blob/ad6dcf4d/sdks/python/apache_beam/io/localfilesystem_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py index 3fe308d..df6eb61 100644 --- a/sdks/python/apache_beam/io/localfilesystem_test.py +++ b/sdks/python/apache_beam/io/localfilesystem_test.py @@ -26,15 +26,15 @@ import shutil import tempfile import mock -from apache_beam.io.filesystem import BeamIOError from apache_beam.io import localfilesystem +from apache_beam.io.filesystem import BeamIOError def _gen_fake_join(separator): """Returns a callable that joins paths with the given separator.""" def _join(first_path, *paths): - return 
separator.join((first_path,) + paths) + return separator.join((first_path.rstrip(separator),) + paths) return _join http://git-wip-us.apache.org/repos/asf/beam/blob/ad6dcf4d/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index d95b33f..0270cbe 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -32,7 +32,7 @@ from apitools.base.py import exceptions from apache_beam.internal.gcp.auth import get_service_credentials from apache_beam.internal.gcp.json_value import to_json_value -from apache_beam.io.filesystems_util import get_filesystem +from apache_beam.io.filesystems import FileSystems from apache_beam.io.gcp.internal.clients import storage from apache_beam.runners.dataflow.internal import dependency from apache_beam.runners.dataflow.internal.clients import dataflow @@ -336,10 +336,9 @@ class Job(object): # for GCS staging locations where the potential for such clashes is high. 
if self.google_cloud_options.staging_location.startswith('gs://'): path_suffix = '%s.%f' % (self.google_cloud_options.job_name, time.time()) - filesystem = get_filesystem(self.google_cloud_options.staging_location) - self.google_cloud_options.staging_location = filesystem.join( + self.google_cloud_options.staging_location = FileSystems.join( self.google_cloud_options.staging_location, path_suffix) - self.google_cloud_options.temp_location = filesystem.join( + self.google_cloud_options.temp_location = FileSystems.join( self.google_cloud_options.temp_location, path_suffix) self.proto = dataflow.Job(name=self.google_cloud_options.job_name) http://git-wip-us.apache.org/repos/asf/beam/blob/ad6dcf4d/sdks/python/apache_beam/runners/dataflow/internal/dependency.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py index bb490f3..f64f9b2 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py @@ -63,7 +63,7 @@ import tempfile from apache_beam import version as beam_version from apache_beam.internal import pickler -from apache_beam.io.filesystems_util import get_filesystem +from apache_beam.io.filesystems import FileSystems from apache_beam.runners.dataflow.internal import names from apache_beam.utils import processes from apache_beam.utils.pipeline_options import GoogleCloudOptions @@ -157,7 +157,6 @@ def _stage_extra_packages(extra_packages, staging_location, temp_dir, name patterns. 
""" resources = [] - staging_filesystem = get_filesystem(staging_location) staging_temp_dir = None local_packages = [] for package in extra_packages: @@ -190,14 +189,13 @@ def _stage_extra_packages(extra_packages, staging_location, temp_dir, local_packages.append(package) if staging_temp_dir: - temp_fs = get_filesystem(staging_temp_dir) local_packages.extend( - [temp_fs.join(staging_temp_dir, f) for f in os.listdir( + [FileSystems.join(staging_temp_dir, f) for f in os.listdir( staging_temp_dir)]) for package in local_packages: basename = os.path.basename(package) - staged_path = staging_filesystem.join(staging_location, basename) + staged_path = FileSystems.join(staging_location, basename) file_copy(package, staged_path) resources.append(basename) # Create a file containing the list of extra packages and stage it. @@ -210,7 +208,7 @@ def _stage_extra_packages(extra_packages, staging_location, temp_dir, with open(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), 'wt') as f: for package in local_packages: f.write('%s\n' % os.path.basename(package)) - staged_path = staging_filesystem.join(staging_location, EXTRA_PACKAGES_FILE) + staged_path = FileSystems.join(staging_location, EXTRA_PACKAGES_FILE) # Note that the caller of this function is responsible for deleting the # temporary folder where all temp files are created, including this one. file_copy(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), staged_path) @@ -285,16 +283,14 @@ def stage_job_resources( raise RuntimeError( 'The --temp_location option must be specified.') - filesystem = get_filesystem(google_cloud_options.staging_location) - # Stage a requirements file if present. if setup_options.requirements_file is not None: if not os.path.isfile(setup_options.requirements_file): raise RuntimeError('The file %s cannot be found. It was specified in the ' '--requirements_file command line option.' 
% setup_options.requirements_file) - staged_path = filesystem.join(google_cloud_options.staging_location, - REQUIREMENTS_FILE) + staged_path = FileSystems.join(google_cloud_options.staging_location, + REQUIREMENTS_FILE) file_copy(setup_options.requirements_file, staged_path) resources.append(REQUIREMENTS_FILE) requirements_cache_path = ( @@ -308,8 +304,8 @@ def stage_job_resources( populate_requirements_cache( setup_options.requirements_file, requirements_cache_path) for pkg in glob.glob(os.path.join(requirements_cache_path, '*')): - file_copy(pkg, filesystem.join(google_cloud_options.staging_location, - os.path.basename(pkg))) + file_copy(pkg, FileSystems.join(google_cloud_options.staging_location, + os.path.basename(pkg))) resources.append(os.path.basename(pkg)) # Handle a setup file if present. @@ -327,8 +323,8 @@ def stage_job_resources( 'setup.py instead of %s' % setup_options.setup_file) tarball_file = _build_setup_package(setup_options.setup_file, temp_dir, build_setup_args) - staged_path = filesystem.join(google_cloud_options.staging_location, - WORKFLOW_TARBALL_FILE) + staged_path = FileSystems.join(google_cloud_options.staging_location, + WORKFLOW_TARBALL_FILE) file_copy(tarball_file, staged_path) resources.append(WORKFLOW_TARBALL_FILE) @@ -347,8 +343,8 @@ def stage_job_resources( pickled_session_file = os.path.join(temp_dir, names.PICKLED_MAIN_SESSION_FILE) pickler.dump_session(pickled_session_file) - staged_path = filesystem.join(google_cloud_options.staging_location, - names.PICKLED_MAIN_SESSION_FILE) + staged_path = FileSystems.join(google_cloud_options.staging_location, + names.PICKLED_MAIN_SESSION_FILE) file_copy(pickled_session_file, staged_path) resources.append(names.PICKLED_MAIN_SESSION_FILE) @@ -362,8 +358,8 @@ def stage_job_resources( else: stage_tarball_from_remote_location = False - staged_path = filesystem.join(google_cloud_options.staging_location, - names.DATAFLOW_SDK_TARBALL_FILE) + staged_path = 
FileSystems.join(google_cloud_options.staging_location, + names.DATAFLOW_SDK_TARBALL_FILE) if stage_tarball_from_remote_location: # If --sdk_location is not specified then the appropriate package # will be obtained from PyPI (https://pypi.python.org) based on the http://git-wip-us.apache.org/repos/asf/beam/blob/ad6dcf4d/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py index 24f65d0..1ff087b 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py @@ -23,7 +23,7 @@ import shutil import tempfile import unittest -from apache_beam.io.filesystems_util import get_filesystem +from apache_beam.io.filesystems import FileSystems from apache_beam.runners.dataflow.internal import dependency from apache_beam.runners.dataflow.internal import names from apache_beam.utils.pipeline_options import GoogleCloudOptions @@ -241,9 +241,8 @@ class SetupTest(unittest.TestCase): def file_copy(from_path, to_path): if not from_path.endswith(names.PICKLED_MAIN_SESSION_FILE): self.assertEqual(expected_from_path, from_path) - filesystem = get_filesystem(expected_to_dir) - self.assertEqual(filesystem.join(expected_to_dir, - names.DATAFLOW_SDK_TARBALL_FILE), + self.assertEqual(FileSystems.join(expected_to_dir, + names.DATAFLOW_SDK_TARBALL_FILE), to_path) if from_path.startswith('gs://') or to_path.startswith('gs://'): logging.info('Faking file_copy(%s, %s)', from_path, to_path) http://git-wip-us.apache.org/repos/asf/beam/blob/ad6dcf4d/sdks/python/apache_beam/tests/pipeline_verifiers.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py 
b/sdks/python/apache_beam/tests/pipeline_verifiers.py index 51302b0..df05054 100644 --- a/sdks/python/apache_beam/tests/pipeline_verifiers.py +++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py @@ -27,7 +27,7 @@ import time from hamcrest.core.base_matcher import BaseMatcher -from apache_beam.io.filesystems_util import get_filesystem +from apache_beam.io.filesystems import FileSystems from apache_beam.runners.runner import PipelineState from apache_beam.tests import test_utils as utils from apache_beam.utils import retry @@ -99,7 +99,6 @@ class FileChecksumMatcher(BaseMatcher): self.sleep_secs = None self.file_path = file_path - self.file_system = get_filesystem(self.file_path) self.expected_checksum = expected_checksum @retry.with_exponential_backoff( @@ -108,7 +107,7 @@ class FileChecksumMatcher(BaseMatcher): def _read_with_retry(self): """Read path with retry if I/O failed""" read_lines = [] - match_result = self.file_system.match([self.file_path])[0] + match_result = FileSystems.match([self.file_path])[0] matched_path = [f.path for f in match_result.metadata_list] if not matched_path: raise IOError('No such file or directory: %s' % self.file_path) @@ -116,7 +115,7 @@ class FileChecksumMatcher(BaseMatcher): logging.info('Find %d files in %s: \n%s', len(matched_path), self.file_path, '\n'.join(matched_path)) for path in matched_path: - with self.file_system.open(path, 'r') as f: + with FileSystems.open(path) as f: for line in f: read_lines.append(line) return read_lines
