sanxore opened a new pull request, #25691:
URL: https://github.com/apache/airflow/pull/25691

   
   - Issue description:
   In `PostgresToGCSOperator`, a DATE column cannot be written when `export_format="parquet"`. The `convert_type` method converts the date values returned by Postgres to strings, so pyarrow raises this exception: `pyarrow.lib.ArrowTypeError: object of type <class 'str'> cannot be converted to int`
   
   - Solution:
   Don't convert dates to strings when the export format is Parquet.
   
   
   - Here is my solution, followed by a screenshot of the DagRun:
   
   ```python
   import datetime
   import json
   import os
   import time
   from decimal import Decimal
   
   import pendulum
   from airflow import DAG
   from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
   from common_v2.settings import GCS_BUCKET
   
   DEFAULT_ARGS = {
       "start_date": datetime.datetime(2022, 8, 12),
       "retries": 1,
       "retry_delay": datetime.timedelta(minutes=1),
   }
   
   
   class PostgresToGCSFixedOperator(PostgresToGCSOperator):
       def __init__(self, **kwargs):
           super().__init__(**kwargs)
   
       def convert_type(self, value, schema_type, stringify_dict=True):
           """
           Takes a value from Postgres, and converts it to a value that's safe for
           JSON/Google Cloud Storage/BigQuery.
           Timezone aware Datetime are converted to UTC seconds.
           Unaware Datetime are converted to ISO formatted strings.
           Date and Time are converted to strings, except when the export
           format is Parquet, where they are returned unchanged.
           Decimals are converted to floats.
   
           :param value: Postgres column value.
           :param schema_type: BigQuery data type.
           :param stringify_dict: Specify whether to convert dict to string.
           """
           if isinstance(value, datetime.datetime):
               iso_format_value = value.isoformat()
               if value.tzinfo is None:
                   return iso_format_value
               return pendulum.parse(iso_format_value).float_timestamp
   
           if self.export_format != "parquet":
               if isinstance(value, datetime.date):
                   return value.isoformat()
               if isinstance(value, datetime.time):
                   formatted_time = time.strptime(str(value), "%H:%M:%S")
                   time_delta = datetime.timedelta(
                       hours=formatted_time.tm_hour,
                       minutes=formatted_time.tm_min,
                       seconds=formatted_time.tm_sec,
                   )
                   return str(time_delta)
   
           if stringify_dict and isinstance(value, dict):
               return json.dumps(value)
           if isinstance(value, Decimal):
               return float(value)
           return value
   
   
   with DAG(
       "date_parquet_issue",
       default_args=DEFAULT_ARGS,
       schedule_interval=None,
   ) as dag:
       date_pg_to_gcs_issue = PostgresToGCSOperator(
           task_id="date_pg_to_gcs_issue",
           postgres_conn_id="postgres",
           sql="SELECT CURRENT_DATE;",
           bucket=GCS_BUCKET,
           filename=os.path.join("data/date_pg_to_gcs_issue", "part_{}.parquet"),
           export_format="parquet",
       )
   
       date_pg_to_gcs_fixed = PostgresToGCSFixedOperator(
           task_id="date_pg_to_gcs_fixed",
           postgres_conn_id="postgres",
           sql="SELECT CURRENT_DATE;",
           bucket=GCS_BUCKET,
           filename=os.path.join("data/date_pg_to_gcs_fixed", "part_{}.parquet"),
           export_format="parquet",
       )
   ```
   
![image](https://user-images.githubusercontent.com/14028677/184374167-033771c6-40a0-4e9e-8863-63e471117533.png)
   
   - The fixed operator writes the data to Parquet in the right format (physical_type: INT32, logical_type: Date):
   
![image](https://user-images.githubusercontent.com/14028677/184374885-59fef294-0cd3-43b3-b4d2-faed17f86cbe.png)
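
   For context on the INT32/Date pairing: as far as I understand the Parquet format, the DATE logical type stores each value as a 32-bit count of days since 1970-01-01, which would explain why a string cannot be written into such a column. Here is a minimal standard-library sketch of that encoding. The helper names `date_to_parquet_days` and `parquet_days_to_date` are hypothetical and exist only for illustration; they are not part of Airflow or pyarrow.

   ```python
   import datetime

   # Parquet DATE values count days from the Unix epoch.
   EPOCH = datetime.date(1970, 1, 1)


   def date_to_parquet_days(value: datetime.date) -> int:
       """Hypothetical helper: encode a date as days since 1970-01-01."""
       return (value - EPOCH).days


   def parquet_days_to_date(days: int) -> datetime.date:
       """Hypothetical helper: decode days since 1970-01-01 back to a date."""
       return EPOCH + datetime.timedelta(days=days)


   sample = datetime.date(2022, 8, 12)
   encoded = date_to_parquet_days(sample)
   decoded = parquet_days_to_date(encoded)
   print(encoded, decoded)
   ```

   A `datetime.date` object can be mapped to this integer directly, while an ISO string such as `"2022-08-12"` cannot, which matches the error shown in the issue description.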
   

