sunank200 commented on issue #16627:
URL: https://github.com/apache/airflow/issues/16627#issuecomment-873503661


   @eladkal, @alexInhert, @potiuk I would love to add this feature and take 
it on as my first Airflow issue. Can I take this up?
   
   Here is the approach I have in mind for implementing this feature. The 
[S3Hook](https://github.com/apache/airflow/blob/c8a628abf484f0bd9805f44dd37e284d2b5ee7db/airflow/providers/amazon/aws/hooks/s3.py#L96)
 class interacts with AWS S3 using the boto3 library. The hook has 
[list_keys](https://github.com/apache/airflow/blob/c8a628abf484f0bd9805f44dd37e284d2b5ee7db/airflow/providers/amazon/aws/hooks/s3.py#L265),
 which uses 
[S3.Client.list_objects_v2](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.list_objects_v2)
 of boto3 to fetch the list of keys. The list_objects_v2 documentation doesn't 
specify an argument for filtering keys by creation date or last modified 
date, but per the documentation the response does contain the last modified 
date of each object. 
   The current implementation of list_keys in the S3Hook uses the paginate 
method of a 
[Paginator](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/paginators.html)
 to iterate over the pages of API results. Hence, the approach I propose is to 
filter the keys by last modified date using JMESPath. JMESPath is a query 
language for JSON that can be used directly on paginated results: an 
expression passed to the search method of the PageIterator returned by the S3 
Paginator is applied to each page of results. The code snippet below shows a 
JMESPath expression that lists only the keys whose last modified datetime 
falls between `from_datetime` and `to_datetime`, two new arguments that both 
default to None.
   
   ```
       paginator = self.get_conn().get_paginator('list_objects_v2')
       response = paginator.paginate(
           Bucket=bucket_name,
           Prefix=prefix,
           Delimiter=delimiter,
           PaginationConfig=config,
       )

       # JMESPath expression applied directly to each page of results.
       # Assumes both from_datetime and to_datetime are set; None bounds
       # need separate handling.
       expression = (
           "Contents[?to_string(LastModified)<='\"{}\"' && "
           "to_string(LastModified)>='\"{}\"'].Key"
       ).format(to_datetime, from_datetime)
       filtered_response = response.search(expression)

       # A page without "Contents" yields None, so skip those entries.
       keys = [key for key in filtered_response if key is not None]
   ```
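
Since `from_datetime` and `to_datetime` default to None, the expression probably has to be built conditionally so that a missing bound is left out instead of being formatted into the query as the string "None". A minimal sketch of that idea follows; the helper name `build_last_modified_filter` is hypothetical and not part of S3Hook. It assumes the bounds are timezone-aware `datetime` objects whose `str()` form matches how `to_string(LastModified)` renders, as in the snippet above.

```python
from datetime import datetime, timezone
from typing import Optional


def build_last_modified_filter(
    from_datetime: Optional[datetime] = None,
    to_datetime: Optional[datetime] = None,
) -> str:
    """Build a JMESPath expression selecting keys by LastModified.

    Hypothetical helper for illustration only. Bounds are inclusive and
    a bound left as None is omitted from the expression.
    """
    conditions = []
    if from_datetime is not None:
        conditions.append(f"to_string(LastModified)>='\"{from_datetime}\"'")
    if to_datetime is not None:
        conditions.append(f"to_string(LastModified)<='\"{to_datetime}\"'")
    if not conditions:
        # No bounds given: select every key on the page.
        return "Contents[].Key"
    return "Contents[?" + " && ".join(conditions) + "].Key"


# Example: only a lower bound is supplied.
lower_only = build_last_modified_filter(
    from_datetime=datetime(2021, 6, 1, tzinfo=timezone.utc)
)
```

The resulting string could then be passed to `response.search(...)` in place of the hard-coded expression.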
   
   Because both new arguments default to None, this change wouldn't affect 
existing users of `list_keys` such as the `S3DeleteObjectsOperator` and 
`S3ListOperator` operators, the S3Hook methods `get_wildcard_key` and 
`delete_bucket`, and `S3KeysUnchangedSensor`.
   
   The corresponding unit tests can be added or updated in 
[test_s3.py](https://github.com/apache/airflow/blob/5399f9124a4e75c7bb89e47c267d89b5280060ad/tests/providers/amazon/aws/hooks/test_s3.py#L146)
 and 
[test_gcs_to_s3.py](https://github.com/apache/airflow/blob/main/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py).
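
When writing such tests, the expected keys for a given window could be computed independently of JMESPath. The sketch below is a plain-Python reference filter over sample `Contents` entries, not the proposed implementation. The function name `expected_keys` and the sample data are made up for illustration, and the bounds are treated as inclusive to match the `>=` and `<=` comparisons in the proposal.

```python
from datetime import datetime, timezone


def expected_keys(contents, from_datetime=None, to_datetime=None):
    """Return keys whose LastModified lies in [from_datetime, to_datetime].

    Hypothetical reference filter for tests; a bound left as None is ignored.
    """
    keys = []
    for obj in contents:
        last_modified = obj["LastModified"]
        if from_datetime is not None and last_modified < from_datetime:
            continue
        if to_datetime is not None and last_modified > to_datetime:
            continue
        keys.append(obj["Key"])
    return keys


# Made-up entries shaped like the "Contents" list of a list_objects_v2 page.
sample_contents = [
    {"Key": "a.csv", "LastModified": datetime(2021, 6, 1, tzinfo=timezone.utc)},
    {"Key": "b.csv", "LastModified": datetime(2021, 6, 15, tzinfo=timezone.utc)},
    {"Key": "c.csv", "LastModified": datetime(2021, 7, 1, tzinfo=timezone.utc)},
]

in_window = expected_keys(
    sample_contents,
    from_datetime=datetime(2021, 6, 10, tzinfo=timezone.utc),
    to_datetime=datetime(2021, 7, 1, tzinfo=timezone.utc),
)
```

A test could then assert that `list_keys` returns the same keys for the same window.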

