phoerious commented on a change in pull request #15931:
URL: https://github.com/apache/beam/pull/15931#discussion_r768491540



##########
File path: sdks/python/apache_beam/io/aws/clients/s3/boto3_client.py
##########
@@ -92,27 +99,65 @@ def get_object_metadata(self, request):
         boto_response['ContentLength'],
         boto_response['ContentType'])
 
+    item.size = max(item.size - self._download_offset, 0)
     return item
 
+  def get_stream(self, request, start):
+    """Opens a stream object starting at the given position.
+
+    Args:
+      request: (GetRequest) request
+      start: (int) start offset
+    Returns:
+      (Stream) Boto3 stream object.
+    """
+
+    if self._download_request and (
+        start != self._download_pos or
+        request.bucket != self._download_request.bucket or
+        request.object != self._download_request.object):
+      self._download_stream.close()
+      self._download_stream = None
+
+    # noinspection PyProtectedMember
+    if not self._download_stream or self._download_stream._raw_stream.closed:
+      try:
+        self._download_stream = self.client.get_object(
+            Bucket=request.bucket,
+            Key=request.object,
+            Range='bytes={}-'.format(start + self._download_offset))['Body']
+        self._download_request = request
+        self._download_pos = start
+      except Exception as e:
+        message = e.response['Error'].get(
+            'Message', e.response['Error'].get('Code', ''))
+        code = e.response['ResponseMetadata']['HTTPStatusCode']
+        raise messages.S3ClientError(message, code)
+
+    return self._download_stream
+
   def get_range(self, request, start, end):
     r"""Retrieves an object's contents.
 
       Args:
         request: (GetRequest) request
+        start: (int) start offset
+        end: (int) end offset
       Returns:
         (bytes) The response message.
       """
-    try:
-      boto_response = self.client.get_object(
-          Bucket=request.bucket,
-          Key=request.object,
-          Range='bytes={}-{}'.format(start, end - 1))
-    except Exception as e:
-      message = e.response['Error']['Message']
-      code = e.response['ResponseMetadata']['HTTPStatusCode']
-      raise messages.S3ClientError(message, code)
-
-    return boto_response['Body'].read()  # A bytes object
+    for i in range(self._retries):

Review comment:
       Good point. Done.
   
   The main reason for retrying here is that long-lived connections tend to be 
closed intermittently when reads do not happen at a fast and steady rate (I 
regularly run into this when reading and processing records from a WARC file), 
so I want to retry at least once. The first retry should happen almost 
immediately, which is why I gave `with_exponential_backoff` a very small 
initial delay.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to