xinbinhuang commented on a change in pull request #17769:
URL: https://github.com/apache/airflow/pull/17769#discussion_r693405701



##########
File path: tests/dag_processing/test_manager.py
##########
@@ -694,6 +695,48 @@ def fake_processor_(*args, **kwargs):
             child_pipe.close()
             thread.join(timeout=1.0)
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @mock.patch('airflow.dag_processing.manager.Stats.timing')
+    def test_send_file_processing_statsd_timing(self, statsd_timing_mock, 
tmpdir):
+        # arrange
+        filename_to_parse = tmpdir / 'temp_dag.py'
+        # Generate dag
+        dag_code = dedent(
+            """
+        from airflow import DAG
+        dag = DAG(dag_id='temp_dag', schedule_interval='0 0 * * *')
+        """
+        )
+        with open(filename_to_parse, 'w') as file_to_parse:
+            file_to_parse.writelines(dag_code)
+
+        child_pipe, parent_pipe = multiprocessing.Pipe()
+
+        async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
+        manager = DagFileProcessorManager(
+            dag_directory=tmpdir,
+            max_runs=1,
+            processor_timeout=timedelta.max,
+            signal_conn=child_pipe,
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=async_mode,
+        )
+
+        # act

Review comment:
       ```suggestion
   ```

##########
File path: tests/dag_processing/test_manager.py
##########
@@ -694,6 +695,48 @@ def fake_processor_(*args, **kwargs):
             child_pipe.close()
             thread.join(timeout=1.0)
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @mock.patch('airflow.dag_processing.manager.Stats.timing')
+    def test_send_file_processing_statsd_timing(self, statsd_timing_mock, 
tmpdir):
+        # arrange

Review comment:
       ```suggestion
   ```

##########
File path: tests/dag_processing/test_manager.py
##########
@@ -694,6 +695,48 @@ def fake_processor_(*args, **kwargs):
             child_pipe.close()
             thread.join(timeout=1.0)
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @mock.patch('airflow.dag_processing.manager.Stats.timing')
+    def test_send_file_processing_statsd_timing(self, statsd_timing_mock, 
tmpdir):
+        # arrange
+        filename_to_parse = tmpdir / 'temp_dag.py'
+        # Generate dag

Review comment:
       ```suggestion
   ```

##########
File path: tests/dag_processing/test_manager.py
##########
@@ -694,6 +695,48 @@ def fake_processor_(*args, **kwargs):
             child_pipe.close()
             thread.join(timeout=1.0)
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @mock.patch('airflow.dag_processing.manager.Stats.timing')
+    def test_send_file_processing_statsd_timing(self, statsd_timing_mock, 
tmpdir):
+        # arrange
+        filename_to_parse = tmpdir / 'temp_dag.py'
+        # Generate dag
+        dag_code = dedent(
+            """
+        from airflow import DAG
+        dag = DAG(dag_id='temp_dag', schedule_interval='0 0 * * *')
+        """
+        )
+        with open(filename_to_parse, 'w') as file_to_parse:
+            file_to_parse.writelines(dag_code)
+
+        child_pipe, parent_pipe = multiprocessing.Pipe()
+
+        async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
+        manager = DagFileProcessorManager(
+            dag_directory=tmpdir,
+            max_runs=1,
+            processor_timeout=timedelta.max,
+            signal_conn=child_pipe,
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=async_mode,
+        )
+
+        # act
+        with create_session():
+            self.run_processor_manager_one_loop(manager, parent_pipe)
+
+        child_pipe.close()
+        parent_pipe.close()
+
+        # assert
+        # we check that after processing the file and removing it from 
DagFileProcessorManager._processors,
+        # the statistics on the last processing was sent to the statsd

Review comment:
       ```suggestion
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to