[BEAM-1441] Remove deprecated ChannelFactory
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97c66784 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97c66784 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97c66784 Branch: refs/heads/gearpump-runner Commit: 97c667846b566c312ceaadc66fb14fde1dfa7ebe Parents: 8319369 Author: Sourabh Bajaj <[email protected]> Authored: Fri Apr 14 14:45:16 2017 -0700 Committer: [email protected] <[email protected]> Committed: Wed Apr 19 09:56:28 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/fileio.py | 90 ------------------------------- 1 file changed, 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/97c66784/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 8ee5198..f61289e 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -27,7 +27,6 @@ import time from apache_beam.internal import util from apache_beam.io import iobase from apache_beam.io.filesystem import BeamIOError -from apache_beam.io.filesystem import CompressedFile as _CompressedFile from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.filesystems_util import get_filesystem from apache_beam.transforms.display import DisplayDataItem @@ -38,95 +37,6 @@ from apache_beam.utils.value_provider import check_accessible DEFAULT_SHARD_NAME_TEMPLATE = '-SSSSS-of-NNNNN' -# TODO(sourabhbajaj): Remove this after BFS API is used everywhere -class ChannelFactory(object): - @staticmethod - def mkdir(path): - bfs = get_filesystem(path) - return bfs.mkdirs(path) - - @staticmethod - def open(path, - mode, - mime_type='application/octet-stream', - compression_type=CompressionTypes.AUTO): - bfs = get_filesystem(path) - if mode == 'rb': - return bfs.open(path, mime_type, compression_type) - elif mode == 'wb': - return bfs.create(path, mime_type, compression_type) - - @staticmethod - def is_compressed(fileobj): - return isinstance(fileobj, _CompressedFile) - - @staticmethod - def rename(src, dest): - bfs = get_filesystem(src) - return bfs.rename([src], [dest]) - - @staticmethod - def rename_batch(src_dest_pairs): - sources = [s for s, _ in src_dest_pairs] - destinations = [d for _, d in src_dest_pairs] - if not sources: - return [] - bfs = get_filesystem(sources[0]) - try: - bfs.rename(sources, destinations) - return [] - except BeamIOError as exp: - return [(s, d, e) for (s, d), e in exp.exception_details.iteritems()] - - @staticmethod - def copytree(src, dest): - bfs = get_filesystem(src) - return bfs.copy([src], [dest]) - - @staticmethod - def exists(path): - bfs = get_filesystem(path) - return bfs.exists(path) - - @staticmethod - def rmdir(path): - bfs = get_filesystem(path) - return bfs.delete([path]) - - @staticmethod - def rm(path): - bfs = get_filesystem(path) - return bfs.delete([path]) - - @staticmethod - def glob(path, limit=None): - bfs = get_filesystem(path) - match_result = bfs.match([path], [limit])[0] - return [f.path for f in match_result.metadata_list] - - @staticmethod - def size_in_bytes(path): - bfs = get_filesystem(path) - match_result = bfs.match([path])[0] - return [f.size_in_bytes for f in match_result.metadata_list][0] - - @staticmethod - def size_of_files_in_glob(path, file_names=None): - bfs = get_filesystem(path) - match_result = bfs.match([path])[0] - part_files = {f.path:f.size_in_bytes for f in match_result.metadata_list} - - if file_names is not None: - specific_files = {} - match_results = bfs.match(file_names) - for match_result in match_results: - for metadata in match_result.metadata_list: - specific_files[metadata.path] = metadata.size_in_bytes - - part_files.update(specific_files) - return part_files - - class FileSink(iobase.Sink): """A sink to a GCS or local files.
