sanxore opened a new pull request, #25691: URL: https://github.com/apache/airflow/pull/25691
- Issue description: In the `PostgresToGCSOperator` operator, we can't write a DATE column when `export_format="parquet"`. The `convert_type` method converts the date values returned by Postgres to strings, so pyarrow raises this exception: `pyarrow.lib.ArrowTypeError: object of type <class 'str'> cannot be converted to int`
- Solution: Don't convert dates to strings when the export format is parquet.
- Here is my solution and a screenshot of the DagRun:

```python
import datetime
import json
import os
import time
from decimal import Decimal

import pendulum
from airflow import DAG
from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator

from common_v2.settings import GCS_BUCKET  # project-specific bucket setting

DEFAULT_ARGS = {
    "start_date": datetime.datetime(2022, 8, 12),
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=1),
}


class PostgresToGCSFixedOperator(PostgresToGCSOperator):
    """PostgresToGCSOperator that keeps DATE values as datetime.date for Parquet exports."""

    def convert_type(self, value, schema_type, stringify_dict=True):
        """
        Takes a value from Postgres, and converts it to a value that's safe for
        JSON/Google Cloud Storage/BigQuery.
        Timezone aware Datetime are converted to UTC seconds.
        Unaware Datetime, Date and Time are converted to ISO formatted strings,
        except that Date is left unchanged when export_format is "parquet".
        Decimals are converted to floats.

        :param value: Postgres column value.
        :param schema_type: BigQuery data type.
        :param stringify_dict: Specify whether to convert dict to string.
        """
        if isinstance(value, datetime.datetime):
            iso_format_value = value.isoformat()
            if value.tzinfo is None:
                return iso_format_value
            return pendulum.parse(iso_format_value).float_timestamp
        # Only stringify dates for non-Parquet formats: pyarrow needs a
        # datetime.date object to fill a Parquet DATE column.
        if self.export_format != "parquet":
            if isinstance(value, datetime.date):
                return value.isoformat()
        if isinstance(value, datetime.time):
            formatted_time = time.strptime(str(value), "%H:%M:%S")
            time_delta = datetime.timedelta(
                hours=formatted_time.tm_hour, minutes=formatted_time.tm_min, seconds=formatted_time.tm_sec
            )
            return str(time_delta)
        if stringify_dict and isinstance(value, dict):
            return json.dumps(value)
        if isinstance(value, Decimal):
            return float(value)
        return value


with DAG(
    "date_parquet_issue",
    default_args=DEFAULT_ARGS,
    schedule_interval=None,
) as dag:
    # Stock operator: fails with ArrowTypeError on the DATE column.
    date_pg_to_gcs_issue = PostgresToGCSOperator(
        task_id="date_pg_to_gcs_issue",
        postgres_conn_id="postgres",
        sql="SELECT CURRENT_DATE;",
        bucket=GCS_BUCKET,
        filename=os.path.join("data/date_pg_to_gcs_issue", "part_{}.parquet"),
        export_format="parquet",
    )

    # Patched operator: passes datetime.date through for Parquet exports.
    date_pg_to_gcs_fixed = PostgresToGCSFixedOperator(
        task_id="date_pg_to_gcs_fixed",
        postgres_conn_id="postgres",
        sql="SELECT CURRENT_DATE;",
        bucket=GCS_BUCKET,
        filename=os.path.join("data/date_pg_to_gcs_fixed", "part_{}.parquet"),
        export_format="parquet",
    )
```

[Screenshot: DagRun]

- The fixed operator writes the data to Parquet with the right format (physical_type: INT32, logical_type: Date):

[Screenshot: Parquet file written by the fixed operator]
