dinigo edited a comment on issue #8804:
URL: https://github.com/apache/airflow/issues/8804#issuecomment-626625086


   You are suggesting implementing something like what we have with 
[GenericTransfer](https://github.com/apache/airflow/blob/master/airflow/operators/generic_transfer.py).
 I've thought of this too. We would need a common API for getting and 
sending files. Preferably a `file-like object`, so transfers are done as a 
stream and don't take up local storage.
   
   I would rather have a `GenericFileTransfer`.
   
   We could have an abstract `FsApiHook` exposing functions analogous to 
[`DbApiHook`](https://github.com/apache/airflow/blob/master/airflow/hooks/dbapi_hook.py#L46)'s
 `get_records` and `insert_rows`, but for files: for example, 
`get_file_stream` and `write_file_stream`.
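The interface could be sketched roughly like this (a minimal sketch; `FsApiHook` and both method names are my proposal, not existing Airflow classes):

```python
# Hypothetical sketch of the proposed abstract FsApiHook interface.
from abc import ABC, abstractmethod
from typing import IO


class FsApiHook(ABC):
    """Common file-transfer API, analogous to DbApiHook for databases."""

    @abstractmethod
    def get_file_stream(self, bucket: str, file_path: str) -> IO[bytes]:
        """Return a readable file-like object for the remote file."""

    @abstractmethod
    def write_file_stream(self, bucket: str, file_path: str,
                          file_obj: IO[bytes]) -> None:
        """Upload a readable file-like object to the remote location."""
```

Any hook that subclasses this becomes usable as either end of a generic transfer.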
   
   So, for example, `GCSHook` and `S3Hook` would extend `FsApiHook`:
   ```python
   class GCSHook(GcpBaseHook, FsApiHook):
     # ...
     def get_file_stream(self, bucket: str, file_path: str):
       # illustrative only; gcsfs opens paths as "bucket/key"
       return gcsfs.GCSFileSystem().open(f"{bucket}/{file_path}")
     # ...
   ```
   ```python
   class S3Hook(AwsBaseHook, FsApiHook):
     # ...
     def write_file_stream(self, bucket: str, file_path: str, file_obj):
       """Already implemented in
       https://github.com/apache/airflow/blob/master/airflow/providers/amazon/aws/hooks/s3.py#L582
       """
       client.upload_fileobj(file_obj, bucket, file_path, ExtraArgs=extra_args)
     # ...
   ```
   Then a `GenericFileTransfer` would do something similar to what `GenericTransfer` does in its 
[`execute`](https://github.com/apache/airflow/blob/master/airflow/operators/generic_transfer.py#L67)
 method:
   ```python
   class GenericFileTransfer(BaseOperator):
     # ...
     def execute(self, context):
           source_hook = BaseHook.get_hook(self.source_conn_id)
           exchange_stream = source_hook.get_file_stream(self.source_bucket, self.source_file)
           dest_hook = BaseHook.get_hook(self.dest_conn_id)
           dest_hook.write_file_stream(self.dest_bucket, self.dest_file, exchange_stream)
   ```
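To make the "stream, not storage" point concrete, here is a minimal, self-contained sketch (using in-memory `BytesIO` objects as stand-ins for the hooks' remote streams) of how bytes can move source to destination without ever hitting local disk:

```python
import io
import shutil

# Stand-ins for what get_file_stream() / write_file_stream() would wrap.
source_stream = io.BytesIO(b"row1\nrow2\n")  # readable remote file
dest_stream = io.BytesIO()                   # writable upload target

# copyfileobj moves data in fixed-size chunks, so only one chunk is ever
# held in memory, regardless of the file's total size.
shutil.copyfileobj(source_stream, dest_stream, length=1024 * 1024)

dest_stream.seek(0)
print(dest_stream.read())  # b'row1\nrow2\n'
```

This is exactly why a file-like-object API is preferable to downloading to a temp file first.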
   
   Now I can configure the hooks for the source and destination, such as:
   ```python
   gcs_to_s3 = GenericFileTransfer(
     source_conn_id='gcs-conn-id',
     source_hook=GCSHook,
     source_bucket='my-gcs-bucket',
     source_file='my-file{{ ds }}.csv',
     dest_conn_id='s3-conn-id',
     dest_hook=S3Hook,
     dest_bucket='my-s3-bucket',
     dest_file='my-file{{ ds }}.csv'
   )
   ```
   
   And now you can remove all the operator-specific copy operators like `S3ToSFTP`, 
`AzureBlobToGCS`... Or, if they don't exist yet, you don't need to implement them 
at all!
   
   What do you think @turbaszek? I don't know how core development is 
organized (I know there's a Jira and a mailing list, but nothing about the 
process itself). This would be a core change, but if Airflow aims to 
thrive, it's necessary (IMHO).

