Yohei Onishi created AIRFLOW-3568:
-------------------------------------
Summary: S3ToGoogleCloudStorageOperator failed to copy files from S3
Key: AIRFLOW-3568
URL: https://issues.apache.org/jira/browse/AIRFLOW-3568
Project: Apache Airflow
Issue Type: Bug
Components: contrib
Affects Versions: 1.10.0
Reporter: Yohei Onishi
I tried to copy files from S3 to GCS using
S3ToGoogleCloudStorageOperator. The file was successfully uploaded to GCS, but
the task failed with the following error.
{code:java}
[2018-12-26 07:56:33,062] {base_task_runner.py:107} INFO - Job 39: Subtask
gcs_copy_files_from_s3 [2018-12-26 07:56:33,062] {discovery.py:871} INFO - URL
being requested: POST
https://www.googleapis.com/upload/storage/v1/b/stg-rfid-etl-tmp/o?name=rfid_wh%2Fuq%2Fjp%2Fno_resp_carton_1D%2F2018%2F12%2F24%2F21%2Fno_resp_carton_20181224210201.csv&alt=json&uploadType=media
[2018-12-26 07:56:33,214] {base_task_runner.py:107} INFO - Job 39: Subtask
gcs_copy_files_from_s3 [2018-12-26 07:56:33,213] {s3_to_gcs_operator.py:177}
INFO - All done, uploaded 1 files to Google Cloud Storage
[2018-12-26 07:56:33,217] {models.py:1736} ERROR - Object of type 'set' is not
JSON serializable
Traceback (most recent call last):
File "/usr/local/lib/airflow/airflow/models.py", line 1637, in _run_raw_task
self.xcom_push(key=XCOM_RETURN_KEY, value=result)
File "/usr/local/lib/airflow/airflow/models.py", line 1983, in xcom_push
execution_date=execution_date or self.execution_date)
File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/airflow/airflow/models.py", line 4531, in set
value = json.dumps(value).encode('UTF-8')
File "/usr/local/lib/python3.6/json/__init__.py", line 231, in dumps
return _default_encoder.encode(obj)
File "/usr/local/lib/python3.6/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/local/lib/python3.6/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/usr/local/lib/python3.6/json/encoder.py", line 180, in default
o.__class__.__name__)
TypeError: Object of type 'set' is not JSON serializable
[2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask
gcs_copy_files_from_s3 [2018-12-26 07:56:33,217] {models.py:1736} ERROR -
Object of type 'set' is not JSON serializable
[2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask
gcs_copy_files_from_s3 Traceback (most recent call last):
[2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask
gcs_copy_files_from_s3 File "/usr/local/lib/airflow/airflow/models.py", line
1637, in _run_raw_task
[2018-12-26 07:56:33,220] {models.py:1756} INFO - Marking task as UP_FOR_RETRY
[2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask
gcs_copy_files_from_s3 self.xcom_push(key=XCOM_RETURN_KEY, value=result)
[2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask
gcs_copy_files_from_s3 File "/usr/local/lib/airflow/airflow/models.py", line
1983, in xcom_push
[2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask
gcs_copy_files_from_s3 execution_date=execution_date or self.execution_date)
[2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask
gcs_copy_files_from_s3 File "/usr/local/lib/airflow/airflow/utils/db.py",
line 74, in wrapper
[2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask
gcs_copy_files_from_s3 return func(*args, **kwargs)
[2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask
gcs_copy_files_from_s3 File "/usr/local/lib/airflow/airflow/models.py", line
4531, in set
[2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask
gcs_copy_files_from_s3 value = json.dumps(value).encode('UTF-8')
[2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask
gcs_copy_files_from_s3 File "/usr/local/lib/python3.6/json/__init__.py", line
231, in dumps
[2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask
gcs_copy_files_from_s3 return _default_encoder.encode(obj)
[2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask
gcs_copy_files_from_s3 File "/usr/local/lib/python3.6/json/encoder.py", line
199, in encode
[2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask
gcs_copy_files_from_s3 chunks = self.iterencode(o, _one_shot=True)
[2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask
gcs_copy_files_from_s3 File "/usr/local/lib/python3.6/json/encoder.py", line
257, in iterencode
[2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask
gcs_copy_files_from_s3 return _iterencode(o, 0)
[2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask
gcs_copy_files_from_s3 File "/usr/local/lib/python3.6/json/encoder.py", line
180, in default
[2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask
gcs_copy_files_from_s3 o.__class__.__name__)
[2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask
gcs_copy_files_from_s3 TypeError: Object of type 'set' is not JSON serializable
[2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask
gcs_copy_files_from_s3 [2018-12-26 07:56:33,220] {models.py:1756} INFO -
Marking task as UP_FOR_RETRY
{code}
According to the error log, the task failed because the operator returns the
list of uploaded files as a set, and xcom_push cannot serialize a set to JSON.
[https://github.com/apache/incubator-airflow/blob/1.10.0/airflow/models.py#L1637]
{code:java}
# If the task returns a result, push an XCom containing it
if result is not None:
self.xcom_push(key=XCOM_RETURN_KEY, value=result){code}
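The failure can be reproduced in isolation: the standard library's json module has no encoder for Python sets, so any set-valued XCom return value raises the same TypeError seen in the log (minimal demo, independent of Airflow):

```python
import json

# json.dumps has no default encoder for sets, so serializing a
# set-valued XCom fails exactly as in the traceback above.
try:
    json.dumps({"a.csv", "b.csv"})
except TypeError as exc:
    # Message text varies slightly by Python version, e.g.
    # "Object of type 'set' is not JSON serializable" on 3.6.
    print(type(exc).__name__)
```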
[https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/s3_to_gcs_operator.py#L155]
[https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/s3_to_gcs_operator.py#L198]
{code:java}
files = set(files) - set(existing_files)
...
return files{code}
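A likely fix (a sketch only, not a committed patch; `upload_new_files` is a hypothetical stand-in for the operator's execute logic) is to convert the set back to a list before returning, so the value pushed to XCom is JSON-serializable:

```python
import json


def upload_new_files(files, existing_files):
    """Hypothetical stand-in for the operator's de-duplication step.

    Mirrors `files = set(files) - set(existing_files)` from
    s3_to_gcs_operator.py, but returns a sorted list instead of a
    set so that xcom_push can json.dumps() the result.
    """
    new_files = set(files) - set(existing_files)
    # ... upload each file in new_files to GCS ...
    return sorted(new_files)


result = upload_new_files(["a.csv", "b.csv"], ["b.csv"])
print(json.dumps(result))  # ["a.csv"]
```

Sorting is not strictly required; a plain `list(new_files)` would also serialize, but a sorted list keeps the XCom value deterministic across runs.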
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)