kaxil edited a comment on issue #13099:
URL: https://github.com/apache/airflow/issues/13099#issuecomment-767531551
I was able to reproduce this.
> ```shell
> cd airflow_scheduler_test
> export AIRFLOW_HOME=$(pwd)
> virtualenv pyenv
> source pyenv/bin/activate
> pip install apache-airflow==2.0.0
>
> airflow db init
> airflow variables set test TEST
> mkdir plugins
> cat << EOF > plugins/test.py
> from airflow.models.variable import Variable
>
> print(Variable.get('test'))
> EOF
>
> airflow dags unpause example_bash_operator
> airflow scheduler
> ```
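
For comparison, a sketch of the same plugin with the lookup deferred (my rewrite, not from the report — `get_test_variable` is a hypothetical name). Moving the `Variable.get` call out of module level means importing the plugin opens no database session inside the scheduler process:

```python
# Hypothetical rewrite of plugins/test.py (a sketch, not the reporter's code):
# defer the Variable lookup to call time, so that merely importing the plugin
# does not open and commit a DB session while the scheduler holds its HA lock.

def get_test_variable():
    # Both the import and the query happen lazily, when a task calls this
    # function, instead of at plugin-import time inside the scheduler.
    from airflow.models.variable import Variable
    return Variable.get('test')
```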
Stacktrace:
<details><summary>CLICK ME</summary>
<p>

```
❯ airflow scheduler
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
[2021-01-26 13:09:59,150] {scheduler_job.py:1242} INFO - Starting the scheduler
[2021-01-26 13:09:59,151] {scheduler_job.py:1247} INFO - Processing each file at most -1 times
[2021-01-26 13:09:59,154] {dag_processing.py:250} INFO - Launched DagFileProcessorManager with pid: 5278
[2021-01-26 13:09:59,156] {scheduler_job.py:1772} INFO - Resetting orphaned tasks for active dag runs
[2021-01-26 13:09:59,161] {settings.py:52} INFO - Configured default timezone Timezone('UTC')
[2021-01-26 13:09:59,194] {plugins_manager.py:231} ERROR - UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!
Traceback (most recent call last):
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/plugins_manager.py", line 222, in load_plugins_from_plugin_directory
    loader.exec_module(mod)
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/Users/kaxilnaik/airflow/plugins/test.py", line 3, in <module>
    print(Variable.get('test'))
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/models/variable.py", line 127, in get
    var_val = Variable.get_variable_from_secrets(key=key)
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/models/variable.py", line 193, in get_variable_from_secrets
    var_val = secrets_backend.get_variable(key=key)
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/contextlib.py", line 119, in __exit__
    next(self.gen)
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/utils/session.py", line 32, in create_session
    session.commit()
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1042, in commit
    self.transaction.commit()
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 504, in commit
    self._prepare_impl()
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 472, in _prepare_impl
    self.session.dispatch.before_commit(self.session)
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/event/attr.py", line 322, in __call__
    fn(*args, **kw)
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/utils/sqlalchemy.py", line 217, in _validate_commit
    raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")
RuntimeError: UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!
[2021-01-26 13:09:59,201] {plugins_manager.py:232} ERROR - Failed to import plugin /Users/kaxilnaik/airflow/plugins/test.py
[2021-01-26 13:09:59,386] {scheduler_job.py:1623} INFO - DAG scenario1_case2_02 is at (or above) max_active_runs (1 of 1), not creating any more runs
[2021-01-26 13:09:59,387] {scheduler_job.py:1623} INFO - DAG scenario1_case2_01 is at (or above) max_active_runs (1 of 1), not creating any more runs
[2021-01-26 13:09:59,435] {scheduler_job.py:936} INFO - 3 tasks up for execution:
    <TaskInstance: scenario1_case2_02.tasks__1_of_10 2021-01-25 00:00:00+00:00 [scheduled]>
    <TaskInstance: scenario1_case2_01.tasks__1_of_10 2021-01-25 00:00:00+00:00 [scheduled]>
    <TaskInstance: scenario4_case2_1_40.tasks__1_of_10 2021-01-25 00:00:00+00:00 [scheduled]>
[2021-01-26 13:09:59,436] {scheduler_job.py:970} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 3 task instances ready to be queued
[2021-01-26 13:09:59,436] {scheduler_job.py:997} INFO - DAG scenario1_case2_02 has 0/1000000 running and queued tasks
[2021-01-26 13:09:59,436] {scheduler_job.py:997} INFO - DAG scenario1_case2_01 has 0/1000000 running and queued tasks
[2021-01-26 13:09:59,436] {scheduler_job.py:997} INFO - DAG scenario4_case2_1_40 has 0/16 running and queued tasks
[2021-01-26 13:09:59,437] {scheduler_job.py:1058} INFO - Setting the following tasks to queued state:
    <TaskInstance: scenario1_case2_02.tasks__1_of_10 2021-01-25 00:00:00+00:00 [scheduled]>
    <TaskInstance: scenario1_case2_01.tasks__1_of_10 2021-01-25 00:00:00+00:00 [scheduled]>
    <TaskInstance: scenario4_case2_1_40.tasks__1_of_10 2021-01-25 00:00:00+00:00 [scheduled]>
[2021-01-26 13:09:59,438] {scheduler_job.py:1100} INFO - Sending TaskInstanceKey(dag_id='scenario1_case2_02', task_id='tasks__1_of_10', execution_date=datetime.datetime(2021, 1, 25, 0, 0, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 10 and queue default
[2021-01-26 13:09:59,439] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'scenario1_case2_02', 'tasks__1_of_10', '2021-01-25T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/Users/kaxilnaik/airflow/dags/scenario1_case2_02.py']
[2021-01-26 13:09:59,439] {scheduler_job.py:1100} INFO - Sending TaskInstanceKey(dag_id='scenario1_case2_01', task_id='tasks__1_of_10', execution_date=datetime.datetime(2021, 1, 25, 0, 0, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 10 and queue default
[2021-01-26 13:09:59,439] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'scenario1_case2_01', 'tasks__1_of_10', '2021-01-25T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/Users/kaxilnaik/airflow/dags/scenario1_case2_01.py']
[2021-01-26 13:09:59,439] {scheduler_job.py:1100} INFO - Sending TaskInstanceKey(dag_id='scenario4_case2_1_40', task_id='tasks__1_of_10', execution_date=datetime.datetime(2021, 1, 25, 0, 0, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 10 and queue default
[2021-01-26 13:09:59,439] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'scenario4_case2_1_40', 'tasks__1_of_10', '2021-01-25T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/Users/kaxilnaik/airflow/dags/scenario4_case2_1.py']
[2021-01-26 13:09:59,440] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'scenario1_case2_02', 'tasks__1_of_10', '2021-01-25T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/Users/kaxilnaik/airflow/dags/scenario1_case2_02.py']
[2021-01-26 13:10:00,926] {dagbag.py:440} INFO - Filling up the DagBag from /Users/kaxilnaik/airflow/dags/scenario1_case2_02.py
TEST
Running <TaskInstance: scenario1_case2_02.tasks__1_of_10 2021-01-25T00:00:00+00:00 [queued]> on host 1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.ip6.arpa
[2021-01-26 13:10:01,570] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'scenario1_case2_01', 'tasks__1_of_10', '2021-01-25T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/Users/kaxilnaik/airflow/dags/scenario1_case2_01.py']
[2021-01-26 13:10:02,992] {dagbag.py:440} INFO - Filling up the DagBag from /Users/kaxilnaik/airflow/dags/scenario1_case2_01.py
TEST
Running <TaskInstance: scenario1_case2_01.tasks__1_of_10 2021-01-25T00:00:00+00:00 [queued]> on host 1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.ip6.arpa
[2021-01-26 13:10:03,628] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'scenario4_case2_1_40', 'tasks__1_of_10', '2021-01-25T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/Users/kaxilnaik/airflow/dags/scenario4_case2_1.py']
[2021-01-26 13:10:05,054] {dagbag.py:440} INFO - Filling up the DagBag from /Users/kaxilnaik/airflow/dags/scenario4_case2_1.py
TEST
Running <TaskInstance: scenario4_case2_1_40.tasks__1_of_10 2021-01-25T00:00:00+00:00 [queued]> on host 1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.ip6.arpa
[2021-01-26 13:10:05,750] {scheduler_job.py:1201} INFO - Executor reports execution of scenario1_case2_02.tasks__1_of_10 execution_date=2021-01-25 00:00:00+00:00 exited with status success for try_number 1
[2021-01-26 13:10:05,750] {scheduler_job.py:1201} INFO - Executor reports execution of scenario1_case2_01.tasks__1_of_10 execution_date=2021-01-25 00:00:00+00:00 exited with status success for try_number 1
[2021-01-26 13:10:05,750] {scheduler_job.py:1201} INFO - Executor reports execution of scenario4_case2_1_40.tasks__1_of_10 execution_date=2021-01-25 00:00:00+00:00 exited with status success for try_number 1
[2021-01-26 13:10:05,792] {scheduler_job.py:1293} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "dag_run_dag_id_run_id_key"
DETAIL: Key (dag_id, run_id)=(scenario1_case2_02, scheduled__2021-01-25T00:00:00+00:00) already exists.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1275, in _execute
    self._run_scheduler_loop()
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1377, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1474, in _do_scheduling
    self._create_dag_runs(query.all(), session)
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1582, in _create_dag_runs
    creating_job_id=self.id,
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/airflow/models/dag.py", line 1781, in create_dagrun
    session.flush()
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2523, in flush
    self._flush(objects)
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2664, in _flush
    transaction.rollback(_capture_exception=True)
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 69, in __exit__
    exc_value, with_traceback=exc_tb,
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2624, in _flush
    flush_context.execute()
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
    rec.execute(self)
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
    uow,
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
    insert,
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1136, in _emit_insert_statements
    statement, params
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
    distilled_params,
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
    e, statement, parameters, cursor, context
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/Users/kaxilnaik/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "dag_run_dag_id_run_id_key"
DETAIL: Key (dag_id, run_id)=(scenario1_case2_02, scheduled__2021-01-25T00:00:00+00:00) already exists.

[SQL: INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, creating_job_id, external_trigger, run_type, conf, last_scheduling_decision, dag_hash) VALUES (%(dag_id)s, %(execution_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(creating_job_id)s, %(external_trigger)s, %(run_type)s, %(conf)s, %(last_scheduling_decision)s, %(dag_hash)s) RETURNING dag_run.id]
[parameters: {'dag_id': 'scenario1_case2_02', 'execution_date': datetime.datetime(2021, 1, 25, 0, 0, tzinfo=Timezone('UTC')), 'start_date': datetime.datetime(2021, 1, 26, 13, 10, 5, 790285, tzinfo=Timezone('UTC')), 'end_date': None, 'state': 'running', 'run_id': 'scheduled__2021-01-25T00:00:00+00:00', 'creating_job_id': 1, 'external_trigger': False, 'run_type': <DagRunType.SCHEDULED: 'scheduled'>, 'conf': <psycopg2.extensions.Binary object at 0x7f95c0b11db0>, 'last_scheduling_decision': None, 'dag_hash': '3bb8093d821e43ddfc6ab56cd9eccfd2'}]
(Background on this error at: http://sqlalche.me/e/13/gkpj)
[2021-01-26 13:10:06,814] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 5278
[2021-01-26 13:10:07,005] {process_utils.py:206} INFO - Waiting up to 5 seconds for processes to exit...
[2021-01-26 13:10:07,041] {process_utils.py:66} INFO - Process psutil.Process(pid=5388, status='terminated', started='13:10:05') (5388) terminated with exit code None
[2021-01-26 13:10:07,043] {process_utils.py:66} INFO - Process psutil.Process(pid=5385, status='terminated', started='13:10:05') (5385) terminated with exit code None
[2021-01-26 13:10:07,043] {process_utils.py:66} INFO - Process psutil.Process(pid=5278, status='terminated', exitcode=0, started='13:09:59') (5278) terminated with exit code 0
[2021-01-26 13:10:07,043] {scheduler_job.py:1296} INFO - Exited execute loop
```

</p>
</details>
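
The `RuntimeError` in the first traceback comes from a session-level before-commit guard (`_validate_commit` in `airflow/utils/sqlalchemy.py`, hooked to SQLAlchemy's `before_commit` event). As a rough illustration only — a toy model with a hypothetical `GuardedSession` class, not Airflow's actual implementation:

```python
# Toy model (a simplification, not Airflow's real code) of the guard that
# produced "UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!" above: while the
# scheduler holds its HA lock, any commit it did not initiate is refused,
# because committing would release the row-level lock backing scheduler HA.

class GuardedSession:
    """Stand-in for an ORM session with a before-commit validation hook."""

    def __init__(self):
        self.commit_allowed = True  # flipped off during the HA-locked section

    def commit(self):
        # Mirrors the idea of _validate_commit: refuse unexpected commits loudly.
        if not self.commit_allowed:
            raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")


session = GuardedSession()
session.commit()                 # outside the critical section: allowed

session.commit_allowed = False   # scheduler enters its HA-locked section
try:
    # e.g. a plugin's top-level Variable.get() committing its own session
    session.commit()
except RuntimeError as exc:
    error_message = str(exc)
```

This is why a plugin that touches the metadata DB at import time fails to load: plugin import happens inside the scheduler's guarded transaction, and the implicit `session.commit()` from `Variable.get` trips the guard.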