sti0 opened a new issue #13487:
URL: https://github.com/apache/airflow/issues/13487


   I'm running Airflow 2.0.0 with the Docker image `apache/airflow:2.0.0-python3.8` 
from Docker Hub.
   
   I tried this example docker DAG:
   
https://github.com/apache/airflow/blob/master/airflow/providers/docker/example_dags/example_docker.py
   
   I changed the task configuration to:
   ```
   t3 = DockerOperator(
       # api_version="1.19",
       docker_url="tcp://myhost:2376",  # Set your docker URL
       command="/bin/sleep 30",
       image="alpine:latest",
       network_mode="bridge",
       task_id="docker_op_tester",
       tls_ca_cert="/path/to/ca.pem",
       tls_client_cert="/path/to/cert.pem",
       tls_client_key="/path/to/key.pem",
       tls_hostname="myhost",
       dag=dag,
   )
   ```
   
   API versions below 1.21 are no longer supported, so I commented out the 
`api_version` argument.
   
   When I run the DAG, the container starts and sleeps for 30 seconds, but then 
the task fails with this error:
   
   ```
   [2021-01-05 12:32:27,514] {xcom.py:237} ERROR - Could not serialize the XCom 
value into JSON. If you are using pickles instead of JSON for XCom, then you 
need to enable pickle support for XCom in your airflow config.
   ```
   
   If I add `do_xcom_push=False` to the task configuration above, it works as 
expected, but then the container output is not pushed to XCom at all. I also 
tried `echo hello` instead of the `sleep` command, with the same result.
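
The full log further down ends with `Object of type bytes is not JSON serializable`, which suggests the operator's return value contains raw bytes. As a possible stopgap that keeps XCom output, the value could be decoded before it is pushed. The sketch below is only an illustration under that assumption: `to_json_safe` is a hypothetical helper, not part of Airflow or the Docker provider, and UTF-8 output is assumed.

```python
import json
from typing import Any


def to_json_safe(value: Any) -> Any:
    """Recursively decode bytes so the result can be passed to json.dumps.

    Hypothetical helper for illustration; assumes the bytes are UTF-8 text.
    """
    if isinstance(value, (bytes, bytearray)):
        # Undecodable bytes are replaced rather than raising an error.
        return bytes(value).decode("utf-8", errors="replace")
    if isinstance(value, (list, tuple)):
        return [to_json_safe(item) for item in value]
    if isinstance(value, dict):
        return {to_json_safe(k): to_json_safe(v) for k, v in value.items()}
    return value


if __name__ == "__main__":
    # Stand-in for container output such as the result of `echo hello`.
    print(json.dumps(to_json_safe(b"hello\n")))
```

One place to call such a helper might be a `DockerOperator` subclass that overrides `execute` and converts the parent's return value. I have not tested that against Airflow 2.0.0.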
   
   Full log:
   ```
   *** Reading local file: 
/opt/airflow/logs/docker_sample/docker_op_tester/2021-01-05T12:31:53.033985+00:00/1.log
   [2021-01-05 12:31:56,689] {taskinstance.py:826} INFO - Dependencies all met 
for <TaskInstance: docker_sample.docker_op_tester 
2021-01-05T12:31:53.033985+00:00 [queued]>
   [2021-01-05 12:31:56,699] {taskinstance.py:826} INFO - Dependencies all met 
for <TaskInstance: docker_sample.docker_op_tester 
2021-01-05T12:31:53.033985+00:00 [queued]>
   [2021-01-05 12:31:56,699] {taskinstance.py:1017} INFO - 
   
--------------------------------------------------------------------------------
   [2021-01-05 12:31:56,699] {taskinstance.py:1018} INFO - Starting attempt 1 
of 2
   [2021-01-05 12:31:56,699] {taskinstance.py:1019} INFO - 
   
--------------------------------------------------------------------------------
   [2021-01-05 12:31:56,705] {taskinstance.py:1038} INFO - Executing 
<Task(DockerOperator): docker_op_tester> on 2021-01-05T12:31:53.033985+00:00
   [2021-01-05 12:31:56,709] {standard_task_runner.py:51} INFO - Started 
process 72747 to run task
   [2021-01-05 12:31:56,712] {standard_task_runner.py:75} INFO - Running: 
['airflow', 'tasks', 'run', 'docker_sample', 'docker_op_tester', 
'2021-01-05T12:31:53.033985+00:00', '--job-id', '61', '--pool', 'default_pool', 
'--raw', '--subdir', 'DAGS_FOLDER/mydocker.py', '--cfg-path', 
'/tmp/tmppdqnibne']
   [2021-01-05 12:31:56,713] {standard_task_runner.py:76} INFO - Job 61: 
Subtask docker_op_tester
   [2021-01-05 12:31:56,745] {logging_mixin.py:103} INFO - Running 
<TaskInstance: docker_sample.docker_op_tester 2021-01-05T12:31:53.033985+00:00 
[running]> on host fdfca86817af
   [2021-01-05 12:31:56,775] {taskinstance.py:1230} INFO - Exporting the 
following env vars:
   [email protected]
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=docker_sample
   AIRFLOW_CTX_TASK_ID=docker_op_tester
   AIRFLOW_CTX_EXECUTION_DATE=2021-01-05T12:31:53.033985+00:00
   AIRFLOW_CTX_DAG_RUN_ID=manual__2021-01-05T12:31:53.033985+00:00
   [2021-01-05 12:31:56,849] {docker.py:224} INFO - Starting docker container 
from image alpine:latest
   [2021-01-05 12:32:27,514] {xcom.py:237} ERROR - Could not serialize the XCom 
value into JSON. If you are using pickles instead of JSON for XCom, then you 
need to enable pickle support for XCom in your airflow config.
   [2021-01-05 12:32:27,515] {taskinstance.py:1396} ERROR - Object of type 
bytes is not JSON serializable
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py",
 line 1086, in _run_raw_task
       self._prepare_and_execute_task_with_callbacks(context, task)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py",
 line 1260, in _prepare_and_execute_task_with_callbacks
       result = self._execute_task(context, task_copy)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py",
 line 1303, in _execute_task
       self.xcom_push(key=XCOM_RETURN_KEY, value=result)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", 
line 65, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py",
 line 1827, in xcom_push
       XCom.set(
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", 
line 62, in wrapper
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 
88, in set
       value = XCom.serialize_value(value)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 
235, in serialize_value
       return json.dumps(value).encode('UTF-8')
     File "/usr/local/lib/python3.8/json/__init__.py", line 231, in dumps
       return _default_encoder.encode(obj)
     File "/usr/local/lib/python3.8/json/encoder.py", line 199, in encode
       chunks = self.iterencode(o, _one_shot=True)
     File "/usr/local/lib/python3.8/json/encoder.py", line 257, in iterencode
       return _iterencode(o, 0)
     File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default
       raise TypeError(f'Object of type {o.__class__.__name__} '
   TypeError: Object of type bytes is not JSON serializable
   [2021-01-05 12:32:27,516] {taskinstance.py:1433} INFO - Marking task as 
UP_FOR_RETRY. dag_id=docker_sample, task_id=docker_op_tester, 
execution_date=20210105T123153, start_date=20210105T123156, 
end_date=20210105T123227
   [2021-01-05 12:32:27,548] {local_task_job.py:118} INFO - Task exited with 
return code 1
   ```
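
The last Airflow frame in the traceback is `json.dumps(value).encode('UTF-8')`, so the failure can be reproduced without Airflow or Docker. The snippet below is a minimal sketch of that: `container_output` is a made-up stand-in for whatever the operator actually returned, which the log does not show.

```python
import json

# Stand-in for the container output; the real value comes from the operator.
container_output = b"hello\n"

try:
    # json.dumps has no default encoding for bytes objects.
    json.dumps(container_output)
except TypeError as exc:
    error_message = str(exc)
    print(error_message)

# Decoding to str first makes the value JSON serializable.
encoded = json.dumps(container_output.decode("utf-8")).encode("UTF-8")
print(encoded)
```

The first `print` shows the same `TypeError` message as the task log, which points at the type of the returned value rather than at the Docker connection or TLS settings.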
   
   Log with `do_xcom_push=False`:
   ```
   *** Reading local file: 
/opt/airflow/logs/docker_sample/docker_op_tester/2021-01-05T12:37:21.850166+00:00/1.log
   [2021-01-05 12:37:25,451] {taskinstance.py:826} INFO - Dependencies all met 
for <TaskInstance: docker_sample.docker_op_tester 
2021-01-05T12:37:21.850166+00:00 [queued]>
   [2021-01-05 12:37:25,462] {taskinstance.py:826} INFO - Dependencies all met 
for <TaskInstance: docker_sample.docker_op_tester 
2021-01-05T12:37:21.850166+00:00 [queued]>
   [2021-01-05 12:37:25,463] {taskinstance.py:1017} INFO - 
   
--------------------------------------------------------------------------------
   [2021-01-05 12:37:25,463] {taskinstance.py:1018} INFO - Starting attempt 1 
of 2
   [2021-01-05 12:37:25,463] {taskinstance.py:1019} INFO - 
   
--------------------------------------------------------------------------------
   [2021-01-05 12:37:25,468] {taskinstance.py:1038} INFO - Executing 
<Task(DockerOperator): docker_op_tester> on 2021-01-05T12:37:21.850166+00:00
   [2021-01-05 12:37:25,471] {standard_task_runner.py:51} INFO - Started 
process 76866 to run task
   [2021-01-05 12:37:25,475] {standard_task_runner.py:75} INFO - Running: 
['airflow', 'tasks', 'run', 'docker_sample', 'docker_op_tester', 
'2021-01-05T12:37:21.850166+00:00', '--job-id', '65', '--pool', 'default_pool', 
'--raw', '--subdir', 'DAGS_FOLDER/mydocker.py', '--cfg-path', 
'/tmp/tmpusz_sweq']
   [2021-01-05 12:37:25,476] {standard_task_runner.py:76} INFO - Job 65: 
Subtask docker_op_tester
   [2021-01-05 12:37:25,508] {logging_mixin.py:103} INFO - Running 
<TaskInstance: docker_sample.docker_op_tester 2021-01-05T12:37:21.850166+00:00 
[running]> on host fdfca86817af
   [2021-01-05 12:37:25,539] {taskinstance.py:1230} INFO - Exporting the 
following env vars:
   [email protected]
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=docker_sample
   AIRFLOW_CTX_TASK_ID=docker_op_tester
   AIRFLOW_CTX_EXECUTION_DATE=2021-01-05T12:37:21.850166+00:00
   AIRFLOW_CTX_DAG_RUN_ID=manual__2021-01-05T12:37:21.850166+00:00
   [2021-01-05 12:37:25,614] {docker.py:224} INFO - Starting docker container 
from image alpine:latest
   [2021-01-05 12:37:56,306] {taskinstance.py:1135} INFO - Marking task as 
SUCCESS. dag_id=docker_sample, task_id=docker_op_tester, 
execution_date=20210105T123721, start_date=20210105T123725, 
end_date=20210105T123756
   [2021-01-05 12:37:56,326] {taskinstance.py:1195} INFO - 1 downstream tasks 
scheduled from follow-on schedule check
   [2021-01-05 12:37:56,344] {local_task_job.py:118} INFO - Task exited with 
return code 0
   ```
   



