eldar-elne opened a new issue, #40637:
URL: https://github.com/apache/airflow/issues/40637

   ### Apache Airflow Provider(s)
   
   amazon
   
   ### Versions of Apache Airflow Providers
   
   ```
   --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.2/constraints-3.11.txt"
   
   apache-airflow-providers-slack==8.1.0
   apache-airflow-providers-amazon==8.7.1
   apache-airflow-providers-jdbc==4.0.2
   apache-airflow-providers-datadog==3.3.2
   tableauserverclient==0.25
   apache-airflow-providers-mysql==5.3.1
   apache-airflow-providers-neo4j==3.3.3
   neo4j==5.13.0
   aiobotocore==2.6.0
   ```
   
   ### Apache Airflow version
   
   2.7.2
   
   ### Operating System
   
   macOS 14.2.1
   
   ### Deployment
   
   Amazon (AWS) MWAA
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   When using the `S3FileTransformOperator` with an S3 Select expression, it can only read and write CSVs.
   (Not sure if it's a bug or a feature request - please move it if needed.)
   
   ### What you think should happen instead
   
   The boto3 client accepts more options, such as gzip and bzip2 compression, and more formats, such as Parquet and JSON, so the operator should accept the following params too (they already exist in the S3 hook's `select_key` method):
   `input_serialization`
   `output_serialization`
   
   ref:
   
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/select_object_content.html#:~:text=CSV%2C%20JSON%2C%20and%20Parquet%20%2D%20Objects%20must%20be%20in%20CSV%2C%20JSON%2C%20or%20Parquet%20format.
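   
   For reference, a minimal sketch of the underlying boto3 call the operator would need to expose (a sketch only; bucket, key, and serialization values are placeholders, parameter names per the `select_object_content` API linked above):
   
   ```
   import boto3
   
   s3 = boto3.client("s3")
   # S3 Select accepts CSV, JSON, and Parquet input and CSV or JSON output.
   response = s3.select_object_content(
       Bucket="<bucket>",
       Key="<prefix>/file.snappy.parquet",
       Expression="SELECT * FROM s3object s LIMIT 5",
       ExpressionType="SQL",
       InputSerialization={"Parquet": {}},
       OutputSerialization={"JSON": {"RecordDelimiter": "\n"}},
   )
   # The response payload is an event stream; Records events carry the
   # selected bytes.
   for event in response["Payload"]:
       if "Records" in event:
           print(event["Records"]["Payload"].decode())
   ```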
   
   ### How to reproduce
   
   **This is not working:**
   ```
       transform_parquet = S3FileTransformOperator(
           task_id='transform_parquet',
           source_s3_key='s3://<bucket>/<prefix>/file.snappy.parquet',
           dest_s3_key='s3://<bucket>/<prefix>/file.json',
           select_expression="SELECT * FROM s3object s LIMIT 5",
           input_serialization={"Parquet": {}}, 
           output_serialization = {"CSV": {}},  
           replace=True
       )
   ```
   
   **This is working:**
   ```
       transform_csv = S3FileTransformOperator(
           task_id='transform_csv',
           source_s3_key='s3://<bucket>/<prefix>/file.csv',
           dest_s3_key='s3://<bucket>/<other_prefix>/file.csv',
           select_expression="SELECT * FROM s3object s LIMIT 5",
           replace=True
       )
   ```
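   
   In the meantime, a possible workaround is to call the hook directly, since it already exposes both params. A minimal sketch (assuming `select_key` passes the serialization dicts through to boto3 as noted above; bucket/key are placeholders):
   
   ```
   from airflow.providers.amazon.aws.hooks.s3 import S3Hook
   
   # Sketch only: select_key already accepts the serialization params
   # that S3FileTransformOperator does not expose.
   hook = S3Hook()
   selected = hook.select_key(
       key='s3://<bucket>/<prefix>/file.snappy.parquet',
       expression="SELECT * FROM s3object s LIMIT 5",
       input_serialization={"Parquet": {}},
       output_serialization={"JSON": {"RecordDelimiter": "\n"}},
   )
   ```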
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

