This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3755a15  Improving performance of file deletion in ReadFromBQ
     new 7cd61b5  Merge pull request #13125 from [BEAM-11069] Improving performance of file deletion in ReadFromBQ
3755a15 is described below

commit 3755a153ddce88a6427e7529d9d0bc0ebaf2e0d8
Author: Pablo Estrada <[email protected]>
AuthorDate: Wed Oct 14 22:09:07 2020 -0700

    Improving performance of file deletion in ReadFromBQ
---
 sdks/python/apache_beam/io/gcp/bigquery.py | 14 +++++-----
 sdks/python/apache_beam/io/gcp/gcsio.py    | 45 +++++++++++++++++-------------
 2 files changed, 33 insertions(+), 26 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 9ef80cd..dacb963 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1938,6 +1938,7 @@ class ReadFromBigQuery(PTransform):
       gcs_location_vp,  # type: Optional[ValueProvider]
       temp_location,  # type: Optional[str]
       unique_id,  # type: str
+      directory_only=False,  # type: bool
   ):
     """Returns the fully qualified Google Cloud Storage URI where the
     extracted table should be written.
@@ -1959,7 +1960,10 @@ class ReadFromBigQuery(PTransform):
           'gcs_location in the constructor nor the fallback option '
           '--temp_location is set.')
 
-    return FileSystems.join(gcs_base, unique_id, file_pattern)
+    if directory_only:
+      return FileSystems.join(gcs_base, unique_id)
+    else:
+      return FileSystems.join(gcs_base, unique_id, file_pattern)
 
   def expand(self, pcoll):
     class RemoveExportedFiles(beam.DoFn):
@@ -1970,12 +1974,8 @@ class ReadFromBigQuery(PTransform):
 
       def process(self, unused_element, signal):
         gcs_location = ReadFromBigQuery.get_destination_uri(
-            self._gcs_location_vp, self._temp_location, self._unique_id)
-        match_result = FileSystems.match([gcs_location])[0].metadata_list
-        _LOGGER.debug(
-            "%s: matched %s files", self.__class__.__name__, len(match_result))
-        paths = [x.path for x in match_result]
-        FileSystems.delete(paths)
+            self._gcs_location_vp, self._temp_location, self._unique_id, True)
+        FileSystems.delete([gcs_location + '/'])
 
     unique_id = str(uuid.uuid4())[0:10]
     temp_location = pcoll.pipeline.options.view_as(
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 2e5b059..a83364b 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -35,6 +35,7 @@ import threading
 import time
 import traceback
 from builtins import object
+from itertools import islice
 
 from apache_beam.internal.http_client import get_new_http
 from apache_beam.io.filesystemio import Downloader
@@ -257,26 +258,32 @@ class GcsIO(object):
     """
     if not paths:
       return []
-    batch_request = BatchApiRequest(
-        batch_url=GCS_BATCH_ENDPOINT,
-        retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-        response_encoding='utf-8')
-    for path in paths:
-      bucket, object_path = parse_gcs_path(path)
-      request = storage.StorageObjectsDeleteRequest(
-          bucket=bucket, object=object_path)
-      batch_request.Add(self.client.objects, 'Delete', request)
-    api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
+
+    paths = iter(paths)
     result_statuses = []
-    for i, api_call in enumerate(api_calls):
-      path = paths[i]
-      exception = None
-      if api_call.is_error:
-        exception = api_call.exception
-        # Return success when the file doesn't exist anymore for idempotency.
-        if isinstance(exception, HttpError) and exception.status_code == 404:
-          exception = None
-      result_statuses.append((path, exception))
+    while True:
+      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
+      if not paths_chunk:
+        return result_statuses
+      batch_request = BatchApiRequest(
+          batch_url=GCS_BATCH_ENDPOINT,
+          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
+          response_encoding='utf-8')
+      for path in paths_chunk:
+        bucket, object_path = parse_gcs_path(path)
+        request = storage.StorageObjectsDeleteRequest(
+            bucket=bucket, object=object_path)
+        batch_request.Add(self.client.objects, 'Delete', request)
+      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
+      for i, api_call in enumerate(api_calls):
+        path = paths_chunk[i]
+        exception = None
+        if api_call.is_error:
+          exception = api_call.exception
+          # Return success when the file doesn't exist anymore for idempotency.
+          if isinstance(exception, HttpError) and exception.status_code == 404:
+            exception = None
+        result_statuses.append((path, exception))
     return result_statuses
 
   @retry.with_exponential_backoff(

Reply via email to