Repository: beam Updated Branches: refs/heads/master f138b3569 -> c489686e4
Fix bugs in fileio in the Temp IOFactory Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a679ab11 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a679ab11 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a679ab11 Branch: refs/heads/master Commit: a679ab111a907193067a8698708faf570f4d8c9e Parents: f138b35 Author: Sourabh Bajaj <[email protected]> Authored: Tue Mar 28 14:15:20 2017 -0700 Committer: Ahmet Altay <[email protected]> Committed: Tue Mar 28 15:52:42 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/__init__.py | 1 + sdks/python/apache_beam/io/fileio.py | 27 +++++++++++++++------------ 2 files changed, 16 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a679ab11/sdks/python/apache_beam/io/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/__init__.py b/sdks/python/apache_beam/io/__init__.py index 4b434be..881ce68 100644 --- a/sdks/python/apache_beam/io/__init__.py +++ b/sdks/python/apache_beam/io/__init__.py @@ -33,6 +33,7 @@ from apache_beam.io.range_trackers import * try: from apache_beam.io.gcp.bigquery import * from apache_beam.io.gcp.pubsub import * + from apache_beam.io.gcp import gcsio except ImportError: pass # pylint: enable=wrong-import-order, wrong-import-position http://git-wip-us.apache.org/repos/asf/beam/blob/a679ab11/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 0759ce4..f33942a 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -40,7 +40,7 @@ class ChannelFactory(object): @staticmethod def mkdir(path): bfs = get_filesystem(path) - bfs.mkdirs(path) + return bfs.mkdirs(path) @staticmethod def open(path, @@ -59,14 +59,16 @@ class ChannelFactory(object): @staticmethod def rename(src, dest): - bfs = get_filesystem(path) - bfs.rename([src], [dest]) + bfs = get_filesystem(src) + return bfs.rename([src], [dest]) @staticmethod def rename_batch(src_dest_pairs): sources = [s for s, _ in src_dest_pairs] destinations = [d for _, d in src_dest_pairs] - bfs = get_filesystem() + if len(sources) == 0: + return [] + bfs = get_filesystem(sources[0]) try: bfs.rename(sources, destinations) return [] @@ -75,23 +77,23 @@ class ChannelFactory(object): @staticmethod def copytree(src, dest): - bfs = get_filesystem() - bfs.copy([src], [dest]) + bfs = get_filesystem(src) + return bfs.copy([src], [dest]) @staticmethod def exists(path): bfs = get_filesystem(path) - bfs.exists(path) + return bfs.exists(path) @staticmethod def rmdir(path): bfs = get_filesystem(path) - bfs.delete([path]) + return bfs.delete([path]) @staticmethod def rm(path): bfs = get_filesystem(path) - bfs.delete([path]) + return bfs.delete([path]) @staticmethod def glob(path, limit=None): @@ -102,13 +104,13 @@ class ChannelFactory(object): @staticmethod def size_in_bytes(path): bfs = get_filesystem(path) - match_result = bfs.match([path], [limit])[0] + match_result = bfs.match([path])[0] return [f.size_in_bytes for f in match_result.metadata_list][0] @staticmethod def size_of_files_in_glob(path, file_names=None): bfs = get_filesystem(path) - match_result = bfs.match([path], [limit])[0] + match_result = bfs.match([path])[0] part_files = {f.path:f.size_in_bytes for f in match_result.metadata_list} if file_names is not None: @@ -118,7 +120,8 @@ class ChannelFactory(object): for metadata in match_result.metadata_list: specific_files[metadata.path] = metadata.size_in_bytes - return part_files.update(specific_files) + part_files.update(specific_files) + return part_files class FileSink(iobase.Sink):
