sakaia opened a new issue #14161:
URL: https://github.com/apache/airflow/issues/14161


   **Apache Airflow version**:
   
   1.10.14
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
   
   1.18.5
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**:
   - **OS** (e.g. from /etc/os-release): Ubuntu 18.04.5 LTS
   - **Kernel** (e.g. `uname -a`): Linux sakaia 4.15.0-128-generic #131-Ubuntu SMP Wed Dec 9 06:57:35 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux
   - **Install tools**:
   - **Others**: I installed helm chart 7.13.0.
   
   **What happened**:
   
   marshmallow 3.10.0 cannot be used on the airflow-worker: after installing it at runtime, the task still imports marshmallow 2.21.0.
   I want to use marshmallow 3.10.0 in the airflow-worker.
   
   **What you expected to happen**:
   
   I expected `import marshmallow` to load 3.10.0 after installing it. Instead, only 2.21.0 (already included in the Airflow 1.10.14 image) is loaded.
   
   **How to reproduce it**:
   
   Run the DAG definition file below in a KubernetesExecutor environment.
   
   **Anything else we need to know**:
   
   I am running the following Python code; the resulting log is shown further down.
   
   ```Python
   # Airflow marshmallow version update test
   
   from airflow.utils.dates import days_ago
   from airflow.models import DAG
   from airflow.operators.python_operator import PythonOperator
   
   af_run_dag_default_args = {
       'owner': 'sakaia'
   }
   
   ## Define DAG details
   af_run_dag = DAG(
       dag_id='ss0',
       default_args=af_run_dag_default_args,
       schedule_interval=None,
       start_date=days_ago(2),
       tags=['training']
   )
   
   def pythonTest(**kwargs) -> str :
       print("pythonTest")
       import sys, subprocess
       result = subprocess.check_output([sys.executable, '-m', 'pip', 'install', '--user', 'marshmallow==3.10.0'])
       print(str(result).replace('\\n', '\n'))
   
       import marshmallow
       print("marshmallow version: %s" % marshmallow.__version__, file=sys.stderr)
       print(str("marshmallow version %s" % marshmallow.__version__))
       print(str("marshmallow file    %s" % marshmallow.__file__))
       with open('/home/airflow/.local/lib/python3.8/site-packages/marshmallow/__init__.py') as f:
           hoge = f.read()
       print(hoge)
       return "pythonTest"
   
   # Define DAG steps/workflow
   with af_run_dag as dag :
       python_test = PythonOperator(
            task_id='python_test',
            python_callable=pythonTest,
            op_kwargs={
            },
            dag=dag
       )
   ```
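   To confirm whether the worker process already had marshmallow loaded before the `pip install` ran, a helper like the one below could be called inside `pythonTest` before and after the install. This is a sketch only: `describe_module` is a hypothetical name, it needs Python 3.8+ for `importlib.metadata`, and it assumes the distribution name matches the import name (true for marshmallow).

   ```Python
   import importlib
   import sys
   from importlib import metadata  # Python 3.8+


   def describe_module(name: str) -> dict:
       """Hypothetical helper: compare the loaded module with the installed distribution."""
       # True if some earlier import in this process already cached the module.
       already_loaded = name in sys.modules
       # Returns the cached module object when one exists; the file is not re-read.
       module = importlib.import_module(name)
       try:
           # Reads the version from the distribution metadata currently on disk.
           # Assumes the distribution name equals the import name.
           installed = metadata.version(name)
       except metadata.PackageNotFoundError:
           installed = None  # e.g. stdlib or builtin modules have no distribution
       return {
           "already_loaded": already_loaded,
           "module_version": getattr(module, "__version__", None),
           "module_file": getattr(module, "__file__", None),
           "installed_version": installed,
       }
   ```

   If `already_loaded` is `True` and `installed_version` reports 3.10.0 while `module_version` still reports 2.21.0, the import is being served from the module cache rather than from the newly installed files.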
   The output log follows. The marshmallow upgrade appears to succeed, but the Python code still reports version 2.21.0.
   
   
   ```
   [2021-02-05 01:45:36,385] {base_task_runner.py:61} DEBUG - Planning to run as the  user
   [2021-02-05 01:45:36,415] {taskinstance.py:685} DEBUG - <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [queued]> dependency 'Task Instance State' PASSED: True, Task state queued was valid.
   [2021-02-05 01:45:36,415] {taskinstance.py:685} DEBUG - <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2021-02-05 01:45:36,422] {taskinstance.py:685} DEBUG - <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
   [2021-02-05 01:45:36,423] {taskinstance.py:685} DEBUG - <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2021-02-05 01:45:36,423] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [queued]>
   [2021-02-05 01:45:36,442] {taskinstance.py:685} DEBUG - <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [queued]> dependency 'Pool Slots Available' PASSED: True, ('There are enough open slots in %s to execute the task', 'default_pool')
   [2021-02-05 01:45:36,443] {taskinstance.py:685} DEBUG - <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [queued]> dependency 'Task Concurrency' PASSED: True, Task concurrency is not set.
   [2021-02-05 01:45:36,443] {taskinstance.py:685} DEBUG - <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2021-02-05 01:45:36,443] {taskinstance.py:685} DEBUG - <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2021-02-05 01:45:36,468] {taskinstance.py:685} DEBUG - <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
   [2021-02-05 01:45:36,472] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [queued]>
   [2021-02-05 01:45:36,472] {taskinstance.py:880} INFO -
   --------------------------------------------------------------------------------
   [2021-02-05 01:45:36,472] {taskinstance.py:881} INFO - Starting attempt 1 of 1
   [2021-02-05 01:45:36,472] {taskinstance.py:882} INFO -
   --------------------------------------------------------------------------------
   [2021-02-05 01:45:36,506] {taskinstance.py:901} INFO - Executing <Task(PythonOperator): python_test> on 2020-12-07T00:00:00+00:00
   [2021-02-05 01:45:36,512] {standard_task_runner.py:54} INFO - Started process 70 to run task
   [2021-02-05 01:45:36,562] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'ss0', 'python_test', '2020-12-07T00:00:00+00:00', '--job_id', '87', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/ss0.py', '--cfg_path', '/tmp/tmp0wif1hiv']
   [2021-02-05 01:45:36,562] {standard_task_runner.py:78} INFO - Job 87: Subtask python_test
   [2021-02-05 01:45:36,563] {cli_action_loggers.py:68} DEBUG - Calling callbacks: [<function default_action_log at 0x7f166678ab80>]
   [2021-02-05 01:45:36,587] {settings.py:233} DEBUG - Setting up DB connection pool (PID 70)
   [2021-02-05 01:45:36,588] {settings.py:267} DEBUG - settings.prepare_engine_args(): Using NullPool
   [2021-02-05 01:45:36,652] {logging_mixin.py:112} INFO - Running <TaskInstance: ss0.python_test 2020-12-07T00:00:00+00:00 [running]> on host ss0pythontest-ec131a89bec94531956e82eb98c543c1
   [2021-02-05 01:45:36,726] {__init__.py:101} DEBUG - Preparing lineage inlets and outlets
   [2021-02-05 01:45:36,726] {__init__.py:137} DEBUG - inlets: [], outlets: []
   [2021-02-05 01:45:36,727] {python_operator.py:103} DEBUG - Exporting the following env vars:
   AIRFLOW_CTX_DAG_OWNER=sakaia
   AIRFLOW_CTX_DAG_ID=ss0
   AIRFLOW_CTX_TASK_ID=python_test
   AIRFLOW_CTX_EXECUTION_DATE=2020-12-07T00:00:00+00:00
   AIRFLOW_CTX_DAG_RUN_ID=backfill_2020-12-07T00:00:00+00:00
   [2021-02-05 01:45:36,727] {logging_mixin.py:112} INFO - pythonTest
   [2021-02-05 01:45:40,690] {logging_mixin.py:112} INFO - b'Collecting marshmallow==3.10.0
     Downloading marshmallow-3.10.0-py2.py3-none-any.whl (46 kB)
   Installing collected packages: marshmallow
     Attempting uninstall: marshmallow
       Found existing installation: marshmallow 2.21.0
       Uninstalling marshmallow-2.21.0:
         Successfully uninstalled marshmallow-2.21.0
   Successfully installed marshmallow-3.10.0
   '
   [2021-02-05 01:45:40,692] {logging_mixin.py:112} WARNING - marshmallow version: 2.21.0
   [2021-02-05 01:45:40,692] {logging_mixin.py:112} INFO - marshmallow version 2.21.0
   [2021-02-05 01:45:40,692] {logging_mixin.py:112} INFO - marshmallow file    /home/airflow/.local/lib/python3.8/site-packages/marshmallow/__init__.py
   [2021-02-05 01:45:40,693] {logging_mixin.py:112} INFO - from marshmallow.schema import Schema, SchemaOpts
   
   from . import fields
   from marshmallow.decorators import (
       pre_dump,
       post_dump,
       pre_load,
       post_load,
       validates,
       validates_schema,
   )
   from marshmallow.utils import EXCLUDE, INCLUDE, RAISE, pprint, missing
   from marshmallow.exceptions import ValidationError
   from distutils.version import LooseVersion
   
   __version__ = "3.10.0"
   __version_info__ = tuple(LooseVersion(__version__).version)
   __all__ = [
       "EXCLUDE",
       "INCLUDE",
       "RAISE",
       "Schema",
       "SchemaOpts",
       "fields",
       "validates",
       "validates_schema",
       "pre_dump",
       "post_dump",
       "pre_load",
       "post_load",
       "pprint",
       "ValidationError",
       "missing",
   ]
   [2021-02-05 01:45:40,693] {logging_mixin.py:112} INFO -
   [2021-02-05 01:45:40,693] {python_operator.py:114} INFO - Done. Returned value was: pythonTest
   [2021-02-05 01:45:40,736] {__init__.py:61} DEBUG - Backend: None, Lineage called with inlets: [], outlets: []
   [2021-02-05 01:45:40,766] {taskinstance.py:1057} INFO - Marking task as SUCCESS.dag_id=ss0, task_id=python_test, execution_date=20201207T000000, start_date=20210205T014536, end_date=20210205T014540
   [2021-02-05 01:45:41,396] {base_job.py:197} DEBUG - [heartbeat]
   [2021-02-05 01:45:41,396] {local_task_job.py:102} INFO - Task exited with return code 0
   ```
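   One plausible explanation for this log, assuming marshmallow had already been imported in the worker process before `pythonTest` ran (for example by Airflow or one of its dependencies): `import marshmallow` returns the module object cached in `sys.modules` and does not re-read the file. That would explain why `__file__` points at the `.local` path whose `__init__.py` now says 3.10.0 while `__version__` still reports 2.21.0. The self-contained sketch below reproduces that behavior with a throwaway package; `demo_pkg` and the two version strings are illustrative stand-ins, not marshmallow itself.

   ```Python
   import importlib
   import os
   import subprocess
   import sys
   import tempfile
   from pathlib import Path

   # Avoid stale .pyc files: both version strings have the same length, so bytecode
   # cached within the same second could otherwise be reused for the new source.
   saved_flag = sys.dont_write_bytecode
   sys.dont_write_bytecode = True


   def write_pkg(root: Path, version: str) -> None:
       """Write a one-file package (hypothetical stand-in for marshmallow)."""
       pkg = root / "demo_pkg"
       pkg.mkdir(exist_ok=True)
       (pkg / "__init__.py").write_text(f'__version__ = "{version}"\n')


   with tempfile.TemporaryDirectory() as tmp:
       sys.path.insert(0, tmp)

       write_pkg(Path(tmp), "2.21.0")
       import demo_pkg  # first import: executed once, then cached in sys.modules
       before = demo_pkg.__version__

       write_pkg(Path(tmp), "3.10.0")  # simulate pip overwriting the files on disk
       import demo_pkg  # served from sys.modules; the new file is not read
       cached = demo_pkg.__version__

       # A fresh interpreter starts with an empty module cache and reads the new file.
       fresh = subprocess.check_output(
           [sys.executable, "-B", "-c", "import demo_pkg; print(demo_pkg.__version__)"],
           env={**os.environ, "PYTHONPATH": tmp},
           text=True,
       ).strip()

       # reload() re-executes __init__.py in place, picking up the new source.
       reloaded = importlib.reload(demo_pkg).__version__

       sys.path.remove(tmp)
       del sys.modules["demo_pkg"]

   sys.dont_write_bytecode = saved_flag
   print(before, cached, fresh, reloaded)  # prints: 2.21.0 2.21.0 3.10.0 3.10.0
   ```

   Note that `importlib.reload` only re-executes the package's `__init__.py`; submodules already in `sys.modules` (for marshmallow, e.g. `marshmallow.schema`) keep their old code, so running the upgraded code in a fresh interpreter is the safer option.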

