xinbinhuang commented on a change in pull request #17769:
URL: https://github.com/apache/airflow/pull/17769#discussion_r693405701
##########
File path: tests/dag_processing/test_manager.py
##########
@@ -694,6 +695,48 @@ def fake_processor_(*args, **kwargs):
child_pipe.close()
thread.join(timeout=1.0)
+ @conf_vars({('core', 'load_examples'): 'False'})
+ @mock.patch('airflow.dag_processing.manager.Stats.timing')
+ def test_send_file_processing_statsd_timing(self, statsd_timing_mock,
tmpdir):
+ # arrange
+ filename_to_parse = tmpdir / 'temp_dag.py'
+ # Generate dag
+ dag_code = dedent(
+ """
+ from airflow import DAG
+ dag = DAG(dag_id='temp_dag', schedule_interval='0 0 * * *')
+ """
+ )
+ with open(filename_to_parse, 'w') as file_to_parse:
+ file_to_parse.writelines(dag_code)
+
+ child_pipe, parent_pipe = multiprocessing.Pipe()
+
+ async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
+ manager = DagFileProcessorManager(
+ dag_directory=tmpdir,
+ max_runs=1,
+ processor_timeout=timedelta.max,
+ signal_conn=child_pipe,
+ dag_ids=[],
+ pickle_dags=False,
+ async_mode=async_mode,
+ )
+
+ # act
Review comment:
```suggestion
```
##########
File path: tests/dag_processing/test_manager.py
##########
@@ -694,6 +695,48 @@ def fake_processor_(*args, **kwargs):
child_pipe.close()
thread.join(timeout=1.0)
+ @conf_vars({('core', 'load_examples'): 'False'})
+ @mock.patch('airflow.dag_processing.manager.Stats.timing')
+ def test_send_file_processing_statsd_timing(self, statsd_timing_mock,
tmpdir):
+ # arrange
Review comment:
```suggestion
```
##########
File path: tests/dag_processing/test_manager.py
##########
@@ -694,6 +695,48 @@ def fake_processor_(*args, **kwargs):
child_pipe.close()
thread.join(timeout=1.0)
+ @conf_vars({('core', 'load_examples'): 'False'})
+ @mock.patch('airflow.dag_processing.manager.Stats.timing')
+ def test_send_file_processing_statsd_timing(self, statsd_timing_mock,
tmpdir):
+ # arrange
+ filename_to_parse = tmpdir / 'temp_dag.py'
+ # Generate dag
Review comment:
```suggestion
```
##########
File path: tests/dag_processing/test_manager.py
##########
@@ -694,6 +695,48 @@ def fake_processor_(*args, **kwargs):
child_pipe.close()
thread.join(timeout=1.0)
+ @conf_vars({('core', 'load_examples'): 'False'})
+ @mock.patch('airflow.dag_processing.manager.Stats.timing')
+ def test_send_file_processing_statsd_timing(self, statsd_timing_mock,
tmpdir):
+ # arrange
+ filename_to_parse = tmpdir / 'temp_dag.py'
+ # Generate dag
+ dag_code = dedent(
+ """
+ from airflow import DAG
+ dag = DAG(dag_id='temp_dag', schedule_interval='0 0 * * *')
+ """
+ )
+ with open(filename_to_parse, 'w') as file_to_parse:
+ file_to_parse.writelines(dag_code)
+
+ child_pipe, parent_pipe = multiprocessing.Pipe()
+
+ async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
+ manager = DagFileProcessorManager(
+ dag_directory=tmpdir,
+ max_runs=1,
+ processor_timeout=timedelta.max,
+ signal_conn=child_pipe,
+ dag_ids=[],
+ pickle_dags=False,
+ async_mode=async_mode,
+ )
+
+ # act
+ with create_session():
+ self.run_processor_manager_one_loop(manager, parent_pipe)
+
+ child_pipe.close()
+ parent_pipe.close()
+
+ # assert
+ # we check that after processing the file and removing it from
DagFileProcessorManager._processors,
+ # the statistics on the last processing was sent to the statsd
Review comment:
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]