Fokko commented on a change in pull request #8256:
URL: https://github.com/apache/airflow/pull/8256#discussion_r415256343
##########
File path: airflow/operators/python_operator.py
##########
@@ -330,13 +330,28 @@ def _write_string_args(self, filename):
def _write_args(self, input_filename):
# serialize args to file
+ if self.use_dill:
+ serializer = dill
+ else:
+ serializer = pickle
+ # some args from context can't be loaded in virtual env
+ invalid_args = set(['dag', 'task', 'ti'])
if self._pass_op_args():
+ kwargs = {}
+ for key, value in self.op_kwargs.items():
+ try:
+ serializer.loads(serializer.dumps(value))
+ if key not in invalid_args:
Review comment:
Why skip the invalid arguments only after they have been serialized? It would
make more sense to me not to serialize the keys in the set above at all.
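A minimal sketch of that ordering, assuming plain `pickle` as the serializer. `picklable_kwargs` and `INVALID_ARGS` are hypothetical names used only for illustration, not Airflow API. The PR's error handling is left out so the sketch shows only the order of the checks.

```python
import pickle
import threading

# Context keys that can't be loaded in the virtualenv (same keys as the diff).
INVALID_ARGS = frozenset({'dag', 'task', 'ti'})


def picklable_kwargs(op_kwargs, serializer=pickle):
    """Hypothetical helper: drop invalid context keys *before* serializing."""
    kwargs = {}
    for key, value in op_kwargs.items():
        if key in INVALID_ARGS:
            # Skipped up front, so these values never reach the serializer.
            continue
        # Round-trip check; the PR's try/except is omitted in this sketch.
        serializer.loads(serializer.dumps(value))
        kwargs[key] = value
    return kwargs


# A lock can't be pickled, but under the 'ti' key it is never touched.
print(picklable_kwargs({'ti': threading.Lock(), 'foo': 'bar'}))  # {'foo': 'bar'}
```

Because the membership test comes first, an unpicklable context object such as a task instance can't even trigger a serialization error.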
##########
File path: airflow/operators/python_operator.py
##########
@@ -330,13 +330,28 @@ def _write_string_args(self, filename):
def _write_args(self, input_filename):
# serialize args to file
+ if self.use_dill:
+ serializer = dill
+ else:
+ serializer = pickle
+ # some args from context can't be loaded in virtual env
+ invalid_args = set(['dag', 'task', 'ti'])
if self._pass_op_args():
+ kwargs = {}
+ for key, value in self.op_kwargs.items():
+ try:
+ serializer.loads(serializer.dumps(value))
+ if key not in invalid_args:
+ kwargs[key] = value
+ except Exception as e:
+ msg = """
+ "Exception %s found while serializing argument
+ object: %s on op_kwargs key %s ...skipping..."
+ """ % (e, value, key)
+ self.log.debug(msg)
Review comment:
I would raise the log level here. If you pass an argument and it is
silently dropped, that would be very confusing. What do you think? Do you have
any idea how often this happens?
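A minimal sketch of logging the skipped argument at `WARNING` instead of `DEBUG`, assuming plain `pickle` and a logger at the usual `INFO` level. `try_serialize` is a hypothetical helper, and the broad `except Exception` is kept only to mirror the diff.

```python
import logging
import pickle
import threading

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)


def try_serialize(key, value, serializer=pickle):
    """Hypothetical helper: True if value survives a dumps/loads round trip."""
    try:
        serializer.loads(serializer.dumps(value))
        return True
    except Exception as e:  # as broad as in the diff
        # WARNING rather than DEBUG, so a dropped argument still shows up
        # when the log level is INFO.
        log.warning("Could not serialize op_kwargs key %r (%s); skipping it", key, e)
        return False


print(try_serialize('foo', 'bar'))              # True
print(try_serialize('lock', threading.Lock()))  # logs a warning, then False
```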
##########
File path: airflow/operators/python_operator.py
##########
@@ -330,13 +330,28 @@ def _write_string_args(self, filename):
def _write_args(self, input_filename):
# serialize args to file
+ if self.use_dill:
+ serializer = dill
+ else:
+ serializer = pickle
+ # some args from context can't be loaded in virtual env
+ invalid_args = set(['dag', 'task', 'ti'])
if self._pass_op_args():
+ kwargs = {}
+ for key, value in self.op_kwargs.items():
+ try:
+ serializer.loads(serializer.dumps(value))
+ if key not in invalid_args:
+ kwargs[key] = value
+ except Exception as e:
Review comment:
`Exception` is a bit broad; can we be more specific here?
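One possible narrower tuple, as a sketch under an assumption: in Python 3, `pickle` typically reports unpicklable values as `pickle.PicklingError`, `TypeError` (for example for thread locks) or `AttributeError` (for example for local objects). `dill` may raise other exception types, so this tuple would need checking against it. `SERIALIZATION_ERRORS` and `round_trips` are hypothetical names.

```python
import pickle
import threading

# Assumption: the errors plain pickle usually raises for unpicklable values.
SERIALIZATION_ERRORS = (
    pickle.PicklingError,
    pickle.UnpicklingError,
    TypeError,
    AttributeError,
)


def round_trips(value, serializer=pickle):
    """Hypothetical helper: True if value can be dumped and loaded back."""
    try:
        serializer.loads(serializer.dumps(value))
    except SERIALIZATION_ERRORS:
        return False
    return True


print(round_trips({'foo': 'bar'}))    # True
print(round_trips(threading.Lock()))  # False (TypeError: cannot pickle a lock)
```

Anything outside the tuple, such as a `KeyboardInterrupt` or a bug in a custom `__reduce__`, would now propagate instead of being silently skipped.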
##########
File path: airflow/operators/python_operator.py
##########
@@ -330,13 +330,28 @@ def _write_string_args(self, filename):
def _write_args(self, input_filename):
# serialize args to file
+ if self.use_dill:
+ serializer = dill
+ else:
+ serializer = pickle
+ # some args from context can't be loaded in virtual env
+ invalid_args = set(['dag', 'task', 'ti'])
if self._pass_op_args():
+ kwargs = {}
+ for key, value in self.op_kwargs.items():
Review comment:
This is more of a code-style point, but I would do something like this:
```python
import logging
import pickle

log = logging.getLogger(__name__)

invalid_args = {'dag', 'task', 'ti'}
op_kwargs = {
    'ti': 'Ignored',
    'foo': 'bar',
}


def serialize(serializer, key, item):
    # Round-trip the value through the serializer (pickle, or dill when
    # use_dill is set). On failure the error is logged and None is returned,
    # so unlike the loop in the diff the key is kept with a None value.
    try:
        return serializer.loads(serializer.dumps(item))
    except Exception as e:
        log.debug(
            "Exception %s found while serializing argument object: %s "
            "on op_kwargs key %s ...skipping...", e, item, key)


# Invalid keys are filtered out before anything is serialized.
kwargs = {key: serialize(pickle, key, item)
          for key, item in op_kwargs.items() if key not in invalid_args}

print(kwargs)  # {'foo': 'bar'}
```
This also makes the serialize function easier to test.
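A sketch of what such a test could look like with `unittest`, assuming plain `pickle`. The `serialize` function here is a self-contained stand-in with a hypothetical signature, not code from the PR.

```python
import pickle
import threading
import unittest


def serialize(serializer, key, value):
    """Stand-in for the suggested helper (hypothetical signature)."""
    try:
        return serializer.loads(serializer.dumps(value))
    except (pickle.PicklingError, TypeError, AttributeError):
        return None


class SerializeTest(unittest.TestCase):
    def test_round_trips_plain_values(self):
        self.assertEqual(serialize(pickle, 'foo', {'a': [1, 2]}), {'a': [1, 2]})

    def test_returns_none_for_unpicklable_values(self):
        # Thread locks can't be pickled, so the helper gives up on them.
        self.assertIsNone(serialize(pickle, 'lock', threading.Lock()))
```

Saved as a module, this runs with `python -m unittest <module>`; no operator or DAG has to be constructed to exercise the serialization logic.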
----------------------------------------------------------------
This is an automated message from the Apache Git Service.