mosche commented on a change in pull request #15931:
URL: https://github.com/apache/beam/pull/15931#discussion_r768427938



##########
File path: sdks/python/apache_beam/io/aws/clients/s3/boto3_client.py
##########
@@ -67,6 +69,11 @@ def __init__(self, options):
         aws_secret_access_key=secret_access_key,
         aws_session_token=session_token)
 
+    self._download_request = None

Review comment:
       This adds some local mutable state to the client. Unfortunately, I'm not 
familiar with the Python SDK, so I'm not sure whether this is legitimate. In any 
case, I'd suggest documenting this in the pydocs of the client.

##########
File path: sdks/python/apache_beam/io/aws/clients/s3/boto3_client.py
##########
@@ -24,16 +24,18 @@
   # pylint: disable=wrong-import-order, wrong-import-position
   # pylint: disable=ungrouped-imports
   import boto3
+  import botocore.exceptions as boto_exception
 
 except ImportError:
   boto3 = None
+  boto_exception = None
 
 
 class Client(object):
   """
   Wrapper for boto3 library
   """
-  def __init__(self, options):
+  def __init__(self, options, download_offset=0):

Review comment:
       @phoerious Could you add some documentation explaining the usage of 
`download_offset`, as it's currently not used? I'm actually wondering whether 
it's necessary at all. If you use `get_range` and skip over some bytes by 
passing a `start` much greater than `_download_pos`, the stream would be closed 
and reopened starting from `start` without any need for an offset.

##########
File path: sdks/python/apache_beam/io/aws/clients/s3/boto3_client.py
##########
@@ -92,27 +99,65 @@ def get_object_metadata(self, request):
         boto_response['ContentLength'],
         boto_response['ContentType'])
 
+    item.size = max(item.size - self._download_offset, 0)
     return item
 
+  def get_stream(self, request, start):

Review comment:
       Could you add some pytests for `get_stream`? Even though it is fairly 
trivial, it would be good to carefully test the state management here as well.

##########
File path: sdks/python/apache_beam/io/aws/clients/s3/boto3_client.py
##########
@@ -92,27 +99,65 @@ def get_object_metadata(self, request):
         boto_response['ContentLength'],
         boto_response['ContentType'])
 
+    item.size = max(item.size - self._download_offset, 0)
     return item
 
+  def get_stream(self, request, start):
+    """Opens a stream object starting at the given position.
+
+    Args:
+      request: (GetRequest) request
+      start: (int) start offset
+    Returns:
+      (Stream) Boto3 stream object.
+    """
+
+    if self._download_request and (
+        start != self._download_pos or
+        request.bucket != self._download_request.bucket or
+        request.object != self._download_request.object):
+      self._download_stream.close()
+      self._download_stream = None
+
+    # noinspection PyProtectedMember
+    if not self._download_stream or self._download_stream._raw_stream.closed:
+      try:
+        self._download_stream = self.client.get_object(
+            Bucket=request.bucket,
+            Key=request.object,
+            Range='bytes={}-'.format(start + self._download_offset))['Body']
+        self._download_request = request
+        self._download_pos = start
+      except Exception as e:
+        message = e.response['Error'].get(
+            'Message', e.response['Error'].get('Code', ''))
+        code = e.response['ResponseMetadata']['HTTPStatusCode']
+        raise messages.S3ClientError(message, code)
+
+    return self._download_stream
+
   def get_range(self, request, start, end):
     r"""Retrieves an object's contents.
 
       Args:
         request: (GetRequest) request
+        start: (int) start offset
+        end: (int) end offset
       Returns:
         (bytes) The response message.
       """
-    try:
-      boto_response = self.client.get_object(
-          Bucket=request.bucket,
-          Key=request.object,
-          Range='bytes={}-{}'.format(start, end - 1))
-    except Exception as e:
-      message = e.response['Error']['Message']
-      code = e.response['ResponseMetadata']['HTTPStatusCode']
-      raise messages.S3ClientError(message, code)
-
-    return boto_response['Body'].read()  # A bytes object
+    for i in range(self._retries):

Review comment:
       Where is `_retries` initialized? Also, imho, the retry behavior is 
rather important, and it would be good to cover it in tests. In addition, any 
retry should use some kind of exponential backoff strategy. In `utils` you can 
find `@retry.with_exponential_backoff`; please have a look.

##########
File path: sdks/python/apache_beam/io/aws/clients/s3/boto3_client.py
##########
@@ -92,27 +99,65 @@ def get_object_metadata(self, request):
         boto_response['ContentLength'],
         boto_response['ContentType'])
 
+    item.size = max(item.size - self._download_offset, 0)
     return item
 
+  def get_stream(self, request, start):
+    """Opens a stream object starting at the given position.
+
+    Args:
+      request: (GetRequest) request
+      start: (int) start offset
+    Returns:
+      (Stream) Boto3 stream object.
+    """
+
+    if self._download_request and (
+        start != self._download_pos or
+        request.bucket != self._download_request.bucket or
+        request.object != self._download_request.object):
+      self._download_stream.close()
+      self._download_stream = None
+
+    # noinspection PyProtectedMember
+    if not self._download_stream or self._download_stream._raw_stream.closed:
+      try:
+        self._download_stream = self.client.get_object(
+            Bucket=request.bucket,
+            Key=request.object,
+            Range='bytes={}-'.format(start + self._download_offset))['Body']
+        self._download_request = request
+        self._download_pos = start
+      except Exception as e:
+        message = e.response['Error'].get(
+            'Message', e.response['Error'].get('Code', ''))
+        code = e.response['ResponseMetadata']['HTTPStatusCode']
+        raise messages.S3ClientError(message, code)
+
+    return self._download_stream
+
   def get_range(self, request, start, end):
     r"""Retrieves an object's contents.
 
       Args:
         request: (GetRequest) request
+        start: (int) start offset
+        end: (int) end offset

Review comment:
       Based on the `end - 1` below, the end offset is exclusive; I'd state that explicitly for clarity:
   ```suggestion
           end: (int) end offset (exclusive)
   ```

##########
File path: sdks/python/apache_beam/io/aws/clients/s3/boto3_client.py
##########
@@ -92,27 +99,65 @@ def get_object_metadata(self, request):
         boto_response['ContentLength'],
         boto_response['ContentType'])
 
+    item.size = max(item.size - self._download_offset, 0)
     return item
 
+  def get_stream(self, request, start):
+    """Opens a stream object starting at the given position.
+
+    Args:
+      request: (GetRequest) request
+      start: (int) start offset
+    Returns:
+      (Stream) Boto3 stream object.
+    """
+
+    if self._download_request and (
+        start != self._download_pos or
+        request.bucket != self._download_request.bucket or
+        request.object != self._download_request.object):
+      self._download_stream.close()
+      self._download_stream = None
+
+    # noinspection PyProtectedMember
+    if not self._download_stream or self._download_stream._raw_stream.closed:
+      try:
+        self._download_stream = self.client.get_object(
+            Bucket=request.bucket,
+            Key=request.object,
+            Range='bytes={}-'.format(start + self._download_offset))['Body']
+        self._download_request = request
+        self._download_pos = start
+      except Exception as e:
+        message = e.response['Error'].get(
+            'Message', e.response['Error'].get('Code', ''))
+        code = e.response['ResponseMetadata']['HTTPStatusCode']
+        raise messages.S3ClientError(message, code)
+
+    return self._download_stream
+
   def get_range(self, request, start, end):
     r"""Retrieves an object's contents.
 
       Args:
         request: (GetRequest) request
+        start: (int) start offset
+        end: (int) end offset
       Returns:
         (bytes) The response message.
       """
-    try:
-      boto_response = self.client.get_object(
-          Bucket=request.bucket,
-          Key=request.object,
-          Range='bytes={}-{}'.format(start, end - 1))
-    except Exception as e:
-      message = e.response['Error']['Message']
-      code = e.response['ResponseMetadata']['HTTPStatusCode']
-      raise messages.S3ClientError(message, code)
-
-    return boto_response['Body'].read()  # A bytes object
+    for i in range(self._retries):
+      try:
+        stream = self.get_stream(request, start)
+        data = stream.read(end - start)
+        self._pos += len(data)

Review comment:
       ```suggestion
           self._download_pos += len(data)
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to