Abacn commented on code in PR #28079:
URL: https://github.com/apache/beam/pull/28079#discussion_r1334870065
##########
sdks/python/apache_beam/io/gcp/bigquery_tools_test.py:
##########
@@ -224,23 +223,6 @@ def test_delete_dataset_retries_for_timeouts(self,
patched_time_sleep):
wrapper._delete_dataset('', '')
self.assertTrue(client.datasets.Delete.called)
- @unittest.skipIf(
Review Comment:
This test gets removed while the reason why it fails internally remains
unknown. From a high level there might still be some regression regarding
credentials / anonymous credentials - something to be aware of.
I am fine with it for now, but please do not remove the test until we
understand why it fails in certain environments; we could skip it for now.
##########
sdks/python/apache_beam/options/pipeline_options_validator_test.py:
##########
@@ -106,6 +106,7 @@ def test_missing_required_options(self):
]), ('gs://foo', 'gs://foo/bar', []),
('gs://foo/', 'gs://foo/bar', []), ('gs://foo/bar', 'gs://foo/bar', [])
])
+ @unittest.skip('Not compatible with new GCS client. See GH issue #26335.')
Review Comment:
While it is understandable that some BigQuery unit tests (related to file_load)
are not compatible with the new GCS client (their mocks need adjusting), how
could the pipeline options validator test be affected by the change?
##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -271,186 +190,119 @@ def delete(self, path):
Args:
path: GCS file path pattern in the form gs://<bucket>/<name>.
"""
- bucket, object_path = parse_gcs_path(path)
- request = storage.StorageObjectsDeleteRequest(
- bucket=bucket, object=object_path)
+ bucket_name, blob_name = parse_gcs_path(path)
try:
- self.client.objects.Delete(request)
- except HttpError as http_error:
- if http_error.status_code == 404:
- # Return success when the file doesn't exist anymore for idempotency.
- return
- raise
-
- # We intentionally do not decorate this method with a retry, as retrying is
- # handled in BatchApiRequest.Execute().
+ bucket = self.client.get_bucket(bucket_name)
+ bucket.delete_blob(blob_name)
+ except NotFound:
+ return
+
def delete_batch(self, paths):
"""Deletes the objects at the given GCS paths.
Args:
paths: List of GCS file path patterns in the form gs://<bucket>/<name>,
not to exceed MAX_BATCH_OPERATION_SIZE in length.
- Returns: List of tuples of (path, exception) in the same order as the paths
- argument, where exception is None if the operation succeeded or
- the relevant exception if the operation failed.
+ Returns: List of tuples of (path, exception) in the same order as the
+ paths argument, where exception is None if the operation
+ succeeded or the relevant exception if the operation failed.
"""
- if not paths:
- return []
-
- paths = iter(paths)
- result_statuses = []
- while True:
- paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
- if not paths_chunk:
- return result_statuses
- batch_request = BatchApiRequest(
- batch_url=GCS_BATCH_ENDPOINT,
- retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
- response_encoding='utf-8')
- for path in paths_chunk:
- bucket, object_path = parse_gcs_path(path)
- request = storage.StorageObjectsDeleteRequest(
- bucket=bucket, object=object_path)
- batch_request.Add(self.client.objects, 'Delete', request)
- api_calls = batch_request.Execute(self.client._http) # pylint:
disable=protected-access
- for i, api_call in enumerate(api_calls):
- path = paths_chunk[i]
- exception = None
- if api_call.is_error:
- exception = api_call.exception
- # Return success when the file doesn't exist anymore for idempotency.
- if isinstance(exception, HttpError) and exception.status_code == 404:
- exception = None
- result_statuses.append((path, exception))
- return result_statuses
+ final_results = []
+ s = 0
+ while s < len(paths):
+ if (s + MAX_BATCH_OPERATION_SIZE) < len(paths):
+ current_paths = paths[s:s + MAX_BATCH_OPERATION_SIZE]
+ else:
+ current_paths = paths[s:]
+ current_batch = self.client.batch(raise_exception=False)
+ with current_batch:
+ for path in current_paths:
+ bucket_name, blob_name = parse_gcs_path(path)
+ bucket = self.client.get_bucket(bucket_name)
+ bucket.delete_blob(blob_name)
+
+ for i, path in enumerate(current_paths):
+ error_code = None
+ for j in range(2):
+ resp = current_batch._responses[2 * i + j]
+ if resp.status_code >= 400 and resp.status_code != 404:
+ error_code = resp.status_code
+ break
+ final_results.append((path, error_code))
+
+ s += MAX_BATCH_OPERATION_SIZE
+
+ return final_results
@retry.with_exponential_backoff(
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
- def copy(
- self,
- src,
- dest,
- dest_kms_key_name=None,
- max_bytes_rewritten_per_call=None):
+ def copy(self, src, dest):
"""Copies the given GCS object from src to dest.
Args:
src: GCS file path pattern in the form gs://<bucket>/<name>.
dest: GCS file path pattern in the form gs://<bucket>/<name>.
- dest_kms_key_name: Experimental. No backwards compatibility guarantees.
- Encrypt dest with this Cloud KMS key. If None, will use dest bucket
- encryption defaults.
- max_bytes_rewritten_per_call: Experimental. No backwards compatibility
- guarantees. Each rewrite API call will return after these many bytes.
- Used for testing.
Raises:
TimeoutError: on timeout.
"""
- src_bucket, src_path = parse_gcs_path(src)
- dest_bucket, dest_path = parse_gcs_path(dest)
- request = storage.StorageObjectsRewriteRequest(
- sourceBucket=src_bucket,
- sourceObject=src_path,
- destinationBucket=dest_bucket,
- destinationObject=dest_path,
- destinationKmsKeyName=dest_kms_key_name,
- maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
- response = self.client.objects.Rewrite(request)
- while not response.done:
- _LOGGER.debug(
- 'Rewrite progress: %d of %d bytes, %s to %s',
- response.totalBytesRewritten,
- response.objectSize,
- src,
- dest)
- request.rewriteToken = response.rewriteToken
- response = self.client.objects.Rewrite(request)
- if self._rewrite_cb is not None:
- self._rewrite_cb(response)
-
- _LOGGER.debug('Rewrite done: %s to %s', src, dest)
-
- # We intentionally do not decorate this method with a retry, as retrying is
- # handled in BatchApiRequest.Execute().
- def copy_batch(
- self,
- src_dest_pairs,
- dest_kms_key_name=None,
- max_bytes_rewritten_per_call=None):
- """Copies the given GCS object from src to dest.
+ src_bucket_name, src_blob_name = parse_gcs_path(src)
+ dest_bucket_name, dest_blob_name= parse_gcs_path(dest,
object_optional=True)
+ src_bucket = self.get_bucket(src_bucket_name)
+ src_blob = src_bucket.get_blob(src_blob_name)
+ if not src_blob:
+ raise NotFound("Source %s not found", src)
+ dest_bucket = self.get_bucket(dest_bucket_name)
+ if not dest_blob_name:
+ dest_blob_name = None
+ src_bucket.copy_blob(src_blob, dest_bucket, new_name=dest_blob_name)
+
+ def copy_batch(self, src_dest_pairs):
+ """Copies the given GCS objects from src to dest.
Args:
src_dest_pairs: list of (src, dest) tuples of gs://<bucket>/<name> files
paths to copy from src to dest, not to exceed
MAX_BATCH_OPERATION_SIZE in length.
- dest_kms_key_name: Experimental. No backwards compatibility guarantees.
- Encrypt dest with this Cloud KMS key. If None, will use dest bucket
- encryption defaults.
- max_bytes_rewritten_per_call: Experimental. No backwards compatibility
- guarantees. Each rewrite call will return after these many bytes. Used
- primarily for testing.
Returns: List of tuples of (src, dest, exception) in the same order as the
src_dest_pairs argument, where exception is None if the operation
succeeded or the relevant exception if the operation failed.
"""
- if not src_dest_pairs:
- return []
- pair_to_request = {}
- for pair in src_dest_pairs:
- src_bucket, src_path = parse_gcs_path(pair[0])
- dest_bucket, dest_path = parse_gcs_path(pair[1])
- request = storage.StorageObjectsRewriteRequest(
- sourceBucket=src_bucket,
- sourceObject=src_path,
- destinationBucket=dest_bucket,
- destinationObject=dest_path,
- destinationKmsKeyName=dest_kms_key_name,
- maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
- pair_to_request[pair] = request
- pair_to_status = {}
- while True:
- pairs_in_batch = list(set(src_dest_pairs) - set(pair_to_status))
- if not pairs_in_batch:
- break
- batch_request = BatchApiRequest(
- batch_url=GCS_BATCH_ENDPOINT,
- retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
- response_encoding='utf-8')
- for pair in pairs_in_batch:
- batch_request.Add(self.client.objects, 'Rewrite',
pair_to_request[pair])
- api_calls = batch_request.Execute(self.client._http) # pylint:
disable=protected-access
- for pair, api_call in zip(pairs_in_batch, api_calls):
- src, dest = pair
- response = api_call.response
- if self._rewrite_cb is not None:
- self._rewrite_cb(response)
- if api_call.is_error:
- exception = api_call.exception
- # Translate 404 to the appropriate not found exception.
- if isinstance(exception, HttpError) and exception.status_code == 404:
- exception = (
- GcsIOError(errno.ENOENT, 'Source file not found: %s' % src))
- pair_to_status[pair] = exception
- elif not response.done:
- _LOGGER.debug(
- 'Rewrite progress: %d of %d bytes, %s to %s',
- response.totalBytesRewritten,
- response.objectSize,
- src,
- dest)
- pair_to_request[pair].rewriteToken = response.rewriteToken
- else:
- _LOGGER.debug('Rewrite done: %s to %s', src, dest)
- pair_to_status[pair] = None
-
- return [(pair[0], pair[1], pair_to_status[pair]) for pair in
src_dest_pairs]
+ final_results = []
+ s = 0
+ while s < len(src_dest_pairs):
+ if (s + MAX_BATCH_OPERATION_SIZE) < len(src_dest_pairs):
+ current_pairs = src_dest_pairs[s:s + MAX_BATCH_OPERATION_SIZE]
+ else:
+ current_pairs = src_dest_pairs[s:]
+ current_batch = self.client.batch(raise_exception=False)
+ with current_batch:
+ for pair in current_pairs:
+ src_bucket_name, src_blob_name = parse_gcs_path(pair[0])
+ dest_bucket_name, dest_blob_name = parse_gcs_path(pair[1])
+ src_bucket = self.client.get_bucket(src_bucket_name)
+ src_blob = src_bucket.get_blob(src_blob_name)
+ dest_bucket = self.client.get_bucket(dest_bucket_name)
+
+ src_bucket.copy_blob(src_blob, dest_bucket, dest_blob_name)
+
+ for i, pair in enumerate(current_pairs):
+ error_code = None
+ for j in range(4):
+ resp = current_batch._responses[4 * i + j]
+ if resp.status_code >= 400:
+ error_code = resp.status_code
+ break
+ final_results.append((pair[0], pair[1], error_code))
+
+ s += MAX_BATCH_OPERATION_SIZE
+
+ return final_results
Review Comment:
With the current code logic the "Returns:" spec is no longer true. Now it
only returns a list of failed items. For compatibility reasons I suggest we
still return the succeeded items as well and keep "List of tuples of (src, dest,
exception) in the same order as the src_dest_pairs argument" as the spec.
The same applies to delete_batch.
##########
sdks/python/setup.py:
##########
@@ -308,30 +308,31 @@ def get_portability_package_data():
'hypothesis>5.0.0,<=7.0.0',
],
'gcp': [
- 'cachetools>=3.1.0,<6',
- 'google-api-core>=2.0.0,<3',
- 'google-apitools>=0.5.31,<0.5.32',
- # NOTE: Maintainers, please do not require google-auth>=2.x.x
- # Until this issue is closed
- # https://github.com/googleapis/google-cloud-python/issues/10566
- 'google-auth>=1.18.0,<3',
- 'google-auth-httplib2>=0.1.0,<0.2.0',
- 'google-cloud-datastore>=2.0.0,<3',
- 'google-cloud-pubsub>=2.1.0,<3',
- 'google-cloud-pubsublite>=1.2.0,<2',
- # GCP packages required by tests
- 'google-cloud-bigquery>=2.0.0,<4',
- 'google-cloud-bigquery-storage>=2.6.3,<3',
- 'google-cloud-core>=2.0.0,<3',
- 'google-cloud-bigtable>=2.19.0,<3',
- 'google-cloud-spanner>=3.0.0,<4',
- # GCP Packages required by ML functionality
- 'google-cloud-dlp>=3.0.0,<4',
- 'google-cloud-language>=2.0,<3',
- 'google-cloud-videointelligence>=2.0,<3',
- 'google-cloud-vision>=2,<4',
- 'google-cloud-recommendations-ai>=0.1.0,<0.11.0',
- 'google-cloud-aiplatform>=1.26.0, < 2.0'
+ 'cachetools>=3.1.0,<6',
Review Comment:
please fix diff (unnecessary indent change)
##########
sdks/python/apache_beam/runners/dataflow/internal/apiclient.py:
##########
@@ -492,12 +491,19 @@ def __init__(self, options, root_staging_location=None):
self._root_staging_location = (
root_staging_location or self.google_cloud_options.staging_location)
- self.environment_version = _FNAPI_ENVIRONMENT_MAJOR_VERSION
+ from apache_beam.runners.dataflow.dataflow_runner import
_is_runner_v2_disabled
+ from google.cloud import storage
+ if _is_runner_v2_disabled(options):
Review Comment:
This seems to be an irrelevant change. Rebase error? Specifically, it undoes [this
change](https://github.com/apache/beam/commit/ca674aad086e4d1a40e8c01598073030a22484fa#diff-e616be01292a173f978dab1922575bbe06c2554b3bc6073de4aa1117b81f5a13)
on master.
##########
sdks/python/apache_beam/runners/dataflow/internal/apiclient.py:
##########
@@ -654,38 +658,45 @@ def stage_file(
mime_type='application/octet-stream',
total_size=None):
"""Stages a file at a GCS or local path with stream-supplied contents."""
+ from google.cloud.exceptions import Forbidden
+ from google.cloud.exceptions import NotFound
if not gcs_or_local_path.startswith('gs://'):
local_path = FileSystems.join(gcs_or_local_path, file_name)
_LOGGER.info('Staging file locally to %s', local_path)
with open(local_path, 'wb') as f:
f.write(stream.read())
return
gcs_location = FileSystems.join(gcs_or_local_path, file_name)
- bucket, name = gcs_location[5:].split('/', 1)
-
- request = storage.StorageObjectsInsertRequest(bucket=bucket, name=name)
+ bucket_name, blob_name = gcs_location[5:].split('/', 1)
start_time = time.time()
_LOGGER.info('Starting GCS upload to %s...', gcs_location)
- upload = storage.Upload(stream, mime_type, total_size)
try:
- response = self._storage_client.objects.Insert(request, upload=upload)
- except exceptions.HttpError as e:
- reportable_errors = {
- 403: 'access denied',
- 404: 'bucket not found',
- }
- if e.status_code in reportable_errors:
+ from google.cloud.storage import Blob
+ from google.cloud.storage.fileio import BlobWriter
+ bucket = self._storage_client.get_bucket(bucket_name)
+ blob = bucket.get_blob(blob_name)
+ if not blob:
+ blob = Blob(blob_name, bucket)
+ with BlobWriter(blob) as f:
+ f.write(stream.read())
+ return
+ except Exception as e:
+ reportable_errors = [
+ Forbidden,
+ NotFound,
+ ]
+ if type(e) in reportable_errors:
raise IOError((
'Could not upload to GCS path %s: %s. Please verify '
- 'that credentials are valid and that you have write '
- 'access to the specified path.') %
- (gcs_or_local_path, reportable_errors[e.status_code]))
+ 'that credentials are valid, that the specified path '
+ 'exists, and that you have write access to it.') %
+ (gcs_or_local_path, e))
raise
- _LOGGER.info(
- 'Completed GCS upload to %s in %s seconds.',
- gcs_location,
- int(time.time() - start_time))
- return response
+ finally:
Review Comment:
"finally" will also execute when there are errors; in that case the message
"Completed GCS upload" is not accurate.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]