vchiapaikeo commented on issue #21537:
URL: https://github.com/apache/airflow/issues/21537#issuecomment-1367362201

   Hey @eladkal, I took a quick look at this. It seems pretty tricky and I 
think it requires a bit more clarification on how we want to proceed. 
   
   Currently supported export options are csv, parquet, and json. If we were to add this capability for just the parquet export type, we could use a method native to pyarrow like [write_to_dataset()](https://arrow.apache.org/docs/python/parquet.html#writing-to-partitioned-datasets). However, we'd likely want to use write_to_dataset's remote filesystem option rather than the current flow of writing to local disk until a file-size threshold is reached, yielding to the iterator, and then uploading that single file via the GCS hook. The parameter would also need to be very specific, something like `partition_cols_for_parquet`. And this approach poses risks of the local filesystem filling up or potential OOM.
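   
   For reference, here is a minimal sketch of what that could look like, with pyarrow writing a partitioned dataset straight to GCS. This is not the operator's current code; gcsfs, the project id, and the bucket path are assumptions for illustration:
   
   ```python
   # Rough sketch only: write a partitioned parquet dataset directly to GCS.
   # gcsfs, the project id, and the bucket path are illustrative assumptions.
   import gcsfs
   import pyarrow as pa
   import pyarrow.parquet as pq
   
   fs = gcsfs.GCSFileSystem(project="my-project")  # hypothetical project
   
   # Toy table standing in for the query results
   table = pa.table({"dag_id": ["a", "a", "b"], "fileloc_hash": [1, 2, 3]})
   
   # write_to_dataset creates one directory per partition value, e.g.
   #   my-bucket/vchiapaikeo/sql-to-gcs/dag_id=a/<uuid>.parquet
   pq.write_to_dataset(
       table,
       root_path="my-bucket/vchiapaikeo/sql-to-gcs",
       partition_cols=["dag_id"],
       filesystem=fs,
   )
   ```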
   
   I think the better approach would be to try to support this option for all export types - csv, parquet, AND json. Doing this would allow us to keep much of the implementation the same (i.e., iterate through the query cursor, assemble a single file until it reaches a certain size, upload that file to GCS, and continue iterating). However, it would require that the query return results to us pre-sorted on the partition columns. For example, if we were partitioning by dag_id, we'd need the query to look like this:
   
   ```python
       test_postgres_to_gcs = PostgresToGCSOperator(
           task_id="test_postgres_to_gcs",
           postgres_conn_id="postgres_default",
           sql="""
           SELECT
               fileloc_hash,  -- bigint
               last_updated,  -- timestamptz
               data_compressed,  -- bytea
               data,  -- json
               dag_id  -- varchar
           FROM serialized_dag
           ORDER BY dag_id
           """,
           export_format="parquet",
           gcp_conn_id="google_cloud_default",
           bucket="my-bucket",
           filename="vchiapaikeo/sql-to-gcs/file.parquet",
           stringify_dict=True,
       )
   ```
   
   If there were multiple partition columns, for example dag_id and task_id, the ORDER BY would look something like this:
   
   ```sql
   ORDER BY dag_id, task_id
   ```
   
   That way, we could keep writing the data out to a single file at a time, as we do now, and simply modify the GCS object prefix to include the partition structure based on the current partition values (see the sketch below). It also makes for a simpler method signature (simply `partition_cols` instead of something like `partition_cols_for_parquet`). And most importantly, we avoid the likelihood of OOM / the local disk filling up.
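   
   To make the prefix idea concrete, here is a rough sketch of detecting partition boundaries while iterating the sorted cursor. All names here (`_partition_prefix`, `write_partitioned`, `upload`) are hypothetical and not part of the operator:
   
   ```python
   # Hypothetical sketch: rows arrive pre-sorted by the partition columns, so a
   # change in the partition values marks the end of one partition's object(s).
   def _partition_prefix(row: dict, partition_cols: list) -> str:
       # e.g. {"dag_id": "my_dag", "task_id": "t1"} -> "dag_id=my_dag/task_id=t1"
       return "/".join(f"{col}={row[col]}" for col in partition_cols)
   
   def write_partitioned(sorted_rows, partition_cols, base_prefix, upload):
       """sorted_rows: iterable of dict rows, ORDER BY-ed on partition_cols.
       upload(object_name, rows): stand-in for the existing chunked GCS upload."""
       current_prefix, buffer = None, []
       for row in sorted_rows:
           prefix = _partition_prefix(row, partition_cols)
           if current_prefix is not None and prefix != current_prefix:
               # Partition value changed: flush the buffered rows as one object.
               upload(f"{base_prefix}/{current_prefix}/part.parquet", buffer)
               buffer = []
           current_prefix = prefix
           buffer.append(row)
       if buffer:
           upload(f"{base_prefix}/{current_prefix}/part.parquet", buffer)
   ```
   
   In the real operator, the existing file-size threshold would still trigger intermediate flushes within a partition; the sketch only shows the boundary detection and prefix construction.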
   
   What do you think about this approach?

