kaxil edited a comment on issue #13099:
URL: https://github.com/apache/airflow/issues/13099#issuecomment-767531551


   I was able to reproduce this.
   
   > cd airflow_scheduler_test
   > export AIRFLOW_HOME=$(pwd)
   > virtualenv pyenv 
   > source pyenv/bin/activate
   > pip install apache-airflow==2.0.0
   > 
   > airflow db init
   > airflow variables set test TEST
   > mkdir plugins
   > cat << EOF > plugins/test.py
   > from airflow.models.variable import Variable
   > 
   > print(Variable.get('test'))
   > EOF
   > 
   > airflow dags unpause example_bash_operator
   > airflow scheduler
   
   Stacktrace:
   
   <details><summary>CLICK ME</summary>
   <p>
   
   ```
   ❯ airflow scheduler
     ____________       _____________
    ____    |__( )_________  __/__  /________      __
   ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
   ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
    _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   [2021-01-26 13:09:59,150] {scheduler_job.py:1242} INFO - Starting the scheduler
   [2021-01-26 13:09:59,151] {scheduler_job.py:1247} INFO - Processing each file at most -1 times
   [2021-01-26 13:09:59,154] {dag_processing.py:250} INFO - Launched DagFileProcessorManager with pid: 5278
   [2021-01-26 13:09:59,156] {scheduler_job.py:1772} INFO - Resetting orphaned tasks for active dag runs
   [2021-01-26 13:09:59,161] {settings.py:52} INFO - Configured default timezone Timezone('UTC')
   [2021-01-26 13:09:59,194] {plugins_manager.py:231} ERROR - UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!
   Traceback (most recent call last):
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/plugins_manager.py", line 222, in load_plugins_from_plugin_directory
       loader.exec_module(mod)
     File "<frozen importlib._bootstrap_external>", line 728, in exec_module
     File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
     File "/Users/kaxilnaik/airflow/plugins/test.py", line 3, in <module>
       print(Variable.get('test'))
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/models/variable.py", line 127, in get
       var_val = Variable.get_variable_from_secrets(key=key)
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/models/variable.py", line 193, in get_variable_from_secrets
       var_val = secrets_backend.get_variable(key=key)
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/utils/session.py", line 65, in wrapper
       return func(*args, session=session, **kwargs)
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/contextlib.py", line 119, in __exit__
       next(self.gen)
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/utils/session.py", line 32, in create_session
       session.commit()
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1042, in commit
       self.transaction.commit()
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 504, in commit
       self._prepare_impl()
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 472, in _prepare_impl
       self.session.dispatch.before_commit(self.session)
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/event/attr.py", line 322, in __call__
       fn(*args, **kw)
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/utils/sqlalchemy.py", line 217, in _validate_commit
       raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")
   RuntimeError: UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!
   [2021-01-26 13:09:59,201] {plugins_manager.py:232} ERROR - Failed to import plugin /Users/kaxilnaik/airflow/plugins/test.py
   [2021-01-26 13:09:59,386] {scheduler_job.py:1623} INFO - DAG scenario1_case2_02 is at (or above) max_active_runs (1 of 1), not creating any more runs
   [2021-01-26 13:09:59,387] {scheduler_job.py:1623} INFO - DAG scenario1_case2_01 is at (or above) max_active_runs (1 of 1), not creating any more runs
   [2021-01-26 13:09:59,435] {scheduler_job.py:936} INFO - 3 tasks up for execution:
        <TaskInstance: scenario1_case2_02.tasks__1_of_10 2021-01-25 00:00:00+00:00 [scheduled]>
        <TaskInstance: scenario1_case2_01.tasks__1_of_10 2021-01-25 00:00:00+00:00 [scheduled]>
        <TaskInstance: scenario4_case2_1_40.tasks__1_of_10 2021-01-25 00:00:00+00:00 [scheduled]>
   [2021-01-26 13:09:59,436] {scheduler_job.py:970} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 3 task instances ready to be queued
   [2021-01-26 13:09:59,436] {scheduler_job.py:997} INFO - DAG scenario1_case2_02 has 0/1000000 running and queued tasks
   [2021-01-26 13:09:59,436] {scheduler_job.py:997} INFO - DAG scenario1_case2_01 has 0/1000000 running and queued tasks
   [2021-01-26 13:09:59,436] {scheduler_job.py:997} INFO - DAG scenario4_case2_1_40 has 0/16 running and queued tasks
   [2021-01-26 13:09:59,437] {scheduler_job.py:1058} INFO - Setting the following tasks to queued state:
        <TaskInstance: scenario1_case2_02.tasks__1_of_10 2021-01-25 00:00:00+00:00 [scheduled]>
        <TaskInstance: scenario1_case2_01.tasks__1_of_10 2021-01-25 00:00:00+00:00 [scheduled]>
        <TaskInstance: scenario4_case2_1_40.tasks__1_of_10 2021-01-25 00:00:00+00:00 [scheduled]>
   [2021-01-26 13:09:59,438] {scheduler_job.py:1100} INFO - Sending TaskInstanceKey(dag_id='scenario1_case2_02', task_id='tasks__1_of_10', execution_date=datetime.datetime(2021, 1, 25, 0, 0, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 10 and queue default
   [2021-01-26 13:09:59,439] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'scenario1_case2_02', 'tasks__1_of_10', '2021-01-25T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/Users/kaxilnaik/airflow/dags/scenario1_case2_02.py']
   [2021-01-26 13:09:59,439] {scheduler_job.py:1100} INFO - Sending TaskInstanceKey(dag_id='scenario1_case2_01', task_id='tasks__1_of_10', execution_date=datetime.datetime(2021, 1, 25, 0, 0, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 10 and queue default
   [2021-01-26 13:09:59,439] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'scenario1_case2_01', 'tasks__1_of_10', '2021-01-25T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/Users/kaxilnaik/airflow/dags/scenario1_case2_01.py']
   [2021-01-26 13:09:59,439] {scheduler_job.py:1100} INFO - Sending TaskInstanceKey(dag_id='scenario4_case2_1_40', task_id='tasks__1_of_10', execution_date=datetime.datetime(2021, 1, 25, 0, 0, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 10 and queue default
   [2021-01-26 13:09:59,439] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'scenario4_case2_1_40', 'tasks__1_of_10', '2021-01-25T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/Users/kaxilnaik/airflow/dags/scenario4_case2_1.py']
   [2021-01-26 13:09:59,440] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'scenario1_case2_02', 'tasks__1_of_10', '2021-01-25T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/Users/kaxilnaik/airflow/dags/scenario1_case2_02.py']
   [2021-01-26 13:10:00,926] {dagbag.py:440} INFO - Filling up the DagBag from /Users/kaxilnaik/airflow/dags/scenario1_case2_02.py
   TEST
   Running <TaskInstance: scenario1_case2_02.tasks__1_of_10 2021-01-25T00:00:00+00:00 [queued]> on host 1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.ip6.arpa
   [2021-01-26 13:10:01,570] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'scenario1_case2_01', 'tasks__1_of_10', '2021-01-25T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/Users/kaxilnaik/airflow/dags/scenario1_case2_01.py']
   [2021-01-26 13:10:02,992] {dagbag.py:440} INFO - Filling up the DagBag from /Users/kaxilnaik/airflow/dags/scenario1_case2_01.py
   TEST
   Running <TaskInstance: scenario1_case2_01.tasks__1_of_10 2021-01-25T00:00:00+00:00 [queued]> on host 1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.ip6.arpa
   [2021-01-26 13:10:03,628] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'scenario4_case2_1_40', 'tasks__1_of_10', '2021-01-25T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/Users/kaxilnaik/airflow/dags/scenario4_case2_1.py']
   [2021-01-26 13:10:05,054] {dagbag.py:440} INFO - Filling up the DagBag from /Users/kaxilnaik/airflow/dags/scenario4_case2_1.py
   TEST
   Running <TaskInstance: scenario4_case2_1_40.tasks__1_of_10 2021-01-25T00:00:00+00:00 [queued]> on host 1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.ip6.arpa
   [2021-01-26 13:10:05,750] {scheduler_job.py:1201} INFO - Executor reports execution of scenario1_case2_02.tasks__1_of_10 execution_date=2021-01-25 00:00:00+00:00 exited with status success for try_number 1
   [2021-01-26 13:10:05,750] {scheduler_job.py:1201} INFO - Executor reports execution of scenario1_case2_01.tasks__1_of_10 execution_date=2021-01-25 00:00:00+00:00 exited with status success for try_number 1
   [2021-01-26 13:10:05,750] {scheduler_job.py:1201} INFO - Executor reports execution of scenario4_case2_1_40.tasks__1_of_10 execution_date=2021-01-25 00:00:00+00:00 exited with status success for try_number 1
   [2021-01-26 13:10:05,792] {scheduler_job.py:1293} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
   Traceback (most recent call last):
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
       cursor, statement, parameters, context
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
       cursor.execute(statement, parameters)
   psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "dag_run_dag_id_run_id_key"
   DETAIL:  Key (dag_id, run_id)=(scenario1_case2_02, scheduled__2021-01-25T00:00:00+00:00) already exists.

   The above exception was the direct cause of the following exception:

   Traceback (most recent call last):
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1275, in _execute
       self._run_scheduler_loop()
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1377, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1474, in _do_scheduling
       self._create_dag_runs(query.all(), session)
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1582, in _create_dag_runs
       creating_job_id=self.id,
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/utils/session.py", line 62, in wrapper
       return func(*args, **kwargs)
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/models/dag.py", line 1781, in create_dagrun
       session.flush()
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2523, in flush
       self._flush(objects)
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2664, in _flush
       transaction.rollback(_capture_exception=True)
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 69, in __exit__
       exc_value, with_traceback=exc_tb,
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2624, in _flush
       flush_context.execute()
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
       rec.execute(self)
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
       uow,
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
       insert,
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1136, in _emit_insert_statements
       statement, params
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
       return meth(self, multiparams, params)
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
       return connection._execute_clauseelement(self, multiparams, params)
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
       distilled_params,
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
       e, statement, parameters, cursor, context
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
       sqlalchemy_exception, with_traceback=exc_info[2], from_=e
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
       cursor, statement, parameters, context
     File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
       cursor.execute(statement, parameters)
   sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "dag_run_dag_id_run_id_key"
   DETAIL:  Key (dag_id, run_id)=(scenario1_case2_02, scheduled__2021-01-25T00:00:00+00:00) already exists.

   [SQL: INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, creating_job_id, external_trigger, run_type, conf, last_scheduling_decision, dag_hash) VALUES (%(dag_id)s, %(execution_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(creating_job_id)s, %(external_trigger)s, %(run_type)s, %(conf)s, %(last_scheduling_decision)s, %(dag_hash)s) RETURNING dag_run.id]
   [parameters: {'dag_id': 'scenario1_case2_02', 'execution_date': datetime.datetime(2021, 1, 25, 0, 0, tzinfo=Timezone('UTC')), 'start_date': datetime.datetime(2021, 1, 26, 13, 10, 5, 790285, tzinfo=Timezone('UTC')), 'end_date': None, 'state': 'running', 'run_id': 'scheduled__2021-01-25T00:00:00+00:00', 'creating_job_id': 1, 'external_trigger': False, 'run_type': <DagRunType.SCHEDULED: 'scheduled'>, 'conf': <psycopg2.extensions.Binary object at 0x7f95c0b11db0>, 'last_scheduling_decision': None, 'dag_hash': '3bb8093d821e43ddfc6ab56cd9eccfd2'}]
   (Background on this error at: http://sqlalche.me/e/13/gkpj)
   [2021-01-26 13:10:06,814] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 5278
   [2021-01-26 13:10:07,005] {process_utils.py:206} INFO - Waiting up to 5 seconds for processes to exit...
   [2021-01-26 13:10:07,041] {process_utils.py:66} INFO - Process psutil.Process(pid=5388, status='terminated', started='13:10:05') (5388) terminated with exit code None
   [2021-01-26 13:10:07,043] {process_utils.py:66} INFO - Process psutil.Process(pid=5385, status='terminated', started='13:10:05') (5385) terminated with exit code None
   [2021-01-26 13:10:07,043] {process_utils.py:66} INFO - Process psutil.Process(pid=5278, status='terminated', exitcode=0, started='13:09:59') (5278) terminated with exit code 0
   [2021-01-26 13:10:07,043] {scheduler_job.py:1296} INFO - Exited execute loop
   ```
   
   </p>
   </details>
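   For context on the first error above: the `plugins/test.py` from the reproduction calls `Variable.get('test')` as a module-level statement, so the database session is opened and committed while the scheduler is importing plugins under its HA lock, tripping the `UNEXPECTED COMMIT` guard. The sketch below is an Airflow-free illustration of that eager-vs-deferred distinction only (it does not fix the scheduler bug itself); `FakeVariable`, `import_eager_plugin`, and `make_deferred_getter` are hypothetical stand-ins, not Airflow APIs.

   ```python
   # Records each simulated DB access so we can see *when* it happens.
   calls = []

   class FakeVariable:
       """Stand-in for airflow.models.variable.Variable."""
       @staticmethod
       def get(key):
           calls.append(key)  # stands in for "opens a session and commits"
           return key.upper()

   # Eager pattern (what plugins/test.py does): the DB hit happens while
   # the module body executes, i.e. during plugin import.
   def import_eager_plugin():
       FakeVariable.get('test')  # module-level statement in the plugin

   # Deferred pattern: wrap the lookup in a function, so the DB hit only
   # happens when task code actually calls it, not at import time.
   def make_deferred_getter():
       def get_test_var():
           return FakeVariable.get('test')
       return get_test_var

   import_eager_plugin()
   assert calls == ['test']      # access already happened at "import" time

   getter = make_deferred_getter()
   assert calls == ['test']      # defining the getter touched nothing
   assert getter() == 'TEST'     # access happens only when invoked
   ```

   Deferring the lookup this way sidesteps the plugin-import error, though the scheduler's subsequent `UniqueViolation` crash on `dag_run_dag_id_run_id_key` still looks like a separate bug worth tracking here.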
   

