[
https://issues.apache.org/jira/browse/AIRFLOW-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Michael Ghen updated AIRFLOW-1632:
----------------------------------
Description:
For tables in MySQL that use a "date" or "datetime" type, a dag that exports
from MySQL to Google Cloud Storage and then loads from GCS to BigQuery will
fail when the dates are before 1970.
When the table is exported as JSON to a GCS bucket, dates and datetimes are
converted to timestamps using:
{code}
time.mktime(value.timetuple())
{code}
This creates a problem when you try to parse a date that can't be converted to a
UNIX timestamp. For example:
{code}
>>> value = datetime.date(1850,1,1)
>>> time.mktime(value.timetuple())
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ValueError: year out of range
{code}
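(For comparison, a quick illustration only, not something the operator currently
does: calendar.timegm performs the same timetuple-to-epoch conversion with plain
arithmetic instead of the platform's mktime, so a pre-1970 date simply yields a
negative number of seconds rather than raising.)
{code}
>>> import calendar, datetime
>>> value = datetime.date(1850, 1, 1)
>>> calendar.timegm(value.timetuple())  # treats the tuple as UTC; no platform mktime call
-3786825600
{code}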
*Steps to reproduce*
0. Set up a MySQL connection and GCP connection in Airflow.
1. Create a MySQL table with a "date" field and put some data into the table.
{code}
CREATE TABLE table_with_date (
    date_field date,
    datetime_field datetime
);
INSERT INTO table_with_date (date_field, datetime_field) VALUES
    ('1850-01-01', NOW());
{code}
2. Create a DAG that exports the data from MySQL to GCS and then loads from GCS
to BigQuery (use the schema file). For example:
{code}
extract = MySqlToGoogleCloudStorageOperator(
    task_id="extract_table",
    mysql_conn_id='mysql_connection',
    google_cloud_storage_conn_id='gcp_connection',
    sql="SELECT * FROM table_with_date",
    bucket='gcs-bucket',
    filename='table_with_date.json',
    schema_filename='schemas/table_with_date.json',
    dag=dag)

load = GoogleCloudStorageToBigQueryOperator(
    task_id="load_table",
    bigquery_conn_id='gcp_connection',
    google_cloud_storage_conn_id='gcp_connection',
    bucket='gcs-bucket',
    destination_project_dataset_table="dataset.table_with_date",
    source_objects=['table_with_date.json'],
    schema_object='schemas/table_with_date.json',
    source_format='NEWLINE_DELIMITED_JSON',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    dag=dag)

load.set_upstream(extract)
{code}
3. Run the DAG
Expected: The DAG runs successfully.
Actual: The `extract_table` task fails with error:
{code}
...
ERROR - year out of range
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1374, in run
    result = task_copy.execute(context=context)
  File "/usr/lib/python2.7/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 91, in execute
    files_to_upload = self._write_local_data_files(cursor)
  File "/usr/lib/python2.7/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 132, in _write_local_data_files
    row = map(self.convert_types, row)
  File "/usr/lib/python2.7/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 196, in convert_types
    return time.mktime(value.timetuple())
ValueError: year out of range
...
{code}
*Comments:*
This is really a limitation of Python: time.mktime cannot convert dates this far
in the past (roughly 1850 and earlier) to a UNIX timestamp. BigQuery's TIMESTAMP
type, by contrast, accepts values all the way back to year 0001. From
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp-type:
{quote}
0001-01-01 00:00:00 to 9999-12-31 23:59:59.999999 UTC.
{quote}
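As a stopgap until this is fixed upstream, one possible workaround (a sketch
only, assuming convert_types keeps its current one-value signature; this is not
necessarily the patch that will be merged) is to subclass the operator and
compute epoch seconds with calendar.timegm, which handles pre-1970 dates:
{code}
import calendar
import datetime

from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator


class PreEpochSafeMySqlToGcsOperator(MySqlToGoogleCloudStorageOperator):
    """Sketch of a workaround: serialize date/datetime without time.mktime."""

    def convert_types(self, value):
        if isinstance(value, (datetime.datetime, datetime.date)):
            # Pure arithmetic: dates before 1970 become negative epoch seconds
            # instead of raising "year out of range".
            return calendar.timegm(value.timetuple())
        # Fall back to the stock conversion for every other type.
        return super(PreEpochSafeMySqlToGcsOperator, self).convert_types(value)
{code}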
was:
For tables in MySQL that use a "date" or "datetime" type, a dag that exports
from MySQL to Google Cloud Storage and then loads from GCS to BigQuery will
fail when the dates are before 1970.
When the table is exported as JSON to a GCS bucket, dates and datetimes are
converted to timestamps using:
{code}
time.mktime(value.timetuple())
{code}
This creates a problem when you try to parse a date that can't be converted to a
UNIX timestamp. For example:
{code}
>>> value = datetime.date(1850,1,1)
>>> time.mktime(value.timetuple())
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ValueError: year out of range
{code}
*Steps to reproduce*
0. Set up a MySQL connection and GCP connection in Airflow.
1. Create a MySQL table with a "date" field and put some data into the table.
{code}
CREATE TABLE table_with_date (
    date_field date,
    datetime_field datetime
);
INSERT INTO table_with_date (date_field, datetime_field) VALUES
    ('1850-01-01', NOW());
{code}
2. Create a DAG that exports the data from MySQL to GCS and then loads from GCS
to BigQuery (use the schema file). For example:
{code}
extract = MySqlToGoogleCloudStorageOperator(
    task_id="extract_table",
    mysql_conn_id='mysql_connection',
    google_cloud_storage_conn_id='gcp_connection',
    sql="SELECT * FROM table_with_date",
    bucket='gcs-bucket',
    filename='table_with_date.json',
    schema_filename='schemas/table_with_date.json',
    dag=dag)

load = GoogleCloudStorageToBigQueryOperator(
    task_id="load_table",
    bigquery_conn_id='gcp_connection',
    google_cloud_storage_conn_id='gcp_connection',
    bucket='gcs-bucket',
    destination_project_dataset_table="dataset.table_with_date",
    source_objects=['table_with_date.json'],
    schema_object='schemas/table_with_date.json',
    source_format='NEWLINE_DELIMITED_JSON',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    dag=dag)

load.set_upstream(extract)
{code}
3. Run the DAG
Expected: The DAG runs successfully.
Actual: The `load_table` task fails with error:
{code}
...
{u'reason': u'invalid',
u'message': u'JSON parsing error in row starting at position 0: Could not convert value to string. Field: date_field; Value: 1504929600.000000',
u'location': u'gs://gcs-bucket/table_with_date.json'
...
{code}
*Comments:*
Seems like this was just a simple oversight in the section of
`airflow/contrib/operators/mysql_to_gcs.py` where the types get converted. In
`convert_types`, both `date` and `datetime` values are converted to timestamps,
but `type_map` has no mapping for `FIELD_TYPE.DATE`. This small bug almost
turned my team off from using Airflow, because many of our tables use the
`date` type and did not flow into BigQuery.
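For illustration, the gist of the missing entry (an abridged sketch against the
contrib operator's type_map, not a verbatim copy of the file):
{code}
from MySQLdb.constants import FIELD_TYPE

# Maps MySQLdb column types to BigQuery types when writing the schema file.
type_map = {
    FIELD_TYPE.DATETIME: 'TIMESTAMP',  # already mapped
    FIELD_TYPE.DATE: 'TIMESTAMP',      # the mapping described above as missing
    # ... remaining MySQL-to-BigQuery mappings elided ...
}
{code}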
> MySQL to GCS fails for date/datetime before ~1850
> -------------------------------------------------
>
> Key: AIRFLOW-1632
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1632
> Project: Apache Airflow
> Issue Type: Bug
> Components: gcp
> Environment: Google Cloud Platform
> Reporter: Michael Ghen
> Assignee: Michael Ghen
> Priority: Minor
> Fix For: 1.9.0
>
>
> For tables in MySQL that use a "date" or "datetime" type, a dag that exports
> from MySQL to Google Cloud Storage and then loads from GCS to BigQuery will
> fail when the dates are before 1970.
> When the table is exported as JSON to a GCS bucket, dates and datetimes are
> converted to timestamps using:
> {code}
> time.mktime(value.timetuple())
> {code}
> This creates a problem when you try to parse a date that can't be converted to a
> UNIX timestamp. For example:
> {code}
> >>> value = datetime.date(1850,1,1)
> >>> time.mktime(value.timetuple())
> Traceback (most recent call last):
> File "<stdin>", line 1, in <module>
> ValueError: year out of range
> {code}
> *Steps to reproduce*
> 0. Set up a MySQL connection and GCP connection in Airflow.
> 1. Create a MySQL table with a "date" field and put some data into the table.
> {code}
> CREATE TABLE table_with_date (
>     date_field date,
>     datetime_field datetime
> );
> INSERT INTO table_with_date (date_field, datetime_field) VALUES
>     ('1850-01-01', NOW());
> {code}
> 2. Create a DAG that exports the data from MySQL to GCS and then loads from GCS
> to BigQuery (use the schema file). For example:
> {code}
> extract = MySqlToGoogleCloudStorageOperator(
>     task_id="extract_table",
>     mysql_conn_id='mysql_connection',
>     google_cloud_storage_conn_id='gcp_connection',
>     sql="SELECT * FROM table_with_date",
>     bucket='gcs-bucket',
>     filename='table_with_date.json',
>     schema_filename='schemas/table_with_date.json',
>     dag=dag)
> load = GoogleCloudStorageToBigQueryOperator(
>     task_id="load_table",
>     bigquery_conn_id='gcp_connection',
>     google_cloud_storage_conn_id='gcp_connection',
>     bucket='gcs-bucket',
>     destination_project_dataset_table="dataset.table_with_date",
>     source_objects=['table_with_date.json'],
>     schema_object='schemas/table_with_date.json',
>     source_format='NEWLINE_DELIMITED_JSON',
>     create_disposition='CREATE_IF_NEEDED',
>     write_disposition='WRITE_TRUNCATE',
>     dag=dag)
> load.set_upstream(extract)
> {code}
> 3. Run the DAG
> Expected: The DAG runs successfully.
> Actual: The `extract_table` task fails with error:
> {code}
> ...
> ERROR - year out of range
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1374, in run
>     result = task_copy.execute(context=context)
>   File "/usr/lib/python2.7/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 91, in execute
>     files_to_upload = self._write_local_data_files(cursor)
>   File "/usr/lib/python2.7/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 132, in _write_local_data_files
>     row = map(self.convert_types, row)
>   File "/usr/lib/python2.7/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 196, in convert_types
>     return time.mktime(value.timetuple())
> ValueError: year out of range
> ...
> {code}
> *Comments:*
> This is really a limitation of Python: time.mktime cannot convert dates this far
> in the past (roughly 1850 and earlier) to a UNIX timestamp. BigQuery's TIMESTAMP
> type, by contrast, accepts values all the way back to year 0001. From
> https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp-type:
> {quote}
> 0001-01-01 00:00:00 to 9999-12-31 23:59:59.999999 UTC.
> {quote}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)