Yohei Onishi created AIRFLOW-3568:
-------------------------------------

             Summary: S3ToGoogleCloudStorageOperator failed to copy files from s3
                 Key: AIRFLOW-3568
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3568
             Project: Apache Airflow
          Issue Type: Bug
          Components: contrib
    Affects Versions: 1.10.0
            Reporter: Yohei Onishi


I tried to copy files from S3 to GCS using S3ToGoogleCloudStorageOperator. The file was uploaded to GCS successfully, but the task failed with the following error:
{code:java}
[2018-12-26 07:56:33,062] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 [2018-12-26 07:56:33,062] {discovery.py:871} INFO - URL being requested: POST https://www.googleapis.com/upload/storage/v1/b/stg-rfid-etl-tmp/o?name=rfid_wh%2Fuq%2Fjp%2Fno_resp_carton_1D%2F2018%2F12%2F24%2F21%2Fno_resp_carton_20181224210201.csv&alt=json&uploadType=media
[2018-12-26 07:56:33,214] {base_task_runner.py:107} INFO - Job 39: Subtask gcs_copy_files_from_s3 [2018-12-26 07:56:33,213] {s3_to_gcs_operator.py:177} INFO - All done, uploaded 1 files to Google Cloud Storage
[2018-12-26 07:56:33,217] {models.py:1736} ERROR - Object of type 'set' is not JSON serializable
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models.py", line 1637, in _run_raw_task
    self.xcom_push(key=XCOM_RETURN_KEY, value=result)
  File "/usr/local/lib/airflow/airflow/models.py", line 1983, in xcom_push
    execution_date=execution_date or self.execution_date)
  File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/airflow/airflow/models.py", line 4531, in set
    value = json.dumps(value).encode('UTF-8')
  File "/usr/local/lib/python3.6/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/usr/local/lib/python3.6/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/lib/python3.6/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/lib/python3.6/json/encoder.py", line 180, in default
    o.__class__.__name__)
TypeError: Object of type 'set' is not JSON serializable
[2018-12-26 07:56:33,220] {models.py:1756} INFO - Marking task as UP_FOR_RETRY
{code}
 
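According to the error log, the task failed because the operator returns the list of uploaded files as a set, and Airflow pushes any non-None return value to XCom. xcom_push serializes that value with json.dumps, which does not support sets.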
[https://github.com/apache/incubator-airflow/blob/1.10.0/airflow/models.py#L1637]
{code:python}
# If the task returns a result, push an XCom containing it
if result is not None:
    self.xcom_push(key=XCOM_RETURN_KEY, value=result){code}
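The failure can be reproduced without Airflow, because XCom stores the pushed value via json.dumps (models.py line 4531 in the traceback). A minimal sketch; the keys below are hypothetical placeholders, not the files from the failing task:

{code:python}
import json

# Hypothetical S3 keys standing in for the operator's return value,
# which in 1.10.0 is a set.
result = {"data/a.csv", "data/b.csv"}

error = None
try:
    json.dumps(result)  # mirrors the json.dumps call inside XCom.set
except TypeError as exc:
    error = str(exc)
print(error)  # exact wording differs slightly between Python versions

# The same keys as a list (sorted for a stable order) serialize fine.
payload = json.dumps(sorted(result))
print(payload)  # ["data/a.csv", "data/b.csv"]
{code}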
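S3ToGoogleCloudStorageOperator builds the list of files to upload as a set and returns it as the task result:
[https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/s3_to_gcs_operator.py#L155]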

[https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/s3_to_gcs_operator.py#L198]
{code:python}
files = set(files) - set(existing_files)
...
return files{code}
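One possible fix, sketched under the assumption that downstream tasks only need the keys and not set semantics, is to keep the set difference for de-duplication but return a list. files_to_upload is a hypothetical helper, not part of Airflow; in the operator the corresponding change would presumably be converting files to a list before it is returned.

{code:python}
import json
from typing import Iterable, List


def files_to_upload(files: Iterable[str], existing_files: Iterable[str]) -> List[str]:
    """Hypothetical helper: keys present in S3 but not yet in GCS.

    Uses the same set difference as the operator, then converts the
    result to a sorted list so it can be pushed to XCom as JSON.
    """
    return sorted(set(files) - set(existing_files))


s3_keys = ["data/a.csv", "data/b.csv", "data/c.csv"]  # hypothetical
gcs_keys = ["data/b.csv"]                              # hypothetical
new_files = files_to_upload(s3_keys, gcs_keys)
print(new_files)              # ['data/a.csv', 'data/c.csv']
print(json.dumps(new_files))  # ["data/a.csv", "data/c.csv"]
{code}

Sorting is optional; a plain list(...) would also serialize, but a sorted list gives a deterministic XCom value across runs.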
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
