Repository: beam Updated Branches: refs/heads/master 652fcb7e9 -> 36a6cd69a
Rename filesink to filebasedsink Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2f177820 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2f177820 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2f177820 Branch: refs/heads/master Commit: 2f177820c4e72fe92dc47d595847ddecdae7afcd Parents: 652fcb7 Author: Sourabh Bajaj <sourabhba...@google.com> Authored: Thu May 11 17:23:40 2017 -0700 Committer: Ahmet Altay <al...@google.com> Committed: Thu May 11 18:04:18 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/__init__.py | 2 +- sdks/python/apache_beam/io/avroio.py | 4 +- sdks/python/apache_beam/io/filebasedsink.py | 299 ++++++++++++++++++ .../python/apache_beam/io/filebasedsink_test.py | 303 ++++++++++++++++++ sdks/python/apache_beam/io/fileio.py | 304 ------------------- sdks/python/apache_beam/io/fileio_test.py | 303 ------------------ sdks/python/apache_beam/io/gcp/gcsio.py | 6 +- sdks/python/apache_beam/io/iobase.py | 12 +- sdks/python/apache_beam/io/textio.py | 4 +- sdks/python/apache_beam/io/tfrecordio.py | 6 +- .../apache_beam/testing/pipeline_verifiers.py | 4 +- 11 files changed, 623 insertions(+), 624 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/2f177820/sdks/python/apache_beam/io/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/__init__.py b/sdks/python/apache_beam/io/__init__.py index 881ce68..6ea0efd 100644 --- a/sdks/python/apache_beam/io/__init__.py +++ b/sdks/python/apache_beam/io/__init__.py @@ -19,7 +19,7 @@ # pylint: disable=wildcard-import from apache_beam.io.avroio import * -from apache_beam.io.fileio import * +from apache_beam.io.filebasedsink import * from apache_beam.io.iobase import Read from apache_beam.io.iobase import Sink from apache_beam.io.iobase import Write http://git-wip-us.apache.org/repos/asf/beam/blob/2f177820/sdks/python/apache_beam/io/avroio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index 1c08c68..e02e1f7 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -27,7 +27,7 @@ from avro import schema import apache_beam as beam from apache_beam.io import filebasedsource -from apache_beam.io import fileio +from apache_beam.io import filebasedsink from apache_beam.io import iobase from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.iobase import Read @@ -335,7 +335,7 @@ class WriteToAvro(beam.transforms.PTransform): return {'sink_dd': self._sink} -class _AvroSink(fileio.FileSink): +class _AvroSink(filebasedsink.FileBasedSink): """A sink to avro files.""" def __init__(self, http://git-wip-us.apache.org/repos/asf/beam/blob/2f177820/sdks/python/apache_beam/io/filebasedsink.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filebasedsink.py b/sdks/python/apache_beam/io/filebasedsink.py new file mode 100644 index 0000000..76c09fc --- /dev/null +++ b/sdks/python/apache_beam/io/filebasedsink.py @@ -0,0 +1,299 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""File-based sink.""" + +from __future__ import absolute_import + +import logging +import os +import re +import time +import uuid + +from apache_beam.internal import util +from apache_beam.io import iobase +from apache_beam.io.filesystem import BeamIOError +from apache_beam.io.filesystem import CompressionTypes +from apache_beam.io.filesystems import FileSystems +from apache_beam.transforms.display import DisplayDataItem +from apache_beam.options.value_provider import ValueProvider +from apache_beam.options.value_provider import StaticValueProvider +from apache_beam.options.value_provider import check_accessible + +DEFAULT_SHARD_NAME_TEMPLATE = '-SSSSS-of-NNNNN' + +__all__ = ['FileBasedSink'] + + +class FileBasedSink(iobase.Sink): + """A sink to GCS or local files. + + To implement a file-based sink, extend this class and override + either ``write_record()`` or ``write_encoded_record()``. + + If needed, also override ``open()`` and/or ``close()`` to customize the + file handling or write headers and footers. + + The output of this write is a PCollection of all written shards. + """ + + # Max number of threads to be used for renaming. + _MAX_RENAME_THREADS = 64 + + def __init__(self, + file_path_prefix, + coder, + file_name_suffix='', + num_shards=0, + shard_name_template=None, + mime_type='application/octet-stream', + compression_type=CompressionTypes.AUTO): + """ + Raises: + TypeError: if file path parameters are not a string or ValueProvider, + or if compression_type is not a member of CompressionTypes. + ValueError: if shard_name_template is not of the expected format.
+ """ + if not isinstance(file_path_prefix, (basestring, ValueProvider)): + raise TypeError('file_path_prefix must be a string or ValueProvider;' + 'got %r instead' % file_path_prefix) + if not isinstance(file_name_suffix, (basestring, ValueProvider)): + raise TypeError('file_name_suffix must be a string or ValueProvider;' + 'got %r instead' % file_name_suffix) + + if not CompressionTypes.is_valid_compression_type(compression_type): + raise TypeError('compression_type must be CompressionType object but ' + 'was %s' % type(compression_type)) + if shard_name_template is None: + shard_name_template = DEFAULT_SHARD_NAME_TEMPLATE + elif shard_name_template == '': + num_shards = 1 + if isinstance(file_path_prefix, basestring): + file_path_prefix = StaticValueProvider(str, file_path_prefix) + if isinstance(file_name_suffix, basestring): + file_name_suffix = StaticValueProvider(str, file_name_suffix) + self.file_path_prefix = file_path_prefix + self.file_name_suffix = file_name_suffix + self.num_shards = num_shards + self.coder = coder + self.shard_name_format = self._template_to_format(shard_name_template) + self.compression_type = compression_type + self.mime_type = mime_type + + def display_data(self): + return {'shards': + DisplayDataItem(self.num_shards, + label='Number of Shards').drop_if_default(0), + 'compression': + DisplayDataItem(str(self.compression_type)), + 'file_pattern': + DisplayDataItem('{}{}{}'.format(self.file_path_prefix, + self.shard_name_format, + self.file_name_suffix), + label='File Pattern')} + + @check_accessible(['file_path_prefix']) + def open(self, temp_path): + """Opens ``temp_path``, returning an opaque file handle object. + + The returned file handle is passed to ``write_[encoded_]record`` and + ``close``. + """ + return FileSystems.create(temp_path, self.mime_type, self.compression_type) + + def write_record(self, file_handle, value): + """Writes a single record go the file handle returned by ``open()``. + + By default, calls ``write_encoded_record`` after encoding the record with + this sink's Coder. + """ + self.write_encoded_record(file_handle, self.coder.encode(value)) + + def write_encoded_record(self, file_handle, encoded_value): + """Writes a single encoded record to the file handle returned by ``open()``. + """ + raise NotImplementedError + + def close(self, file_handle): + """Finalize and close the file handle returned from ``open()``. + + Called after all records are written. + + By default, calls ``file_handle.close()`` iff it is not None. + """ + if file_handle is not None: + file_handle.close() + + @check_accessible(['file_path_prefix', 'file_name_suffix']) + def initialize_write(self): + file_path_prefix = self.file_path_prefix.get() + + tmp_dir = self._create_temp_dir(file_path_prefix) + FileSystems.mkdirs(tmp_dir) + return tmp_dir + + def _create_temp_dir(self, file_path_prefix): + base_path, last_component = FileSystems.split(file_path_prefix) + if not last_component: + # Trying to re-split the base_path to check if it's a root. + new_base_path, _ = FileSystems.split(base_path) + if base_path == new_base_path: + raise ValueError('Cannot create a temporary directory for root path ' + 'prefix %s. 
Please specify a file path prefix with ' 'at least two components.' % file_path_prefix) + path_components = [base_path, + 'beam-temp-' + last_component + '-' + uuid.uuid1().hex] + return FileSystems.join(*path_components) + + @check_accessible(['file_path_prefix', 'file_name_suffix']) + def open_writer(self, init_result, uid): + # A proper suffix is needed for AUTO compression detection. + # We also ensure there will be no collisions with uid and a + # (possibly unsharded) file_path_prefix and a (possibly empty) + # file_name_suffix. + file_path_prefix = self.file_path_prefix.get() + file_name_suffix = self.file_name_suffix.get() + suffix = ( + '.' + os.path.basename(file_path_prefix) + file_name_suffix) + return FileBasedSinkWriter(self, os.path.join(init_result, uid) + suffix) + + @check_accessible(['file_path_prefix', 'file_name_suffix']) + def finalize_write(self, init_result, writer_results): + file_path_prefix = self.file_path_prefix.get() + file_name_suffix = self.file_name_suffix.get() + writer_results = sorted(writer_results) + num_shards = len(writer_results) + min_threads = min(num_shards, FileBasedSink._MAX_RENAME_THREADS) + num_threads = max(1, min_threads) + + source_files = [] + destination_files = [] + chunk_size = FileSystems.get_chunk_size(file_path_prefix) + for shard_num, shard in enumerate(writer_results): + final_name = ''.join([ + file_path_prefix, self.shard_name_format % dict( + shard_num=shard_num, num_shards=num_shards), file_name_suffix + ]) + source_files.append(shard) + destination_files.append(final_name) + + source_file_batch = [source_files[i:i + chunk_size] + for i in xrange(0, len(source_files), + chunk_size)] + destination_file_batch = [destination_files[i:i + chunk_size] + for i in xrange(0, len(destination_files), + chunk_size)] + + logging.info( + 'Starting finalize_write threads with num_shards: %d, ' + 'batches: %d, num_threads: %d', + num_shards, len(source_file_batch), num_threads) + start_time = time.time() + + # Use a thread pool for renaming operations. + def _rename_batch(batch): + """_rename_batch executes batch rename operations.""" + source_files, destination_files = batch + exceptions = [] + try: + FileSystems.rename(source_files, destination_files) + return exceptions + except BeamIOError as exp: + if exp.exception_details is None: + raise + for (src, dest), exception in exp.exception_details.iteritems(): + if exception: + logging.warning('Rename not successful: %s -> %s, %s', src, dest, + exception) + should_report = True + if isinstance(exception, IOError): + # May have already been copied. + try: + if FileSystems.exists(dest): + should_report = False + except Exception as exists_e: # pylint: disable=broad-except + logging.warning('Exception when checking if file %s exists: ' + '%s', dest, exists_e) + if should_report: + logging.warning(('Exception in _rename_batch.
src: %s, ' + 'dest: %s, err: %s'), src, dest, exception) + exceptions.append(exception) + else: + logging.debug('Rename successful: %s -> %s', src, dest) + return exceptions + + exception_batches = util.run_using_threadpool( + _rename_batch, zip(source_file_batch, destination_file_batch), + num_threads) + + all_exceptions = [e for exception_batch in exception_batches + for e in exception_batch] + if all_exceptions: + raise Exception('Encountered exceptions in finalize_write: %s' % + all_exceptions) + + for final_name in destination_files: + yield final_name + + logging.info('Renamed %d shards in %.2f seconds.', num_shards, + time.time() - start_time) + + try: + FileSystems.delete([init_result]) + except IOError: + # May have already been removed. + pass + + @staticmethod + def _template_to_format(shard_name_template): + if not shard_name_template: + return '' + m = re.search('S+', shard_name_template) + if m is None: + raise ValueError("Shard number pattern S+ not found in template '%s'" % + shard_name_template) + shard_name_format = shard_name_template.replace( + m.group(0), '%%(shard_num)0%dd' % len(m.group(0))) + m = re.search('N+', shard_name_format) + if m: + shard_name_format = shard_name_format.replace( + m.group(0), '%%(num_shards)0%dd' % len(m.group(0))) + return shard_name_format + + def __eq__(self, other): + # TODO: Clean up workitem_test which uses this. + # pylint: disable=unidiomatic-typecheck + return type(self) == type(other) and self.__dict__ == other.__dict__ + + +class FileBasedSinkWriter(iobase.Writer): + """The writer for FileBasedSink. + """ + + def __init__(self, sink, temp_shard_path): + self.sink = sink + self.temp_shard_path = temp_shard_path + self.temp_handle = self.sink.open(temp_shard_path) + + def write(self, value): + self.sink.write_record(self.temp_handle, value) + + def close(self): + self.sink.close(self.temp_handle) + return self.temp_shard_path http://git-wip-us.apache.org/repos/asf/beam/blob/2f177820/sdks/python/apache_beam/io/filebasedsink_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filebasedsink_test.py b/sdks/python/apache_beam/io/filebasedsink_test.py new file mode 100644 index 0000000..1f6aeee --- /dev/null +++ b/sdks/python/apache_beam/io/filebasedsink_test.py @@ -0,0 +1,303 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+# + +"""Unit tests for file sinks.""" + +import glob +import logging +import os +import shutil +import tempfile +import unittest + +import hamcrest as hc +import mock + +import apache_beam as beam +from apache_beam.coders import coders +from apache_beam.io import filebasedsink +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.transforms.display import DisplayData +from apache_beam.transforms.display_test import DisplayDataItemMatcher + +from apache_beam.options.value_provider import StaticValueProvider + + +# TODO: Refactor code so all io tests are using same library +# TestCaseWithTempDirCleanup class. +class _TestCaseWithTempDirCleanUp(unittest.TestCase): + """Base class for TestCases that deals with TempDir clean-up. + + Inherited test cases will call self._new_tempdir() to start a temporary dir + which will be deleted at the end of the tests (when tearDown() is called). + """ + + def setUp(self): + self._tempdirs = [] + + def tearDown(self): + for path in self._tempdirs: + if os.path.exists(path): + shutil.rmtree(path) + self._tempdirs = [] + + def _new_tempdir(self): + result = tempfile.mkdtemp() + self._tempdirs.append(result) + return result + + def _create_temp_file(self, name='', suffix=''): + if not name: + name = tempfile.template + file_name = tempfile.NamedTemporaryFile( + delete=False, prefix=name, + dir=self._new_tempdir(), suffix=suffix).name + return file_name + + +class MyFileBasedSink(filebasedsink.FileBasedSink): + + def open(self, temp_path): + # TODO: Fix main session pickling. + # file_handle = super(MyFileBasedSink, self).open(temp_path) + file_handle = filebasedsink.FileBasedSink.open(self, temp_path) + file_handle.write('[start]') + return file_handle + + def write_encoded_record(self, file_handle, encoded_value): + file_handle.write('[') + file_handle.write(encoded_value) + file_handle.write(']') + + def close(self, file_handle): + file_handle.write('[end]') + # TODO: Fix main session pickling. + # file_handle = super(MyFileBasedSink, self).close(file_handle) + file_handle = filebasedsink.FileBasedSink.close(self, file_handle) + + +class TestFileBasedSink(_TestCaseWithTempDirCleanUp): + + def test_file_sink_writing(self): + temp_path = os.path.join(self._new_tempdir(), 'FileBasedSink') + sink = MyFileBasedSink( + temp_path, file_name_suffix='.output', coder=coders.ToStringCoder()) + + # Manually invoke the generic Sink API. + init_token = sink.initialize_write() + + writer1 = sink.open_writer(init_token, '1') + writer1.write('a') + writer1.write('b') + res1 = writer1.close() + + writer2 = sink.open_writer(init_token, '2') + writer2.write('x') + writer2.write('y') + writer2.write('z') + res2 = writer2.close() + + _ = list(sink.finalize_write(init_token, [res1, res2])) + # Retry the finalize operation (as if the first attempt was lost). + res = list(sink.finalize_write(init_token, [res1, res2])) + + # Check the results. + shard1 = temp_path + '-00000-of-00002.output' + shard2 = temp_path + '-00001-of-00002.output' + self.assertEqual(res, [shard1, shard2]) + self.assertEqual(open(shard1).read(), '[start][a][b][end]') + self.assertEqual(open(shard2).read(), '[start][x][y][z][end]') + + # Check that any temp files are deleted. 
+ self.assertItemsEqual([shard1, shard2], glob.glob(temp_path + '*')) + + def test_file_sink_display_data(self): + temp_path = os.path.join(self._new_tempdir(), 'display') + sink = MyFileBasedSink( + temp_path, file_name_suffix='.output', coder=coders.ToStringCoder()) + dd = DisplayData.create_from(sink) + expected_items = [ + DisplayDataItemMatcher( + 'compression', 'auto'), + DisplayDataItemMatcher( + 'file_pattern', + '{}{}'.format( + temp_path, + '-%(shard_num)05d-of-%(num_shards)05d.output'))] + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + + def test_empty_write(self): + temp_path = tempfile.NamedTemporaryFile().name + sink = MyFileBasedSink( + temp_path, file_name_suffix='.output', coder=coders.ToStringCoder() + ) + p = TestPipeline() + p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned + p.run() + self.assertEqual( + open(temp_path + '-00000-of-00001.output').read(), '[start][end]') + + def test_static_value_provider_empty_write(self): + temp_path = StaticValueProvider(value_type=str, + value=tempfile.NamedTemporaryFile().name) + sink = MyFileBasedSink( + temp_path, + file_name_suffix=StaticValueProvider(value_type=str, value='.output'), + coder=coders.ToStringCoder() + ) + p = TestPipeline() + p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned + p.run() + self.assertEqual( + open(temp_path.get() + '-00000-of-00001.output').read(), '[start][end]') + + def test_fixed_shard_write(self): + temp_path = os.path.join(self._new_tempdir(), 'empty') + sink = MyFileBasedSink( + temp_path, + file_name_suffix='.output', + num_shards=3, + shard_name_template='_NN_SSS_', + coder=coders.ToStringCoder()) + p = TestPipeline() + p | beam.Create(['a', 'b']) | beam.io.Write(sink) # pylint: disable=expression-not-assigned + + p.run() + + concat = ''.join( + open(temp_path + '_03_%03d_.output' % shard_num).read() + for shard_num in range(3)) + self.assertTrue('][a][' in concat, concat) + self.assertTrue('][b][' in concat, concat) + + # Not using 'test' in name so that 'nose' doesn't pick this as a test. + def run_temp_dir_check(self, no_dir_path, dir_path, no_dir_root_path, + dir_root_path, prefix, separator): + def _get_temp_dir(file_path_prefix): + sink = MyFileBasedSink( + file_path_prefix, file_name_suffix='.output', + coder=coders.ToStringCoder()) + return sink.initialize_write() + + temp_dir = _get_temp_dir(no_dir_path) + self.assertTrue(temp_dir.startswith(prefix)) + last_sep = temp_dir.rfind(separator) + self.assertTrue(temp_dir[last_sep + 1:].startswith('beam-temp')) + + temp_dir = _get_temp_dir(dir_path) + self.assertTrue(temp_dir.startswith(prefix)) + last_sep = temp_dir.rfind(separator) + self.assertTrue(temp_dir[last_sep + 1:].startswith('beam-temp')) + + with self.assertRaises(ValueError): + _get_temp_dir(no_dir_root_path) + + with self.assertRaises(ValueError): + _get_temp_dir(dir_root_path) + + def test_temp_dir_gcs(self): + try: + self.run_temp_dir_check( + 'gs://aaa/bbb', 'gs://aaa/bbb/', 'gs://aaa', 'gs://aaa/', 'gs://', + '/') + except ValueError: + logging.debug('Ignoring test since GCP module is not installed') + + @mock.patch('apache_beam.io.localfilesystem.os') + def test_temp_dir_local(self, filesystem_os_mock): + # Here we test a unix-like mock file-system + # (not really testing Unix or Windows since we mock the function of 'os' + # module). 
+ + def _fake_unix_split(path): + sep = path.rfind('/') + if sep < 0: + raise ValueError('Path must contain a separator') + return (path[:sep], path[sep + 1:]) + + def _fake_unix_join(base, path): + return base + '/' + path + + filesystem_os_mock.path.abspath = lambda a: a + filesystem_os_mock.path.split.side_effect = _fake_unix_split + filesystem_os_mock.path.join.side_effect = _fake_unix_join + self.run_temp_dir_check( + '/aaa/bbb', '/aaa/bbb/', '/', '/', '/', '/') + + def test_file_sink_multi_shards(self): + temp_path = os.path.join(self._new_tempdir(), 'multishard') + sink = MyFileBasedSink( + temp_path, file_name_suffix='.output', coder=coders.ToStringCoder()) + + # Manually invoke the generic Sink API. + init_token = sink.initialize_write() + + num_shards = 1000 + writer_results = [] + for i in range(num_shards): + uuid = 'uuid-%05d' % i + writer = sink.open_writer(init_token, uuid) + writer.write('a') + writer.write('b') + writer.write(uuid) + writer_results.append(writer.close()) + + res_first = list(sink.finalize_write(init_token, writer_results)) + # Retry the finalize operation (as if the first attempt was lost). + res_second = list(sink.finalize_write(init_token, writer_results)) + + self.assertItemsEqual(res_first, res_second) + + res = sorted(res_second) + for i in range(num_shards): + shard_name = '%s-%05d-of-%05d.output' % (temp_path, i, num_shards) + uuid = 'uuid-%05d' % i + self.assertEqual(res[i], shard_name) + self.assertEqual( + open(shard_name).read(), ('[start][a][b][%s][end]' % uuid)) + + # Check that any temp files are deleted. + self.assertItemsEqual(res, glob.glob(temp_path + '*')) + + def test_file_sink_io_error(self): + temp_path = os.path.join(self._new_tempdir(), 'ioerror') + sink = MyFileBasedSink( + temp_path, file_name_suffix='.output', coder=coders.ToStringCoder()) + + # Manually invoke the generic Sink API. + init_token = sink.initialize_write() + + writer1 = sink.open_writer(init_token, '1') + writer1.write('a') + writer1.write('b') + res1 = writer1.close() + + writer2 = sink.open_writer(init_token, '2') + writer2.write('x') + writer2.write('y') + writer2.write('z') + res2 = writer2.close() + + os.remove(res2) + with self.assertRaises(Exception): + list(sink.finalize_write(init_token, [res1, res2])) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/2f177820/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py deleted file mode 100644 index aa18093..0000000 --- a/sdks/python/apache_beam/io/fileio.py +++ /dev/null @@ -1,304 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -"""File-based sources and sinks.""" - -from __future__ import absolute_import - -import logging -import os -import re -import time -import uuid - -from apache_beam.internal import util -from apache_beam.io import iobase -from apache_beam.io.filesystem import BeamIOError -from apache_beam.io.filesystem import CompressionTypes -from apache_beam.io.filesystems import FileSystems -from apache_beam.transforms.display import DisplayDataItem -from apache_beam.options.value_provider import ValueProvider -from apache_beam.options.value_provider import StaticValueProvider -from apache_beam.options.value_provider import check_accessible - -DEFAULT_SHARD_NAME_TEMPLATE = '-SSSSS-of-NNNNN' - -__all__ = ['FileBasedSink'] - - -class FileSink(iobase.Sink): - """A sink to a GCS or local files. - - To implement a file-based sink, extend this class and override - either ``write_record()`` or ``write_encoded_record()``. - - If needed, also overwrite ``open()`` and/or ``close()`` to customize the - file handling or write headers and footers. - - The output of this write is a PCollection of all written shards. - """ - - # Max number of threads to be used for renaming. - _MAX_RENAME_THREADS = 64 - - def __init__(self, - file_path_prefix, - coder, - file_name_suffix='', - num_shards=0, - shard_name_template=None, - mime_type='application/octet-stream', - compression_type=CompressionTypes.AUTO): - """ - Raises: - TypeError: if file path parameters are not a string or ValueProvider, - or if compression_type is not member of CompressionTypes. - ValueError: if shard_name_template is not of expected format. - """ - if not isinstance(file_path_prefix, (basestring, ValueProvider)): - raise TypeError('file_path_prefix must be a string or ValueProvider;' - 'got %r instead' % file_path_prefix) - if not isinstance(file_name_suffix, (basestring, ValueProvider)): - raise TypeError('file_name_suffix must be a string or ValueProvider;' - 'got %r instead' % file_name_suffix) - - if not CompressionTypes.is_valid_compression_type(compression_type): - raise TypeError('compression_type must be CompressionType object but ' - 'was %s' % type(compression_type)) - if shard_name_template is None: - shard_name_template = DEFAULT_SHARD_NAME_TEMPLATE - elif shard_name_template == '': - num_shards = 1 - if isinstance(file_path_prefix, basestring): - file_path_prefix = StaticValueProvider(str, file_path_prefix) - if isinstance(file_name_suffix, basestring): - file_name_suffix = StaticValueProvider(str, file_name_suffix) - self.file_path_prefix = file_path_prefix - self.file_name_suffix = file_name_suffix - self.num_shards = num_shards - self.coder = coder - self.shard_name_format = self._template_to_format(shard_name_template) - self.compression_type = compression_type - self.mime_type = mime_type - - def display_data(self): - return {'shards': - DisplayDataItem(self.num_shards, - label='Number of Shards').drop_if_default(0), - 'compression': - DisplayDataItem(str(self.compression_type)), - 'file_pattern': - DisplayDataItem('{}{}{}'.format(self.file_path_prefix, - self.shard_name_format, - self.file_name_suffix), - label='File Pattern')} - - @check_accessible(['file_path_prefix']) - def open(self, temp_path): - """Opens ``temp_path``, returning an opaque file handle object. - - The returned file handle is passed to ``write_[encoded_]record`` and - ``close``. 
- """ - return FileSystems.create(temp_path, self.mime_type, self.compression_type) - - def write_record(self, file_handle, value): - """Writes a single record go the file handle returned by ``open()``. - - By default, calls ``write_encoded_record`` after encoding the record with - this sink's Coder. - """ - self.write_encoded_record(file_handle, self.coder.encode(value)) - - def write_encoded_record(self, file_handle, encoded_value): - """Writes a single encoded record to the file handle returned by ``open()``. - """ - raise NotImplementedError - - def close(self, file_handle): - """Finalize and close the file handle returned from ``open()``. - - Called after all records are written. - - By default, calls ``file_handle.close()`` iff it is not None. - """ - if file_handle is not None: - file_handle.close() - - @check_accessible(['file_path_prefix', 'file_name_suffix']) - def initialize_write(self): - file_path_prefix = self.file_path_prefix.get() - - tmp_dir = self._create_temp_dir(file_path_prefix) - FileSystems.mkdirs(tmp_dir) - return tmp_dir - - def _create_temp_dir(self, file_path_prefix): - base_path, last_component = FileSystems.split(file_path_prefix) - if not last_component: - # Trying to re-split the base_path to check if it's a root. - new_base_path, _ = FileSystems.split(base_path) - if base_path == new_base_path: - raise ValueError('Cannot create a temporary directory for root path ' - 'prefix %s. Please specify a file path prefix with ' - 'at least two components.', - file_path_prefix) - path_components = [base_path, - 'beam-temp-' + last_component + '-' + uuid.uuid1().hex] - return FileSystems.join(*path_components) - - @check_accessible(['file_path_prefix', 'file_name_suffix']) - def open_writer(self, init_result, uid): - # A proper suffix is needed for AUTO compression detection. - # We also ensure there will be no collisions with uid and a - # (possibly unsharded) file_path_prefix and a (possibly empty) - # file_name_suffix. - file_path_prefix = self.file_path_prefix.get() - file_name_suffix = self.file_name_suffix.get() - suffix = ( - '.' + os.path.basename(file_path_prefix) + file_name_suffix) - return FileSinkWriter(self, os.path.join(init_result, uid) + suffix) - - @check_accessible(['file_path_prefix', 'file_name_suffix']) - def finalize_write(self, init_result, writer_results): - file_path_prefix = self.file_path_prefix.get() - file_name_suffix = self.file_name_suffix.get() - writer_results = sorted(writer_results) - num_shards = len(writer_results) - min_threads = min(num_shards, FileSink._MAX_RENAME_THREADS) - num_threads = max(1, min_threads) - - source_files = [] - destination_files = [] - chunk_size = FileSystems.get_chunk_size(file_path_prefix) - for shard_num, shard in enumerate(writer_results): - final_name = ''.join([ - file_path_prefix, self.shard_name_format % dict( - shard_num=shard_num, num_shards=num_shards), file_name_suffix - ]) - source_files.append(shard) - destination_files.append(final_name) - - source_file_batch = [source_files[i:i + chunk_size] - for i in xrange(0, len(source_files), - chunk_size)] - destination_file_batch = [destination_files[i:i + chunk_size] - for i in xrange(0, len(destination_files), - chunk_size)] - - logging.info( - 'Starting finalize_write threads with num_shards: %d, ' - 'batches: %d, num_threads: %d', - num_shards, len(source_file_batch), num_threads) - start_time = time.time() - - # Use a thread pool for renaming operations. 
- def _rename_batch(batch): - """_rename_batch executes batch rename operations.""" - source_files, destination_files = batch - exceptions = [] - try: - FileSystems.rename(source_files, destination_files) - return exceptions - except BeamIOError as exp: - if exp.exception_details is None: - raise - for (src, dest), exception in exp.exception_details.iteritems(): - if exception: - logging.warning('Rename not successful: %s -> %s, %s', src, dest, - exception) - should_report = True - if isinstance(exception, IOError): - # May have already been copied. - try: - if FileSystems.exists(dest): - should_report = False - except Exception as exists_e: # pylint: disable=broad-except - logging.warning('Exception when checking if file %s exists: ' - '%s', dest, exists_e) - if should_report: - logging.warning(('Exception in _rename_batch. src: %s, ' - 'dest: %s, err: %s'), src, dest, exception) - exceptions.append(exception) - else: - logging.debug('Rename successful: %s -> %s', src, dest) - return exceptions - - exception_batches = util.run_using_threadpool( - _rename_batch, zip(source_file_batch, destination_file_batch), - num_threads) - - all_exceptions = [e for exception_batch in exception_batches - for e in exception_batch] - if all_exceptions: - raise Exception('Encountered exceptions in finalize_write: %s', - all_exceptions) - - for final_name in destination_files: - yield final_name - - logging.info('Renamed %d shards in %.2f seconds.', num_shards, - time.time() - start_time) - - try: - FileSystems.delete([init_result]) - except IOError: - # May have already been removed. - pass - - @staticmethod - def _template_to_format(shard_name_template): - if not shard_name_template: - return '' - m = re.search('S+', shard_name_template) - if m is None: - raise ValueError("Shard number pattern S+ not found in template '%s'" % - shard_name_template) - shard_name_format = shard_name_template.replace( - m.group(0), '%%(shard_num)0%dd' % len(m.group(0))) - m = re.search('N+', shard_name_format) - if m: - shard_name_format = shard_name_format.replace( - m.group(0), '%%(num_shards)0%dd' % len(m.group(0))) - return shard_name_format - - def __eq__(self, other): - # TODO: Clean up workitem_test which uses this. - # pylint: disable=unidiomatic-typecheck - return type(self) == type(other) and self.__dict__ == other.__dict__ - - -# Using FileBasedSink for the public API to be symmetric with FileBasedSource. -# TODO: move code from FileSink to here and delete that class. -FileBasedSink = FileSink - - -class FileSinkWriter(iobase.Writer): - """The writer for FileSink. - """ - - def __init__(self, sink, temp_shard_path): - self.sink = sink - self.temp_shard_path = temp_shard_path - self.temp_handle = self.sink.open(temp_shard_path) - - def write(self, value): - self.sink.write_record(self.temp_handle, value) - - def close(self): - self.sink.close(self.temp_handle) - return self.temp_shard_path http://git-wip-us.apache.org/repos/asf/beam/blob/2f177820/sdks/python/apache_beam/io/fileio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py deleted file mode 100644 index b92b8be..0000000 --- a/sdks/python/apache_beam/io/fileio_test.py +++ /dev/null @@ -1,303 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. 
-# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Unit tests for file sinks.""" - -import glob -import logging -import os -import shutil -import tempfile -import unittest - -import hamcrest as hc -import mock - -import apache_beam as beam -from apache_beam.coders import coders -from apache_beam.io import fileio -from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.transforms.display import DisplayData -from apache_beam.transforms.display_test import DisplayDataItemMatcher - -from apache_beam.options.value_provider import StaticValueProvider - - -# TODO: Refactor code so all io tests are using same library -# TestCaseWithTempDirCleanup class. -class _TestCaseWithTempDirCleanUp(unittest.TestCase): - """Base class for TestCases that deals with TempDir clean-up. - - Inherited test cases will call self._new_tempdir() to start a temporary dir - which will be deleted at the end of the tests (when tearDown() is called). - """ - - def setUp(self): - self._tempdirs = [] - - def tearDown(self): - for path in self._tempdirs: - if os.path.exists(path): - shutil.rmtree(path) - self._tempdirs = [] - - def _new_tempdir(self): - result = tempfile.mkdtemp() - self._tempdirs.append(result) - return result - - def _create_temp_file(self, name='', suffix=''): - if not name: - name = tempfile.template - file_name = tempfile.NamedTemporaryFile( - delete=False, prefix=name, - dir=self._new_tempdir(), suffix=suffix).name - return file_name - - -class MyFileSink(fileio.FileSink): - - def open(self, temp_path): - # TODO: Fix main session pickling. - # file_handle = super(MyFileSink, self).open(temp_path) - file_handle = fileio.FileSink.open(self, temp_path) - file_handle.write('[start]') - return file_handle - - def write_encoded_record(self, file_handle, encoded_value): - file_handle.write('[') - file_handle.write(encoded_value) - file_handle.write(']') - - def close(self, file_handle): - file_handle.write('[end]') - # TODO: Fix main session pickling. - # file_handle = super(MyFileSink, self).close(file_handle) - file_handle = fileio.FileSink.close(self, file_handle) - - -class TestFileSink(_TestCaseWithTempDirCleanUp): - - def test_file_sink_writing(self): - temp_path = os.path.join(self._new_tempdir(), 'filesink') - sink = MyFileSink( - temp_path, file_name_suffix='.output', coder=coders.ToStringCoder()) - - # Manually invoke the generic Sink API. - init_token = sink.initialize_write() - - writer1 = sink.open_writer(init_token, '1') - writer1.write('a') - writer1.write('b') - res1 = writer1.close() - - writer2 = sink.open_writer(init_token, '2') - writer2.write('x') - writer2.write('y') - writer2.write('z') - res2 = writer2.close() - - _ = list(sink.finalize_write(init_token, [res1, res2])) - # Retry the finalize operation (as if the first attempt was lost). - res = list(sink.finalize_write(init_token, [res1, res2])) - - # Check the results. 
- shard1 = temp_path + '-00000-of-00002.output' - shard2 = temp_path + '-00001-of-00002.output' - self.assertEqual(res, [shard1, shard2]) - self.assertEqual(open(shard1).read(), '[start][a][b][end]') - self.assertEqual(open(shard2).read(), '[start][x][y][z][end]') - - # Check that any temp files are deleted. - self.assertItemsEqual([shard1, shard2], glob.glob(temp_path + '*')) - - def test_file_sink_display_data(self): - temp_path = os.path.join(self._new_tempdir(), 'display') - sink = MyFileSink( - temp_path, file_name_suffix='.output', coder=coders.ToStringCoder()) - dd = DisplayData.create_from(sink) - expected_items = [ - DisplayDataItemMatcher( - 'compression', 'auto'), - DisplayDataItemMatcher( - 'file_pattern', - '{}{}'.format( - temp_path, - '-%(shard_num)05d-of-%(num_shards)05d.output'))] - hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) - - def test_empty_write(self): - temp_path = tempfile.NamedTemporaryFile().name - sink = MyFileSink( - temp_path, file_name_suffix='.output', coder=coders.ToStringCoder() - ) - p = TestPipeline() - p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned - p.run() - self.assertEqual( - open(temp_path + '-00000-of-00001.output').read(), '[start][end]') - - def test_static_value_provider_empty_write(self): - temp_path = StaticValueProvider(value_type=str, - value=tempfile.NamedTemporaryFile().name) - sink = MyFileSink( - temp_path, - file_name_suffix=StaticValueProvider(value_type=str, value='.output'), - coder=coders.ToStringCoder() - ) - p = TestPipeline() - p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned - p.run() - self.assertEqual( - open(temp_path.get() + '-00000-of-00001.output').read(), '[start][end]') - - def test_fixed_shard_write(self): - temp_path = os.path.join(self._new_tempdir(), 'empty') - sink = MyFileSink( - temp_path, - file_name_suffix='.output', - num_shards=3, - shard_name_template='_NN_SSS_', - coder=coders.ToStringCoder()) - p = TestPipeline() - p | beam.Create(['a', 'b']) | beam.io.Write(sink) # pylint: disable=expression-not-assigned - - p.run() - - concat = ''.join( - open(temp_path + '_03_%03d_.output' % shard_num).read() - for shard_num in range(3)) - self.assertTrue('][a][' in concat, concat) - self.assertTrue('][b][' in concat, concat) - - # Not using 'test' in name so that 'nose' doesn't pick this as a test. 
- def run_temp_dir_check(self, no_dir_path, dir_path, no_dir_root_path, - dir_root_path, prefix, separator): - def _get_temp_dir(file_path_prefix): - sink = MyFileSink( - file_path_prefix, file_name_suffix='.output', - coder=coders.ToStringCoder()) - return sink.initialize_write() - - temp_dir = _get_temp_dir(no_dir_path) - self.assertTrue(temp_dir.startswith(prefix)) - last_sep = temp_dir.rfind(separator) - self.assertTrue(temp_dir[last_sep + 1:].startswith('beam-temp')) - - temp_dir = _get_temp_dir(dir_path) - self.assertTrue(temp_dir.startswith(prefix)) - last_sep = temp_dir.rfind(separator) - self.assertTrue(temp_dir[last_sep + 1:].startswith('beam-temp')) - - with self.assertRaises(ValueError): - _get_temp_dir(no_dir_root_path) - - with self.assertRaises(ValueError): - _get_temp_dir(dir_root_path) - - def test_temp_dir_gcs(self): - try: - self.run_temp_dir_check( - 'gs://aaa/bbb', 'gs://aaa/bbb/', 'gs://aaa', 'gs://aaa/', 'gs://', - '/') - except ValueError: - logging.debug('Ignoring test since GCP module is not installed') - - @mock.patch('apache_beam.io.localfilesystem.os') - def test_temp_dir_local(self, filesystem_os_mock): - # Here we test a unix-like mock file-system - # (not really testing Unix or Windows since we mock the function of 'os' - # module). - - def _fake_unix_split(path): - sep = path.rfind('/') - if sep < 0: - raise ValueError('Path must contain a separator') - return (path[:sep], path[sep + 1:]) - - def _fake_unix_join(base, path): - return base + '/' + path - - filesystem_os_mock.path.abspath = lambda a: a - filesystem_os_mock.path.split.side_effect = _fake_unix_split - filesystem_os_mock.path.join.side_effect = _fake_unix_join - self.run_temp_dir_check( - '/aaa/bbb', '/aaa/bbb/', '/', '/', '/', '/') - - def test_file_sink_multi_shards(self): - temp_path = os.path.join(self._new_tempdir(), 'multishard') - sink = MyFileSink( - temp_path, file_name_suffix='.output', coder=coders.ToStringCoder()) - - # Manually invoke the generic Sink API. - init_token = sink.initialize_write() - - num_shards = 1000 - writer_results = [] - for i in range(num_shards): - uuid = 'uuid-%05d' % i - writer = sink.open_writer(init_token, uuid) - writer.write('a') - writer.write('b') - writer.write(uuid) - writer_results.append(writer.close()) - - res_first = list(sink.finalize_write(init_token, writer_results)) - # Retry the finalize operation (as if the first attempt was lost). - res_second = list(sink.finalize_write(init_token, writer_results)) - - self.assertItemsEqual(res_first, res_second) - - res = sorted(res_second) - for i in range(num_shards): - shard_name = '%s-%05d-of-%05d.output' % (temp_path, i, num_shards) - uuid = 'uuid-%05d' % i - self.assertEqual(res[i], shard_name) - self.assertEqual( - open(shard_name).read(), ('[start][a][b][%s][end]' % uuid)) - - # Check that any temp files are deleted. - self.assertItemsEqual(res, glob.glob(temp_path + '*')) - - def test_file_sink_io_error(self): - temp_path = os.path.join(self._new_tempdir(), 'ioerror') - sink = MyFileSink( - temp_path, file_name_suffix='.output', coder=coders.ToStringCoder()) - - # Manually invoke the generic Sink API. 
- init_token = sink.initialize_write() - - writer1 = sink.open_writer(init_token, '1') - writer1.write('a') - writer1.write('b') - res1 = writer1.close() - - writer2 = sink.open_writer(init_token, '2') - writer2.write('x') - writer2.write('y') - writer2.write('z') - res2 = writer2.close() - - os.remove(res2) - with self.assertRaises(Exception): - list(sink.finalize_write(init_token, [res1, res2])) - - -if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/2f177820/sdks/python/apache_beam/io/gcp/gcsio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 774ee54..7e21586 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -257,9 +257,9 @@ class GcsIO(object): self.client.objects.Copy(request) except HttpError as http_error: if http_error.status_code == 404: - # This is a permanent error that should not be retried. Note that - # FileSink.finalize_write expects an IOError when the source file does - # not exist. + # This is a permanent error that should not be retried. Note that + # FileBasedSink.finalize_write expects an IOError when the source + # file does not exist. raise GcsIOError(errno.ENOENT, 'Source file not found: %s' % src) raise http://git-wip-us.apache.org/repos/asf/beam/blob/2f177820/sdks/python/apache_beam/io/iobase.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index a80b12f..7e40d83 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -562,7 +562,9 @@ class RangeTracker(object): class Sink(HasDisplayData): - """A resource that can be written to using the ``beam.io.Write`` transform. + """This class is deprecated, no backwards-compatibility guarantees. + + A resource that can be written to using the ``beam.io.Write`` transform. Here ``beam`` stands for Apache Beam Python code imported in following manner. ``import apache_beam as beam``. @@ -594,8 +596,8 @@ class Sink(HasDisplayData): single record from the bundle and ``close()`` which is called once at the end of writing a bundle. - See also ``apache_beam.io.fileio.FileSink`` which provides a simpler API - for writing sinks that produce files. + See also ``apache_beam.io.filebasedsink.FileBasedSink`` which provides a + simpler API for writing sinks that produce files. **Execution of the Write transform** @@ -759,7 +761,9 @@ class Sink(HasDisplayData): class Writer(object): - """Writes a bundle of elements from a ``PCollection`` to a sink. + """This class is deprecated, no backwards-compatibility guarantees. + + Writes a bundle of elements from a ``PCollection`` to a sink. 
A Writer ``iobase.Writer.write()`` writes the elements to the sink while ``iobase.Writer.close()`` is called after all elements in the bundle have been http://git-wip-us.apache.org/repos/asf/beam/blob/2f177820/sdks/python/apache_beam/io/textio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py index d43f4fc..eeefaf6 100644 --- a/sdks/python/apache_beam/io/textio.py +++ b/sdks/python/apache_beam/io/textio.py @@ -23,7 +23,7 @@ import logging from apache_beam.coders import coders from apache_beam.io import filebasedsource -from apache_beam.io import fileio +from apache_beam.io import filebasedsink from apache_beam.io import iobase from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.iobase import Read @@ -262,7 +262,7 @@ class _TextSource(filebasedsource.FileBasedSource): sep_bounds[1] - record_start_position_in_buffer) -class _TextSink(fileio.FileSink): +class _TextSink(filebasedsink.FileBasedSink): """A sink to a GCS or local text file or files.""" def __init__(self, http://git-wip-us.apache.org/repos/asf/beam/blob/2f177820/sdks/python/apache_beam/io/tfrecordio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/tfrecordio.py b/sdks/python/apache_beam/io/tfrecordio.py index 2f7f4dc..a8cd1ce 100644 --- a/sdks/python/apache_beam/io/tfrecordio.py +++ b/sdks/python/apache_beam/io/tfrecordio.py @@ -23,7 +23,7 @@ import struct from apache_beam import coders from apache_beam.io import filebasedsource -from apache_beam.io import fileio +from apache_beam.io import filebasedsink from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.iobase import Read from apache_beam.io.iobase import Write @@ -210,7 +210,7 @@ class ReadFromTFRecord(PTransform): return pvalue.pipeline | Read(self._source) -class _TFRecordSink(fileio.FileSink): +class _TFRecordSink(filebasedsink.FileBasedSink): """Sink for writing TFRecords files. For detailed TFRecord format description see: @@ -242,7 +242,7 @@ class WriteToTFRecord(PTransform): coder=coders.BytesCoder(), file_name_suffix='', num_shards=0, - shard_name_template=fileio.DEFAULT_SHARD_NAME_TEMPLATE, + shard_name_template=filebasedsink.DEFAULT_SHARD_NAME_TEMPLATE, compression_type=CompressionTypes.AUTO, **kwargs): """Initialize WriteToTFRecord transform. http://git-wip-us.apache.org/repos/asf/beam/blob/2f177820/sdks/python/apache_beam/testing/pipeline_verifiers.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/testing/pipeline_verifiers.py b/sdks/python/apache_beam/testing/pipeline_verifiers.py index a08eb54..883343a 100644 --- a/sdks/python/apache_beam/testing/pipeline_verifiers.py +++ b/sdks/python/apache_beam/testing/pipeline_verifiers.py @@ -81,8 +81,8 @@ def retry_on_io_error_and_server_error(exception): class FileChecksumMatcher(BaseMatcher): """Matcher that verifies file(s) content by comparing file checksum. - Use apache_beam.io.fileio to fetch file(s) from given path. File checksum - is a hash string computed from content of file(s). + Use apache_beam.io.filebasedsink to fetch file(s) from the given path. + File checksum is a hash string computed from the content of the file(s). """ def __init__(self, file_path, expected_checksum, sleep_secs=None):
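For anyone migrating code along with this rename, the extension contract is unchanged: subclass the renamed FileBasedSink and override write_encoded_record() (and optionally open()/close()), exactly as MyFileBasedSink does in filebasedsink_test.py above. A minimal sketch against the new module path follows; the sink class, record format, and output prefix are illustrative, not part of this commit:

import apache_beam as beam
from apache_beam.coders import coders
from apache_beam.io import filebasedsink


class BracketedSink(filebasedsink.FileBasedSink):
  """Illustrative sink that wraps each encoded record in brackets."""

  def write_encoded_record(self, file_handle, encoded_value):
    # file_handle is whatever open() returned; the default open() creates
    # a writable channel via FileSystems.create().
    file_handle.write('[%s]' % encoded_value)


sink = BracketedSink('/tmp/out', file_name_suffix='.txt',
                     coder=coders.ToStringCoder())
p = beam.Pipeline()
p | beam.Create(['a', 'b']) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
p.run()

With the default shard_name_template ('-SSSSS-of-NNNNN'), a single-shard run would produce a file named like /tmp/out-00000-of-00001.txt; _template_to_format() is what rewrites that template into the '%(shard_num)05d'/'%(num_shards)05d' format string asserted in test_file_sink_display_data.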