vchiapaikeo commented on issue #21537: URL: https://github.com/apache/airflow/issues/21537#issuecomment-1367362201
Hey @eladkal, I took a quick look at this. It seems pretty tricky, and I think it needs a bit more clarification on how we want to proceed. The currently supported export formats are csv, parquet, and json.

If we added this capability for just the parquet export type, we could use a method native to pyarrow like [write_to_dataset()](https://arrow.apache.org/docs/python/parquet.html#writing-to-partitioned-datasets). However, we'd likely want to use write_to_dataset's remote filesystem option rather than what we do now, which is writing locally until a file-size threshold is reached, yielding to the iterator, and then uploading that single file via the GCS hook (a rough sketch of this option is at the end of this comment). The parameter would also have to be format-specific, something like `partition_cols_for_parquet`. And since write_to_dataset wants the whole table up front rather than streaming chunks, this poses risks of the local filesystem filling up or of OOM.

I think the better approach would be to support this option for all export types: csv, parquet, AND json. Doing this would allow us to keep much of the implementation the same (i.e., iterate through the query cursor, assemble a single file until it reaches a certain size, upload that file to GCS, repeat). However, this requires the query to return its results pre-sorted by the partition columns. For example, if we were partitioning by dag_id, the query would need to look like this:

```python
test_postgres_to_gcs = PostgresToGCSOperator(
    task_id="test_postgres_to_gcs",
    postgres_conn_id="postgres_default",
    sql="""
        SELECT
            fileloc_hash,     -- bigint
            last_updated,     -- timestamptz
            data_compressed,  -- bytea
            data,             -- json
            dag_id            -- varchar
        FROM serialized_dag
        ORDER BY dag_id
    """,
    export_format="parquet",
    gcp_conn_id="google_cloud_default",
    bucket="my-bucket",
    filename="vchiapaikeo/sql-to-gcs/file.parquet",
    stringify_dict=True,
)
```

If there were multiple partition columns, e.g. dag_id and task_id, then the ORDER BY would look something like this:

```sql
ORDER BY dag_id, task_id
```

That way, we could keep writing the data out to a single file at a time, as we do now, and simply modify the GCS object prefix to include the partition structure based on the values of the partition columns (second sketch below). It also makes for a simpler method signature (simply `partition_cols` instead of something like `partition_cols_for_parquet`). And most importantly, we avoid the likelihood of OOM / the local disk filling up.

What do you think about this approach?
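For reference, here's a minimal sketch of the parquet-only option, assuming gcsfs as the remote filesystem. The bucket path, columns, and values are placeholders for illustration, not anything the operator produces today:

```python
import gcsfs
import pyarrow as pa
import pyarrow.parquet as pq

# In the operator, this table would be assembled from the query cursor;
# the columns and values here are just placeholders.
table = pa.table({
    "fileloc_hash": [123, 456],
    "dag_id": ["dag_a", "dag_b"],
})

# Credentials are resolved from the environment / default auth.
fs = gcsfs.GCSFileSystem()

# write_to_dataset lays out one directory per partition value, e.g.
#   my-bucket/vchiapaikeo/sql-to-gcs/dag_id=dag_a/<uuid>.parquet
pq.write_to_dataset(
    table,
    root_path="my-bucket/vchiapaikeo/sql-to-gcs",
    partition_cols=["dag_id"],
    filesystem=fs,
)
```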
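And a rough sketch of the prefix logic for the sorted-cursor approach. `object_name_for_row`, `partition_cols`, and the row dict are hypothetical names for illustration, not the operator's actual internals:

```python
import posixpath

def object_name_for_row(filename: str, partition_cols: list[str], row: dict) -> str:
    """Insert Hive-style partition directories before the file's base name."""
    base_dir, base_name = posixpath.split(filename)
    partition_dirs = "/".join(f"{col}={row[col]}" for col in partition_cols)
    return posixpath.join(base_dir, partition_dirs, base_name)

# Because the query is ORDER BY dag_id, task_id, all rows for a given
# (dag_id, task_id) pair arrive contiguously, so the operator can detect a
# partition boundary, flush the current file, and open a new one under the
# new prefix.
print(object_name_for_row(
    "vchiapaikeo/sql-to-gcs/file.parquet",
    ["dag_id", "task_id"],
    {"dag_id": "my_dag", "task_id": "my_task"},
))
# vchiapaikeo/sql-to-gcs/dag_id=my_dag/task_id=my_task/file.parquet
```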
