jedcunningham opened a new issue #17326:
URL: https://github.com/apache/airflow/issues/17326


   **Apache Airflow version**: main
   
   **Environment**: CI
   
   **What happened**:
   
   The `TestLocalTaskJob.test_mark_success_no_kill` test no longer passes on
MSSQL. I initially thought it was a race condition, but even after waiting
5 minutes the task instance (TI) still wasn't running.
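
One way to rule out a race is to poll with a much longer deadline than the 50 x 0.1 s loop in the test. The following is only a minimal sketch: `wait_for_state` is a hypothetical helper, not part of Airflow or its test suite, and `get_state` stands in for something like "call `ti.refresh_from_db()` and return `ti.state`".

```python
import time
from typing import Callable, Optional


def wait_for_state(
    get_state: Callable[[], Optional[str]],
    expected: str,
    timeout: float = 300.0,
    interval: float = 0.1,
) -> bool:
    """Poll get_state() until it returns `expected` or `timeout` seconds pass.

    Hypothetical helper for illustration only; not an Airflow API.
    Returns True if the expected state was observed, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        # Check first, so a state that is already correct returns immediately.
        if get_state() == expected:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

If a helper like this still returns `False` after several minutes, the TI is most likely never reaching the running state at all, which points away from a timing problem.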
   
   
https://github.com/apache/airflow/blob/36bdfe8d0ef7e5fc428434f8313cf390ee9acc8f/tests/jobs/test_local_task_job.py#L301-L306
   
   I've tracked the issue down to #16301 (cc @ephraimbuddy), but I haven't
dug into why yet.
   
   **How to reproduce it**:
   
   `./breeze --backend mssql tests tests/jobs/test_local_task_job.py`
   
   
   ```
   _____________________ TestLocalTaskJob.test_mark_success_no_kill _____________________

   self = <tests.jobs.test_local_task_job.TestLocalTaskJob object at 0x7f54652abf10>

       def test_mark_success_no_kill(self):
           """
           Test that ensures that mark_success in the UI doesn't cause
           the task to fail, and that the task exits
           """
           dagbag = DagBag(
               dag_folder=TEST_DAG_FOLDER,
               include_examples=False,
           )
           dag = dagbag.dags.get('test_mark_success')
           task = dag.get_task('task1')

           session = settings.Session()

           dag.clear()
           dag.create_dagrun(
               run_id="test",
               state=State.RUNNING,
               execution_date=DEFAULT_DATE,
               start_date=DEFAULT_DATE,
               session=session,
           )
           ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
           ti.refresh_from_db()
           job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True)
           process = multiprocessing.Process(target=job1.run)
           process.start()
           for _ in range(0, 50):
               if ti.state == State.RUNNING:
                   break
               time.sleep(0.1)
               ti.refresh_from_db()
   >       assert State.RUNNING == ti.state
   E       AssertionError: assert <TaskInstanceState.RUNNING: 'running'> == None
   E        +  where <TaskInstanceState.RUNNING: 'running'> = State.RUNNING
   E        +  and   None = <TaskInstance: test_mark_success.task1 2016-01-01 00:00:00+00:00 [None]>.state

   tests/jobs/test_local_task_job.py:306: AssertionError
   ------------------------------ Captured stderr call ------------------------------
   INFO  [airflow.models.dagbag.DagBag] Filling up the DagBag from /opt/airflow/tests/dags
   INFO  [root] class_instance type: <class 'unusual_prefix_5d280a9b385120fec3c40cfe5be04e2f41b6b5e8_test_task_view_type_check.CallableClass'>
   INFO  [airflow.models.dagbag.DagBag] File /opt/airflow/tests/dags/test_zip.zip:file_no_airflow_dag.py assumed to contain no DAGs. Skipping.
   Process Process-1:
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 2336, in _wrap_pool_connect
       return fn()
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 364, in connect
       return _ConnectionFairy._checkout(self)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 809, in _checkout
       result = pool._dialect.do_ping(fairy.connection)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 575, in do_ping
       cursor.execute(self._dialect_specific_select_one)
   pyodbc.ProgrammingError: ('42000', '[42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]The server failed to resume the transaction. Desc:3400000012. (3971) (SQLExecDirectW)')

   (truncated)
   ```



