maganaluis commented on a change in pull request #8256:
URL: https://github.com/apache/airflow/pull/8256#discussion_r419189015



##########
File path: airflow/operators/python_operator.py
##########
@@ -330,13 +330,28 @@ def _write_string_args(self, filename):
 
     def _write_args(self, input_filename):
         # serialize args to file
+        if self.use_dill:
+            serializer = dill
+        else:
+            serializer = pickle
+        # some args from context can't be loaded in virtual env
+        invalid_args = set(['dag', 'task', 'ti'])
         if self._pass_op_args():
+            kwargs = {}
+            for key, value in self.op_kwargs.items():

Review comment:
   @Fokko Thank you for taking the time to review this PR. I've updated it based on your suggestions.
   
   ```python
       def _write_args(self, input_filename):
           # serialize args to file
           if self.use_dill:
               serializer = dill
           else:
               serializer = pickle
           # some items from context can't be loaded in virtual env
           # see pr https://github.com/apache/airflow/pull/8256
           not_serializable = {'dag', 'task', 'ti', 'macros', 'task_instance', 'var'}
           if self._pass_op_args():
               kwargs = {key: value for key, value in self.op_kwargs.items()
                         if key not in not_serializable}
               with open(input_filename, 'wb') as f:
                   arg_dict = {'args': self.op_args, 'kwargs': kwargs}
                   serializer.dump(arg_dict, f)
   ```
   
   The excluded arguments are SQLAlchemy database models: some fail to serialize, and others (`dag`, `task`, and `ti`) fail to deserialize at run time because they reference objects that are no longer present. See the attached log.
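   A minimal sketch of the failure mode and the denylist filter, in plain Python. It does not use the real `dag`, `task`, or `ti` objects; instead a hypothetical `LiveResource` class holding a `threading.Lock` stands in for a context value that pickle cannot serialize, and the hypothetical `filter_kwargs` helper applies the same key-based filtering as `_write_args`.

```python
import pickle
import threading


class LiveResource:
    """Hypothetical stand-in for a context object holding a live handle."""

    def __init__(self):
        # pickle refuses to serialize lock objects.
        self.lock = threading.Lock()


# Same denylist as not_serializable in _write_args.
NOT_SERIALIZABLE = {'dag', 'task', 'ti', 'macros', 'task_instance', 'var'}


def filter_kwargs(op_kwargs, denylist=NOT_SERIALIZABLE):
    """Drop context entries known not to survive a pickle round trip."""
    return {key: value for key, value in op_kwargs.items() if key not in denylist}


context = {'ds': '2020-05-01', 'ti': LiveResource(), 'params': {'x': 1}}

# Pickling the raw context fails on the lock held by the fake 'ti' entry.
unfiltered_error = None
try:
    pickle.dumps(context)
except TypeError as exc:
    unfiltered_error = exc

# Pickling the filtered kwargs succeeds and round-trips cleanly.
payload = pickle.dumps({'args': [], 'kwargs': filter_kwargs(context)})
restored = pickle.loads(payload)
print(sorted(restored['kwargs']))  # ['ds', 'params']
```

   A static denylist keeps the behavior predictable: the same keys are always dropped, regardless of what a particular run's context happens to contain.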
   
   [log_1.txt](https://github.com/apache/airflow/files/4572523/log_1.txt)
   
   To solve these issues, we would need to implement `__getstate__` and `__setstate__` methods on those objects so they serialize properly. That is outside the scope of this PR and would require considerable work and additional tests.
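   For reference, a minimal sketch of the `__getstate__`/`__setstate__` pattern, using a hypothetical `TaskHandle` class rather than Airflow's actual models: `__getstate__` drops the attribute that pickle cannot handle, and `__setstate__` rebuilds it after loading. Real models that reference database state would likely need more than this.

```python
import pickle
import threading


class TaskHandle:
    """Hypothetical object that carries an unpicklable live resource."""

    def __init__(self, task_id):
        self.task_id = task_id
        # A lock stands in for any attribute pickle cannot serialize.
        self._lock = threading.Lock()

    def __getstate__(self):
        # Copy the instance dict and drop the unpicklable attribute.
        state = self.__dict__.copy()
        del state['_lock']
        return state

    def __setstate__(self, state):
        # Restore the plain attributes, then rebuild the live resource.
        self.__dict__.update(state)
        self._lock = threading.Lock()


handle = TaskHandle('config_dag_run')
clone = pickle.loads(pickle.dumps(handle))
print(clone.task_id)  # config_dag_run
```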
   
   I've also added a pytest test case so this behavior stays covered going forward.
   
   ```python
       def test_config_context(self):
           """
           This test ensures we can use dag_run from the context
           to access the configuration at run time that's being
           passed from the UI, CLI, and REST API.
           """
           self.dag.create_dagrun(
               run_id='manual__' + DEFAULT_DATE.isoformat(),
               execution_date=DEFAULT_DATE,
               start_date=DEFAULT_DATE,
               state=State.RUNNING,
               external_trigger=False,
           )
   
           def pass_function(**kwargs):
               # Read dag_run.conf to confirm the deserialized dag_run is usable
               kwargs['dag_run'].conf
   
           t = PythonVirtualenvOperator(task_id='config_dag_run', dag=self.dag,
                                        provide_context=True,
                                        python_callable=pass_function)
           t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
   ```
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

