tvalentyn commented on a change in pull request #16229:
URL: https://github.com/apache/beam/pull/16229#discussion_r776457281



##########
File path: sdks/python/apache_beam/options/pipeline_options.py
##########
@@ -715,6 +715,13 @@ def _add_argparse_args(cls, parser):
         help='When true, will enable the direct logging of any detected hot '
         'keys into Cloud Logging. Warning: this will log the literal key as an '
         'unobfuscated string.')
+    parser.add_argument(
+        '--enable_artifact_caching',
+        default=False,

Review comment:
       Is caching disabled by default in Java as well?

##########
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
##########
@@ -1385,6 +1388,246 @@ def test_enable_hot_key_logging(self):
     self.assertEqual(
         env.proto.debugOptions, dataflow.DebugOptions(enableHotKeyLogging=True))
 
+  def _mock_uncached_copy(self, staging_root, src, sha256, dst_name=None):
+    sha_prefix = sha256[0:2]
+    gcs_cache_path = FileSystems.join(
+        staging_root,
+        apiclient.DataflowApplicationClient._GCS_CACHE_PREFIX,
+        sha_prefix,
+        sha256)
+
+    if not dst_name:
+      _, dst_name = os.path.split(src)
+    return [
+        mock.call.gcs_exists(gcs_cache_path),
+        mock.call.gcs_upload(src, gcs_cache_path),
+        mock.call.gcs_gcs_copy(
+            gcs_cache_path, f'gs://test-location/staging/{dst_name}')
+    ]
+
+  def _mock_cached_copy(self, staging_root, src, sha256, dst_name=None):
+    uncached = self._mock_uncached_copy(staging_root, src, sha256, dst_name)
+    uncached.pop(1)

Review comment:
       I am having trouble following the logic in this unit test.
   As an example, could you please explain what happens in this helper?
   Is the intent for the helpers `_mock_uncached_copy` and `_mock_cached_copy` to return the expected calls for an upload that had a cache miss and an upload that had a cache hit, respectively? There may be some name conflation with the helpers in the code under test: `_uncached_gcs_file_copy` (upload without caching enabled) and `_cached_gcs_file_copy` (upload with caching enabled).
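
   For concreteness, here is one possible reading of those expected call sequences (a minimal, hypothetical sketch with made-up paths), assuming the helpers model a cache miss vs. a cache hit:

```python
from unittest import mock

# Hypothetical values for illustration only; not taken from the PR.
gcs_cache_path = 'gs://test-location/staging/artifact_cache/ab/abcd1234'
src = '/tmp/artifact.whl'
staged_path = 'gs://test-location/staging/artifact.whl'

# Cache miss: existence check, upload into the cache, then a GCS-to-GCS copy
# into the staging directory.
expected_cache_miss = [
    mock.call.gcs_exists(gcs_cache_path),
    mock.call.gcs_upload(src, gcs_cache_path),
    mock.call.gcs_gcs_copy(gcs_cache_path, staged_path),
]

# Cache hit: the upload step is dropped (the `uncached.pop(1)` above), leaving
# only the existence check and the copy from the cache.
expected_cache_hit = [expected_cache_miss[0], expected_cache_miss[2]]
```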

##########
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##########
@@ -519,11 +523,16 @@ def __reduce__(self):
 
 
 class DataflowApplicationClient(object):
+  _HASH_CHUNK_SIZE = 1024 * 8
+  _GCS_CACHE_PREFIX = "artifact_cache"
   """A Dataflow API client used by application code to create and query 
jobs."""
-  def __init__(self, options):
+  def __init__(self, options, root_staging_location=None):

Review comment:
       Why do we need an additional param? I am not seeing usage of a custom `root_staging_location` in this PR, and this is an internal module.

##########
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##########
@@ -556,9 +565,79 @@ def _get_sdk_image_overrides(self, pipeline_options):
     return (
         dict(s.split(',', 1) for s in sdk_overrides) if sdk_overrides else {})
 
+  @staticmethod
+  def _compute_sha256(file):
+    hasher = hashlib.sha256()
+    with open(file, 'rb') as f:
+      for chunk in iter(partial(f.read,
+                                DataflowApplicationClient._HASH_CHUNK_SIZE),
+                        b""):
+        hasher.update(chunk)
+    return hasher.hexdigest()
+
+  @staticmethod
+  def _split_gcs_path(path):
+    if not path.startswith('gs://'):
+      raise RuntimeError('Expected gs:// path, got %s', path)
+    return path[5:].split('/', 1)
+
+  def _cached_location(self, sha256):
+    sha_prefix = sha256[0:2]
+    return FileSystems.join(
+        self._root_staging_location,
+        DataflowApplicationClient._GCS_CACHE_PREFIX,
+        sha_prefix,
+        sha256)
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def _gcs_object_exists(self, gcs_or_local_path):
+    if not gcs_or_local_path.startswith('gs://'):
+      return False
+    else:
+      bucket, name = self._split_gcs_path(gcs_or_local_path)
+      request = storage.StorageObjectsGetRequest(bucket=bucket, object=name)
+      try:
+        self._storage_client.objects.Get(request)
+        return True
+      except exceptions.HttpError as e:
+        return e.status_code not in (403, 404)
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def _gcs_to_gcs_copy(self, from_gcs, to_gcs):

Review comment:
       Have you tried using `FileSystems.copy` for this purpose? Ideally, we should be leveraging the FileSystems module rather than calling the GCS client directly in this and the other helpers, if possible. That would reduce the work needed to migrate to the non-deprecated GCS client in the future.
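
   If it helps, a rough sketch of what these two helpers could look like when routed through the FileSystems module (an illustration under that assumption, not the PR's code; the retry decorators and error handling are omitted):

```python
from apache_beam.io.filesystems import FileSystems

def _gcs_to_gcs_copy(self, from_path, to_path):
  # FileSystems.copy takes parallel lists of source and destination paths and
  # dispatches to the registered filesystem (GCS here), so no direct call to
  # the GCS client is needed.
  FileSystems.copy([from_path], [to_path])

def _gcs_object_exists(self, path):
  # Same idea for the existence check.
  return FileSystems.exists(path)
```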

##########
File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
##########
@@ -556,9 +565,79 @@ def _get_sdk_image_overrides(self, pipeline_options):
     return (
         dict(s.split(',', 1) for s in sdk_overrides) if sdk_overrides else {})
 
+  @staticmethod
+  def _compute_sha256(file):
+    hasher = hashlib.sha256()
+    with open(file, 'rb') as f:
+      for chunk in iter(partial(f.read,
+                                DataflowApplicationClient._HASH_CHUNK_SIZE),
+                        b""):
+        hasher.update(chunk)
+    return hasher.hexdigest()
+
+  @staticmethod
+  def _split_gcs_path(path):
+    if not path.startswith('gs://'):
+      raise RuntimeError('Expected gs:// path, got %s', path)
+    return path[5:].split('/', 1)
+
+  def _cached_location(self, sha256):
+    sha_prefix = sha256[0:2]
+    return FileSystems.join(
+        self._root_staging_location,
+        DataflowApplicationClient._GCS_CACHE_PREFIX,
+        sha_prefix,
+        sha256)
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def _gcs_object_exists(self, gcs_or_local_path):
+    if not gcs_or_local_path.startswith('gs://'):
+      return False
+    else:
+      bucket, name = self._split_gcs_path(gcs_or_local_path)
+      request = storage.StorageObjectsGetRequest(bucket=bucket, object=name)
+      try:
+        self._storage_client.objects.Get(request)
+        return True
+      except exceptions.HttpError as e:
+        return e.status_code not in (403, 404)

Review comment:
       Should this return False on 403/404, and re-raise otherwise so that the retry logic is triggered?
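
   A sketch of the behavior being suggested (an assumption about the intent, not code from the PR), written against the same class and names quoted above: treat 403/404 as "object does not exist" and re-raise anything else so the exponential-backoff retry filter can handle transient server errors.

```python
  @retry.with_exponential_backoff(
      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
  def _gcs_object_exists(self, gcs_or_local_path):
    if not gcs_or_local_path.startswith('gs://'):
      return False
    bucket, name = self._split_gcs_path(gcs_or_local_path)
    request = storage.StorageObjectsGetRequest(bucket=bucket, object=name)
    try:
      self._storage_client.objects.Get(request)
      return True
    except exceptions.HttpError as e:
      if e.status_code in (403, 404):
        return False
      raise  # Let the retry filter see server errors and timeouts.
```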



