Fixes GcsIO.exists() to properly handle files that do not exist. Currently, this call fails for nonexistent files instead of returning False.
Updates FileSink.finalize_write() so that we capture and log any transient errors that get thrown at the channel_factory.exists() call. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d8a76a53 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d8a76a53 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d8a76a53 Branch: refs/heads/python-sdk Commit: d8a76a53cf60209d13f929ddaec87cd95f6a040a Parents: d72ffb0 Author: Chamikara Jayalath <[email protected]> Authored: Wed Aug 3 18:25:41 2016 -0700 Committer: Chamikara Jayalath <[email protected]> Committed: Mon Aug 8 16:35:25 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/fileio.py | 13 ++++++++++--- sdks/python/apache_beam/io/gcsio.py | 9 +++++++-- sdks/python/apache_beam/io/gcsio_test.py | 10 +++++++++- 3 files changed, 26 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d8a76a53/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index b1e091b..ecacb9f 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -530,15 +530,22 @@ class FileSink(iobase.Sink): channel_factory.rename(old_name, final_name) except IOError as e: # May have already been copied. - exists = channel_factory.exists(final_name) + try: + exists = channel_factory.exists(final_name) + except Exception as exists_e: # pylint: disable=broad-except + logging.warning('Exception when invoking channel_factory.exists(): ' + '%s', exists_e) + # Returning original exception after logging the exception from + # exists() call. 
+ return (None, e) if not exists: logging.warning(('IOError in _rename_file. old_name: %s, ' 'final_name: %s, err: %s'), old_name, final_name, e) - return(None, e) + return (None, e) except Exception as e: # pylint: disable=broad-except logging.warning(('Exception in _rename_file. old_name: %s, ' 'final_name: %s, err: %s'), old_name, final_name, e) - return(None, e) + return (None, e) return (final_name, None) # ThreadPool crashes in old versions of Python (< 2.7.5) if created from a http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d8a76a53/sdks/python/apache_beam/io/gcsio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py index 88fcfb8..7bb532c 100644 --- a/sdks/python/apache_beam/io/gcsio.py +++ b/sdks/python/apache_beam/io/gcsio.py @@ -234,8 +234,13 @@ class GcsIO(object): object=object_path) self.client.objects.Get(request) # metadata return True - except IOError: - return False + except HttpError as http_error: + if http_error.status_code == 404: + # HTTP 404 indicates that the file did not exist + return False + else: + # We re-raise all other exceptions + raise @retry.with_exponential_backoff( retry_filter=retry.retry_on_server_errors_and_timeout_filter) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d8a76a53/sdks/python/apache_beam/io/gcsio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py index 7b15ef3..b909fdd 100644 --- a/sdks/python/apache_beam/io/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcsio_test.py @@ -79,7 +79,8 @@ class FakeGcsObjects(object): def Get(self, get_request, download=None): # pylint: disable=invalid-name f = self.get_file(get_request.bucket, get_request.object) if f is None: - raise ValueError('Specified object does not exist.') + # Failing with a HTTP 404 if file does not 
exist. + raise HttpError({'status':404}, None, None) if download is None: return f.get_metadata() else: @@ -189,6 +190,13 @@ class TestGCSIO(unittest.TestCase): self.client = FakeGcsClient() self.gcs = gcsio.GcsIO(self.client) + def test_exists(self): + file_name = 'gs://gcsio-test/dummy_file' + file_size = 1234 + self._insert_random_file(self.client, file_name, file_size) + self.assertFalse(self.gcs.exists(file_name + 'xyz')) + self.assertTrue(self.gcs.exists(file_name)) + def test_size(self): file_name = 'gs://gcsio-test/dummy_file' file_size = 1234
