cmsouza opened a new issue #17538:
URL: https://github.com/apache/airflow/issues/17538
**Apache Airflow version**: 2.1.2
**Apache Airflow Provider versions**:
apache-airflow-providers-celery==2.0.0
apache-airflow-providers-ftp==2.0.0
apache-airflow-providers-google==4.0.0
apache-airflow-providers-imap==2.0.0
apache-airflow-providers-mysql==2.0.0
apache-airflow-providers-postgres==2.0.0
apache-airflow-providers-sqlite==2.0.0
**What happened**:
When exporting a Parquet file from MySQL to GCS where the source table contains date/datetime columns, the following error occurs:
```
[2021-08-09 14:30:36,760] {taskinstance.py:1501} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/airflow/env/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/opt/airflow/env/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/opt/airflow/env/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
    result = task_copy.execute(context=context)
  File "/opt/airflow/env/lib/python3.7/site-packages/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 154, in execute
    files_to_upload = self._write_local_data_files(cursor)
  File "/opt/airflow/env/lib/python3.7/site-packages/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 223, in _write_local_data_files
    tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
  File "pyarrow/table.pxi", line 1576, in pyarrow.lib.Table.from_pydict
  File "pyarrow/array.pxi", line 331, in pyarrow.lib.asarray
  File "pyarrow/array.pxi", line 305, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 39, in pyarrow.lib._sequence_to_array
  File "pyarrow/error.pxi", line 122, in pyarrow.lib.pyarrow_internal_check_status
TypeError: an integer is required (got type str)
```
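For reference, a task of roughly this shape is enough to trigger it (the task, table, and bucket names below are made up):
```python
from airflow.providers.google.cloud.transfers.mysql_to_gcs import MySQLToGCSOperator

# Hypothetical task: any query returning DATE/DATETIME columns fails once
# export_format is set to 'parquet'.
export_orders = MySQLToGCSOperator(
    task_id='export_orders_to_gcs',
    sql='SELECT id, date_added FROM orders',
    bucket='my-bucket',
    filename='exports/orders.parquet',
    export_format='parquet',
)
```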
The error is a result of the following code casting the data to strings before storing the row:
```python
# airflow/providers/google/cloud/transfers/sql_to_gcs.py@210
for row in cursor:
    # Convert datetime objects to utc seconds, and decimals to floats.
    # Convert binary type object to string encoded with base64.
    row = self.convert_types(schema, col_type_dict, row)

# which calls MySQLToGCSOperator.convert_type for each value:
# airflow/providers/google/cloud/transfers/mysql_to_gcs.py@120
if isinstance(value, datetime):
    value = str(value)
elif isinstance(value, timedelta):
    value = str((datetime.min + value).time())
```
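To make the effect concrete, here is a standalone sketch of those branches (a re-implementation for illustration, not the actual operator code):
```python
from datetime import datetime, timedelta

# Mirrors the datetime/timedelta branches quoted above, to show what kind of
# value reaches pyarrow after conversion.
def convert_type(value):
    if isinstance(value, datetime):
        return str(value)
    if isinstance(value, timedelta):
        return str((datetime.min + value).time())
    return value

print(convert_type(datetime(2021, 8, 10, 11, 4, 6)))  # '2021-08-10 11:04:06' -- a str, not a datetime
print(convert_type(timedelta(hours=11, minutes=4)))   # '11:04:00'
```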
This is fine for string-based formats such as CSV/JSON, but pyarrow should receive the unconverted datetime object to be able to store the row correctly. I think the easiest fix is simply not converting the row before passing it to pyarrow, like this:
```diff
diff --git a/airflow/providers/google/cloud/transfers/sql_to_gcs.py b/airflow/providers/google/cloud/transfers/sql_to_gcs.py
index 091c22f01..85cc3c98f 100644
--- a/airflow/providers/google/cloud/transfers/sql_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/sql_to_gcs.py
@@ -210,12 +210,12 @@ class BaseSQLToGCSOperator(BaseOperator):
         for row in cursor:
             # Convert datetime objects to utc seconds, and decimals to floats.
             # Convert binary type object to string encoded with base64.
-            row = self.convert_types(schema, col_type_dict, row)
+            str_row = self.convert_types(schema, col_type_dict, row)
             if self.export_format == 'csv':
                 if self.null_marker is not None:
-                    row = [value if value is not None else self.null_marker for value in row]
-                csv_writer.writerow(row)
+                    str_row = [value if value is not None else self.null_marker for value in str_row]
+                csv_writer.writerow(str_row)
             elif self.export_format == 'parquet':
                 if self.null_marker is not None:
                     row = [value if value is not None else self.null_marker for value in row]
@@ -223,7 +223,7 @@ class BaseSQLToGCSOperator(BaseOperator):
                 tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
                 parquet_writer.write_table(tbl)
             else:
-                row_dict = dict(zip(schema, row))
+                row_dict = dict(zip(schema, str_row))
                 tmp_file_handle.write(
                     json.dumps(row_dict, sort_keys=True, ensure_ascii=False).encode("utf-8")
```
Although I'm not sure whether it would cause any regressions.
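In the meantime, a possible workaround is to subclass the operator and skip the conversion for datetimes when exporting parquet. This is only a sketch, assuming the `convert_type(self, value, schema_type)` hook from apache-airflow-providers-google==4.0.0; other types such as Decimal may still need converting:
```python
from datetime import datetime

from airflow.providers.google.cloud.transfers.mysql_to_gcs import MySQLToGCSOperator


class MySQLToGCSParquetOperator(MySQLToGCSOperator):
    """Workaround sketch: keep raw datetimes when exporting parquet."""

    def convert_type(self, value, schema_type):
        # pyarrow's timestamp type expects a datetime, not its str() form.
        if self.export_format == 'parquet' and isinstance(value, datetime):
            return value
        return super().convert_type(value, schema_type)
```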
**What you expected to happen**:
The Parquet file should be written to disk and uploaded to GCS.
**How to reproduce it**:
The following code reproduces it 100% of the time; it's a simplified version of the code in the actual operator, and it fails for the exact same reason the operator does:
```python
import pyarrow as pa
import pyarrow.parquet as pq

parquet_schema = pa.schema([
    ('id', pa.int64()),
    ('date_added', pa.timestamp('s')),
])
parquet_writer = pq.ParquetWriter('test.parquet', parquet_schema)

# The datetime arrives as a string, exactly as convert_types produces it:
row_pydic = {
    'id': [26417],
    'date_added': ['2021-08-10 11:04:06'],
}
tbl = pa.Table.from_pydict(row_pydic, parquet_schema)  # raises TypeError here
parquet_writer.write_table(tbl)
parquet_writer.close()
```
And raises the following error:
```
TypeError                                 Traceback (most recent call last)
/tmp/ipykernel_377852/1020037443.py in <module>
     17 }
     18
---> 19 tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
     20 parquet_writer.write_table(tbl)
     21

~/export/env/lib/python3.9/site-packages/pyarrow/table.pxi in pyarrow.lib.Table.from_pydict()
~/export/env/lib/python3.9/site-packages/pyarrow/array.pxi in pyarrow.lib.asarray()
~/export/env/lib/python3.9/site-packages/pyarrow/array.pxi in pyarrow.lib.array()
~/export/env/lib/python3.9/site-packages/pyarrow/array.pxi in pyarrow.lib._sequence_to_array()
~/export/env/lib/python3.9/site-packages/pyarrow/error.pxi in pyarrow.lib.pyarrow_internal_check_status()

TypeError: an integer is required (got type str)
```
While this works:
```python
from datetime import datetime

import pyarrow as pa
import pyarrow.parquet as pq

parquet_schema = pa.schema([
    ('id', pa.int64()),
    ('date_added', pa.timestamp('s')),
])
parquet_writer = pq.ParquetWriter('test.parquet', parquet_schema)

# Same row, but with the datetime passed unconverted:
row_pydic = {
    'id': [26417],
    'date_added': [datetime(2021, 8, 10, 11, 4, 6)],
}
tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
parquet_writer.write_table(tbl)
parquet_writer.close()
```
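And reading the file back confirms the timestamp survived (just a sanity check on top of the snippet above):
```python
import pyarrow.parquet as pq

# date_added should come back as timestamp[s], not string.
table = pq.read_table('test.parquet')
print(table.schema)
print(table.to_pydict())
```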