SasanAhmadi opened a new issue #16919:
URL: https://github.com/apache/airflow/issues/16919


   
   
   **Apache Airflow version**:
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**: aws
   
   **What happened**:
   
   <!-- (please include exact error messages if you can) -->
   
   **What you expected to happen**: the task should export the table data to S3 without error. Instead, when reading data with ```mysql_to_s3```, the following exception occurs:
   
   ```
   [2021-07-10 03:24:04,051] {{mysql_to_s3.py:120}} INFO - Data from MySQL obtained
   [2021-07-10 03:24:04,137] {{taskinstance.py:1482}} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/usr/local/lib64/python3.7/site-packages/pandas/core/arrays/integer.py", line 155, in safe_cast
       return values.astype(dtype, casting="safe", copy=copy)
   TypeError: Cannot cast array data from dtype('O') to dtype('int64') according to the rule 'safe'
   ```
   
   The above exception was the direct cause of the following exception:
   ```
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
       self._prepare_and_execute_task_with_callbacks(context, task)
     File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
       result = self._execute_task(context, task_copy)
     File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
       result = task_copy.execute(context=context)
     File "/usr/local/lib/python3.7/site-packages/airflow/providers/amazon/aws/transfers/mysql_to_s3.py", line 122, in execute
       self._fix_int_dtypes(data_df)
     File "/usr/local/lib/python3.7/site-packages/airflow/providers/amazon/aws/transfers/mysql_to_s3.py", line 114, in _fix_int_dtypes
       df[col] = df[col].astype(pd.Int64Dtype())
     File "/usr/local/lib64/python3.7/site-packages/pandas/core/generic.py", line 5877, in astype
       new_data = self._mgr.astype(dtype=dtype, copy=copy, errors=errors)
     File "/usr/local/lib64/python3.7/site-packages/pandas/core/internals/managers.py", line 631, in astype
       return self.apply("astype", dtype=dtype, copy=copy, errors=errors)
     File "/usr/local/lib64/python3.7/site-packages/pandas/core/internals/managers.py", line 427, in apply
       applied = getattr(b, f)(**kwargs)
     File "/usr/local/lib64/python3.7/site-packages/pandas/core/internals/blocks.py", line 673, in astype
       values = astype_nansafe(vals1d, dtype, copy=True)
     File "/usr/local/lib64/python3.7/site-packages/pandas/core/dtypes/cast.py", line 1019, in astype_nansafe
       return dtype.construct_array_type()._from_sequence(arr, dtype=dtype, copy=copy)
     File "/usr/local/lib64/python3.7/site-packages/pandas/core/arrays/integer.py", line 363, in _from_sequence
       return integer_array(scalars, dtype=dtype, copy=copy)
     File "/usr/local/lib64/python3.7/site-packages/pandas/core/arrays/integer.py", line 143, in integer_array
       values, mask = coerce_to_array(values, dtype=dtype, copy=copy)
     File "/usr/local/lib64/python3.7/site-packages/pandas/core/arrays/integer.py", line 258, in coerce_to_array
       values = safe_cast(values, dtype, copy=False)
     File "/usr/local/lib64/python3.7/site-packages/pandas/core/arrays/integer.py", line 164, in safe_cast
       ) from err
   TypeError: cannot safely cast non-equivalent object to int64
   ```
   <!-- What do you think went wrong? -->
   
   **How to reproduce it**:
   Create a table like the following in a MySQL database and use ```mysql_to_s3``` to load the data from this table into S3:
   
   ```
   create table test_data(id int, some_decimal decimal(10, 2));
   
   insert into test_data (id, some_decimal) values (1, 99999999.99), (2, null);
   ```
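The failure can also be reproduced outside Airflow with pandas alone (a sketch; the DataFrame below stands in for the result of selecting the repro table, and the two steps mirror what ```_fix_int_dtypes``` does to the column):

```python
import numpy as np
import pandas as pd

# Column as the operator would see it after reading the repro table:
# one non-integral decimal value and one NULL.
df = pd.DataFrame({"some_decimal": [99999999.99, np.nan]})

# _fix_int_dtypes first replaces NaN with None, which turns the column
# into dtype('O') (object) -- matching the first TypeError in the log.
df["some_decimal"] = np.where(df["some_decimal"].isnull(), None, df["some_decimal"])

# The subsequent cast to the nullable integer dtype then fails, because
# 99999999.99 cannot be cast to int64 without losing the .99.
try:
    df["some_decimal"] = df["some_decimal"].astype(pd.Int64Dtype())
    cast_failed = False
except (TypeError, ValueError):
    cast_failed = True
```

With the table above, `cast_failed` ends up `True`, matching the task failure in the log.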
   
   **Anything else we need to know**:
   
   The problem is in the following code: it looks for an occurrence of "float" in the column's dtype name and then, instead of using ```pd.Float64Dtype()```, casts the column to ```pd.Int64Dtype()```. Since the column can contain genuine floating-point values, the safe cast of the array to an integer dtype raises the exception above.
   
   ```
   def _fix_int_dtypes(self, df: pd.DataFrame) -> None:
       """Mutate DataFrame to set dtypes for int columns containing NaN values."""
       for col in df:
           if "float" in df[col].dtype.name and df[col].hasnans:
               # inspect values to determine if dtype of non-null values is int or float
               notna_series = df[col].dropna().values
               if np.isclose(notna_series, notna_series.astype(int)).all():
                   # set to dtype that retains integers and supports NaNs
                   df[col] = np.where(df[col].isnull(), None, df[col])
                   df[col] = df[col].astype(pd.Int64Dtype())
   ```
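The guard in that code passes for the repro table even though the column is not integral, because ```np.isclose``` uses a relative tolerance. A minimal demonstration (using the value from the repro table above):

```python
import numpy as np

# The non-null values of the some_decimal column from the repro table.
notna = np.array([99999999.99])

# np.isclose defaults to rtol=1e-05, so for a value around 1e8 the allowed
# difference is roughly 1e-5 * 1e8 = 1000. The 0.99 fractional part is well
# within that, so the check wrongly concludes the column holds integers.
looks_like_int = bool(np.isclose(notna, notna.astype(int)).all())  # True

# The same fractional part on a small value behaves as one would expect.
small_looks_like_int = bool(np.isclose(np.array([1.99]), np.array([1])).all())  # False
```

So whether a column is classified as "integer" depends on the magnitude of its values, not just on whether they have fractional parts.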
   
   Moreover, I don't understand why we use ```isclose``` to inspect whether the values are merely *close* to integers when we have the option to cast to ```Float64Dtype```.
   ```isclose``` destroys the perception of the data here, because it is an approximate comparison, not an exact test of whether the values are ints or floats. With its default relative tolerance, a large value such as ```99999999.99``` is considered close to ```99999999```, which is the root cause of the exception above.
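Following the suggestion in this report, one possible direction is an exact integrality check with a nullable-float fallback. This is only a sketch of the idea, not the project's actual patch; the function name ```fix_dtypes``` is hypothetical, and it assumes a pandas version that provides ```pd.Float64Dtype``` (pandas >= 1.2):

```python
import numpy as np
import pandas as pd


def fix_dtypes(df: pd.DataFrame) -> None:
    """Hypothetical variant of _fix_int_dtypes: exact integrality check,
    with a nullable-float fallback for non-integral columns."""
    for col in df:
        if "float" in df[col].dtype.name and df[col].hasnans:
            notna = df[col].dropna().values
            # Exact check: x % 1 == 0 has no tolerance, unlike np.isclose,
            # so 99999999.99 is correctly treated as non-integral.
            if (np.mod(notna, 1) == 0).all():
                df[col] = df[col].astype(pd.Int64Dtype())
            else:
                # Keep the fractional values; Float64Dtype still supports NA.
                df[col] = df[col].astype(pd.Float64Dtype())
```

On a frame with one integral and one non-integral NaN-containing column, this would cast the former to ```Int64``` and the latter to ```Float64``` instead of raising.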

