sakaia opened a new issue #14161:
URL: https://github.com/apache/airflow/issues/14161
**Apache Airflow version**:
1.10.14
**Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
1.18.5
**Environment**:
- **Cloud provider or hardware configuration**:
- **OS** (e.g. from /etc/os-release): Ubuntu 18.04.5 LTS
- **Kernel** (e.g. `uname -a`): Linux sakaia 4.15.0-128-generic #131-Ubuntu SMP Wed Dec 9 06:57:35 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux
- **Install tools**:
- **Others**: I installed helm chart 7.13.0.
**What happened**:
I want to use marshmallow 3.10.0 in the airflow-worker, but I cannot: after installing it, the task still imports the old version.
**What you expected to happen**:
I expected `import marshmallow` to load 3.10.0 after installing it. Instead, only 2.21.0 (already present in the airflow 1.10.14 image) is imported.
**How to reproduce it**:
Run the following DAG definition file in a KubernetesExecutor environment.
**Anything else we need to know**:
I am executing the following Python code; the log output follows.
```Python
# Airflow marshmallow version update test
from airflow.utils.dates import days_ago
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

af_run_dag_default_args = {
    'owner': 'sakaia'
}

## Define DAG details
af_run_dag = DAG(
    dag_id='ss0',
    default_args=af_run_dag_default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['training']
)


def pythonTest(**kwargs) -> str:
    print("pythonTest")
    import sys, subprocess
    # Install marshmallow 3.10.0 into the user site-packages of the worker pod
    result = subprocess.check_output([sys.executable, '-m', 'pip',
                                      'install', '--user', 'marshmallow==3.10.0'])
    print(str(result).replace('\\n', '\n'))
    # Import after the install and report which version and file were loaded
    import marshmallow
    print("marshmallow version: %s" % marshmallow.__version__, file=sys.stderr)
    print(str("marshmallow version %s" % marshmallow.__version__))
    print(str("marshmallow file %s" % marshmallow.__file__))
    # Dump the installed __init__.py to show what is actually on disk
    with open('/home/airflow/.local/lib/python3.8/site-packages/marshmallow/__init__.py') as f:
        hoge = f.read()
    print(hoge)
    return "pythonTest"


# Define DAG steps/workflow
with af_run_dag as dag:
    python_test = PythonOperator(
        task_id='python_test',
        python_callable=pythonTest,
        op_kwargs={
        },
        dag=dag
    )
```
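To confirm inside the task that the install succeeded but an already-imported module is being reused, a check along these lines could be called right after the `pip install` step. This is a minimal sketch assuming Python 3.8+ (the worker in the log runs Python 3.8), where `importlib.metadata` reads the version recorded by pip in the installed distribution's metadata; `describe_module` is a hypothetical helper name, not part of Airflow or marshmallow.

```python
import importlib
import sys
from importlib import metadata  # standard library since Python 3.8


def describe_module(module_name, dist_name=None):
    """Compare the module Python has loaded with the distribution pip installed.

    Hypothetical helper: `dist_name` defaults to `module_name`.
    """
    # Was the module already imported before this call (e.g. during startup)?
    already_imported = module_name in sys.modules
    # import_module returns the cached object from sys.modules when present.
    module = importlib.import_module(module_name)
    try:
        installed = metadata.version(dist_name or module_name)
    except metadata.PackageNotFoundError:
        installed = None  # no installed distribution metadata under that name
    return {
        "already_imported": already_imported,
        "loaded_version": getattr(module, "__version__", None),
        "loaded_file": getattr(module, "__file__", None),
        "installed_version": installed,
    }
```

In `pythonTest`, a result of `describe_module("marshmallow")` with `already_imported` set to `True`, `loaded_version` 2.21.0 and `installed_version` 3.10.0 would point to a stale in-memory module rather than a failed install.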
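The output log follows. The marshmallow upgrade appears to succeed (pip reports `Successfully installed marshmallow-3.10.0`, and the file on disk declares `__version__ = "3.10.0"`), but the imported module still reports version 2.21.0.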
```
[2021-02-05 01:45:36,385] {base_task_runner.py:61} DEBUG - Planning to run as the user
[2021-02-05 01:45:36,415] {taskinstance.py:685} DEBUG - <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [queued]> dependency 'Task Instance State' PASSED: True, Task state queued was valid.
[2021-02-05 01:45:36,415] {taskinstance.py:685} DEBUG - <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2021-02-05 01:45:36,422] {taskinstance.py:685} DEBUG - <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2021-02-05 01:45:36,423] {taskinstance.py:685} DEBUG - <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2021-02-05 01:45:36,423] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [queued]>
[2021-02-05 01:45:36,442] {taskinstance.py:685} DEBUG - <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [queued]> dependency 'Pool Slots Available' PASSED: True, ('There are enough open slots in %s to execute the task', 'default_pool')
[2021-02-05 01:45:36,443] {taskinstance.py:685} DEBUG - <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [queued]> dependency 'Task Concurrency' PASSED: True, Task concurrency is not set.
[2021-02-05 01:45:36,443] {taskinstance.py:685} DEBUG - <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2021-02-05 01:45:36,443] {taskinstance.py:685} DEBUG - <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2021-02-05 01:45:36,468] {taskinstance.py:685} DEBUG - <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2021-02-05 01:45:36,472] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [queued]>
[2021-02-05 01:45:36,472] {taskinstance.py:880} INFO - --------------------------------------------------------------------------------
[2021-02-05 01:45:36,472] {taskinstance.py:881} INFO - Starting attempt 1 of 1
[2021-02-05 01:45:36,472] {taskinstance.py:882} INFO - --------------------------------------------------------------------------------
[2021-02-05 01:45:36,506] {taskinstance.py:901} INFO - Executing <Task(PythonOperator): python_test> on 2020-12-07T00:00:00+00:00
[2021-02-05 01:45:36,512] {standard_task_runner.py:54} INFO - Started process 70 to run task
[2021-02-05 01:45:36,562] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'ss0', 'python_test', '2020-12-07T00:00:00+00:00', '--job_id', '87', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/ss0.py', '--cfg_path', '/tmp/tmp0wif1hiv']
[2021-02-05 01:45:36,562] {standard_task_runner.py:78} INFO - Job 87: Subtask python_test
[2021-02-05 01:45:36,563] {cli_action_loggers.py:68} DEBUG - Calling callbacks: [<function default_action_log at 0x7f166678ab80>]
[2021-02-05 01:45:36,587] {settings.py:233} DEBUG - Setting up DB connection pool (PID 70)
[2021-02-05 01:45:36,588] {settings.py:267} DEBUG - settings.prepare_engine_args(): Using NullPool
[2021-02-05 01:45:36,652] {logging_mixin.py:112} INFO - Running <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [running]> on host ss0pythontest-ec131a89bec94531956e82eb98c543c1
[2021-02-05 01:45:36,726] {__init__.py:101} DEBUG - Preparing lineage inlets and outlets
[2021-02-05 01:45:36,726] {__init__.py:137} DEBUG - inlets: [], outlets: []
[2021-02-05 01:45:36,727] {python_operator.py:103} DEBUG - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=sakaia
AIRFLOW_CTX_DAG_ID=ss0
AIRFLOW_CTX_TASK_ID=python_test
AIRFLOW_CTX_EXECUTION_DATE=2020-12-07T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill_2020-12-07T00:00:00+00:00
[2021-02-05 01:45:36,727] {logging_mixin.py:112} INFO - pythonTest
[2021-02-05 01:45:40,690] {logging_mixin.py:112} INFO - b'Collecting marshmallow==3.10.0
Downloading marshmallow-3.10.0-py2.py3-none-any.whl (46 kB)
Installing collected packages: marshmallow
Attempting uninstall: marshmallow
Found existing installation: marshmallow 2.21.0
Uninstalling marshmallow-2.21.0:
Successfully uninstalled marshmallow-2.21.0
Successfully installed marshmallow-3.10.0
'
[2021-02-05 01:45:40,692] {logging_mixin.py:112} WARNING - marshmallow version: 2.21.0
[2021-02-05 01:45:40,692] {logging_mixin.py:112} INFO - marshmallow version 2.21.0
[2021-02-05 01:45:40,692] {logging_mixin.py:112} INFO - marshmallow file /home/airflow/.local/lib/python3.8/site-packages/marshmallow/__init__.py
[2021-02-05 01:45:40,693] {logging_mixin.py:112} INFO - from marshmallow.schema import Schema, SchemaOpts
from . import fields
from marshmallow.decorators import (
    pre_dump,
    post_dump,
    pre_load,
    post_load,
    validates,
    validates_schema,
)
from marshmallow.utils import EXCLUDE, INCLUDE, RAISE, pprint, missing
from marshmallow.exceptions import ValidationError
from distutils.version import LooseVersion
__version__ = "3.10.0"
__version_info__ = tuple(LooseVersion(__version__).version)
__all__ = [
    "EXCLUDE",
    "INCLUDE",
    "RAISE",
    "Schema",
    "SchemaOpts",
    "fields",
    "validates",
    "validates_schema",
    "pre_dump",
    "post_dump",
    "pre_load",
    "post_load",
    "pprint",
    "ValidationError",
    "missing",
]
[2021-02-05 01:45:40,693] {logging_mixin.py:112} INFO -
[2021-02-05 01:45:40,693] {python_operator.py:114} INFO - Done. Returned value was: pythonTest
[2021-02-05 01:45:40,736] {__init__.py:61} DEBUG - Backend: None, Lineage called with inlets: [], outlets: []
[2021-02-05 01:45:40,766] {taskinstance.py:1057} INFO - Marking task as SUCCESS.dag_id=ss0, task_id=python_test, execution_date=20201207T000000, start_date=20210205T014536, end_date=20210205T014540
[2021-02-05 01:45:41,396] {base_job.py:197} DEBUG - [heartbeat]
[2021-02-05 01:45:41,396] {local_task_job.py:102} INFO - Task exited with return code 0
```
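The log is consistent with the upgrade itself working and the stale version coming from Python's import cache: `import marshmallow` returns the module object already stored in `sys.modules`, presumably because Airflow or one of its dependencies imported marshmallow 2.21.0 before the task callable ran, so the new `__init__.py` on disk is never executed. The following sketch reproduces that pattern without Airflow, assuming a throwaway package named `demo_pkg` (a hypothetical stand-in for marshmallow) written to a temporary directory.

```python
import importlib
import pathlib
import sys
import tempfile


def simulate_stale_import():
    """Import a package, overwrite it on disk, then compare cached vs reloaded versions.

    `demo_pkg` is a hypothetical stand-in for marshmallow.
    """
    old_flag = sys.dont_write_bytecode
    # Skip writing .pyc files so the reload below recompiles the edited source.
    sys.dont_write_bytecode = True
    with tempfile.TemporaryDirectory() as tmp:
        init_file = pathlib.Path(tmp, "demo_pkg", "__init__.py")
        init_file.parent.mkdir()
        init_file.write_text('__version__ = "2.21.0"\n')
        sys.path.insert(0, tmp)
        try:
            # First import: plays the role of Airflow loading marshmallow at startup.
            first = importlib.import_module("demo_pkg")
            before = first.__version__

            # Plays the role of `pip install --user marshmallow==3.10.0`.
            init_file.write_text('__version__ = "3.10.0"\n')

            # A second import is answered from sys.modules; the new file is not run.
            second = importlib.import_module("demo_pkg")
            cached = second.__version__
            on_disk = init_file.read_text().strip()

            # Re-executing the module picks up the new source.
            reloaded = importlib.reload(second).__version__
            return before, cached, on_disk, reloaded
        finally:
            sys.path.remove(tmp)
            sys.modules.pop("demo_pkg", None)
            sys.dont_write_bytecode = old_flag


if __name__ == "__main__":
    print(simulate_stale_import())
```

This mirrors the log: the cached module reports 2.21.0 while the file on disk says 3.10.0. Note that `importlib.reload` only re-executes the package's `__init__.py`; submodules that were already imported (for marshmallow, e.g. `marshmallow.schema`) stay cached, so an in-process reload can mix old and new code. The simulation is meant to explain the log, not as a recommended fix.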