jason810496 commented on code in PR #59183:
URL: https://github.com/apache/airflow/pull/59183#discussion_r2610626662
##########
airflow-core/tests/unit/assets/test_manager.py:
##########
@@ -202,3 +206,38 @@ def test_create_assets_notifies_asset_listener(self,
session):
assert len(asset_listener.created) == 1
assert len(asms) == 1
assert asset_listener.created[0].uri == asset.uri == asms[0].uri
+
+ @pytest.mark.usefixtures("dag_maker", "testing_dag_bundle")
+ def test_get_or_create_apdr_race_condition(self, session):
+ asm = AssetModel(uri="test://asset1/", name="parition_asset",
group="asset")
+ testing_dag = DagModel(dag_id="testing_dag", is_stale=False,
bundle_name="testing")
+ session.add_all([asm, testing_dag])
+ session.commit()
+ session.flush()
+ assert session.query(AssetPartitionDagRun).count() == 0
+
+ def _get_or_create_apdr():
+ if TYPE_CHECKING:
+ assert Session
+ assert Session.session_factory
+
+ _session = Session.session_factory()
+ _session.begin()
+ print(_session.scalar(select(AssetModel)).id)
+ try:
+ return AssetManager._get_or_create_apdr(
+ target_key="test_partition_key",
+ target_dag=testing_dag,
+ asset_id=asm.id,
+ session=_session,
+ ).id
+ finally:
+ _session.commit()
+ _session.close()
+
Review Comment:
The scenario I described would always look like this:
| Time | Session 1 | Session 2 |
|------|-----------|-----------|
| 0 | Acquire APDR lock | SET LOCAL statement_timeout = 1 second |
| 1 | Create APDR (not committed yet) | Try to acquire APDR lock (unable to grab the lock) |
| 2 | sleeping | waiting ... |
| 2 | sleeping | raise Timeout exception |
| 2 | sleeping | |
| 3 | Commit | |
| 4 | | |
This way we can ensure that session 2 is always blocked by session 1 every
time the test runs, whereas with the current test we might need to run it
several times to get the "2. Conflict" execution order shown above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]