Kache commented on issue #33167:
URL: https://github.com/apache/airflow/issues/33167#issuecomment-1820042580
I had trouble with this too when trying out using creating a custom `@task`
decorator for my custom operator.
I want to keep my code within the same project, but it seems the feature
expects you to create a standalone package.
Reading through Airflow source, I found the following to work:
```python
# file: plugins/operators/my_runner_operator.py
class MyRunnerOperator(BaseOperator):
def __init__(self, *, command = [], env = {}, **kwargs):
super().__init__(**kwargs)
self.command = command
self.env = env
def execute(self, context: Context):
# kinda like DockerOperator or run an ECS Task
class MyRunnerDecoratedOperator(DecoratedOperator, MyRunnerOperator):
custom_operator_name: str = "@task.runner"
def runner_task(
python_callable: Callable | None = None,
multiple_outputs: bool | None = None,
**kwargs,
) -> TaskDecorator:
return task_decorator_factory(
python_callable,
multiple_outputs=multiple_outputs,
decorated_operator_class=MyRunnerDecoratedOperator,
**kwargs,
)
# HACK
pm = ProvidersManager()
pm.initialize_providers_list()
provider_info = {
'package-name': 'myproj-airflow-providers',
'name': 'My Proj',
'description': "a description",
'task-decorators': [
{
'name': 'runner',
'class-name': 'operators.my_runner_operator.runner_task',
},
],
}
pm._provider_schema_validator.validate(provider_info)
pm._provider_dict['myproj-airflow-providers'] = ProviderInfo('0.0.1',
provider_info, "package")
pm._provider_dict = dict(sorted(pm._provider_dict.items()))
```
This really isn't ideal, and the usage doesn't yet involve the wrapped
function at all:
```python
@task.runner(command=['my_exec', 'arg1', 'arg2'])
def do_something(): pass
```
I'm going to play around with it some more and see if I can get the XCom
args working so that incoming args can be used in `command` within the function
instead of in the decorator, and output of runner can be returned from the
function into XCom.
But if not, it's going to be better to just use a factory function for my
custom operator:
```python
def runner(*cmd, **other_conveniences):
args, kwargs = process(*cmd, **other_conveniences) # e.g. auto generate
task_id
return MyRunnerOperator(*args, **kwargs)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]