This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3755a15 Improving performance of file deletion in ReadFromBQ
new 7cd61b5 Merge pull request #13125 from [BEAM-11069] Improving performance of file deletion in ReadFromBQ
3755a15 is described below
commit 3755a153ddce88a6427e7529d9d0bc0ebaf2e0d8
Author: Pablo Estrada <[email protected]>
AuthorDate: Wed Oct 14 22:09:07 2020 -0700
Improving performance of file deletion in ReadFromBQ
---
sdks/python/apache_beam/io/gcp/bigquery.py | 14 +++++-----
sdks/python/apache_beam/io/gcp/gcsio.py | 45 +++++++++++++++++-------------
2 files changed, 33 insertions(+), 26 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 9ef80cd..dacb963 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1938,6 +1938,7 @@ class ReadFromBigQuery(PTransform):
gcs_location_vp, # type: Optional[ValueProvider]
temp_location, # type: Optional[str]
unique_id, # type: str
+ directory_only=False, # type: bool
):
"""Returns the fully qualified Google Cloud Storage URI where the
extracted table should be written.
@@ -1959,7 +1960,10 @@ class ReadFromBigQuery(PTransform):
'gcs_location in the constructor nor the fallback option '
'--temp_location is set.')
- return FileSystems.join(gcs_base, unique_id, file_pattern)
+ if directory_only:
+ return FileSystems.join(gcs_base, unique_id)
+ else:
+ return FileSystems.join(gcs_base, unique_id, file_pattern)
def expand(self, pcoll):
class RemoveExportedFiles(beam.DoFn):
@@ -1970,12 +1974,8 @@ class ReadFromBigQuery(PTransform):
def process(self, unused_element, signal):
gcs_location = ReadFromBigQuery.get_destination_uri(
- self._gcs_location_vp, self._temp_location, self._unique_id)
- match_result = FileSystems.match([gcs_location])[0].metadata_list
- _LOGGER.debug(
- "%s: matched %s files", self.__class__.__name__, len(match_result))
- paths = [x.path for x in match_result]
- FileSystems.delete(paths)
+ self._gcs_location_vp, self._temp_location, self._unique_id, True)
+ FileSystems.delete([gcs_location + '/'])
unique_id = str(uuid.uuid4())[0:10]
temp_location = pcoll.pipeline.options.view_as(
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 2e5b059..a83364b 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -35,6 +35,7 @@ import threading
import time
import traceback
from builtins import object
+from itertools import islice
from apache_beam.internal.http_client import get_new_http
from apache_beam.io.filesystemio import Downloader
@@ -257,26 +258,32 @@ class GcsIO(object):
"""
if not paths:
return []
- batch_request = BatchApiRequest(
- batch_url=GCS_BATCH_ENDPOINT,
- retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
- response_encoding='utf-8')
- for path in paths:
- bucket, object_path = parse_gcs_path(path)
- request = storage.StorageObjectsDeleteRequest(
- bucket=bucket, object=object_path)
- batch_request.Add(self.client.objects, 'Delete', request)
- api_calls = batch_request.Execute(self.client._http) # pylint: disable=protected-access
+
+ paths = iter(paths)
result_statuses = []
- for i, api_call in enumerate(api_calls):
- path = paths[i]
- exception = None
- if api_call.is_error:
- exception = api_call.exception
- # Return success when the file doesn't exist anymore for idempotency.
- if isinstance(exception, HttpError) and exception.status_code == 404:
- exception = None
- result_statuses.append((path, exception))
+ while True:
+ paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
+ if not paths_chunk:
+ return result_statuses
+ batch_request = BatchApiRequest(
+ batch_url=GCS_BATCH_ENDPOINT,
+ retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
+ response_encoding='utf-8')
+ for path in paths_chunk:
+ bucket, object_path = parse_gcs_path(path)
+ request = storage.StorageObjectsDeleteRequest(
+ bucket=bucket, object=object_path)
+ batch_request.Add(self.client.objects, 'Delete', request)
+ api_calls = batch_request.Execute(self.client._http) # pylint: disable=protected-access
+ for i, api_call in enumerate(api_calls):
+ path = paths_chunk[i]
+ exception = None
+ if api_call.is_error:
+ exception = api_call.exception
+ # Return success when the file doesn't exist anymore for idempotency.
+ if isinstance(exception, HttpError) and exception.status_code == 404:
+ exception = None
+ result_statuses.append((path, exception))
return result_statuses
@retry.with_exponential_backoff(