Kache commented on issue #33167:
URL: https://github.com/apache/airflow/issues/33167#issuecomment-1820042580

   I also had trouble with this when trying to create a custom `@task` 
decorator for my custom operator.
   
   I want to keep my code within the same project, but it seems the feature 
expects you to create a standalone package.
   
   Reading through the Airflow source, I found that the following works:
   
   ```python
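   # file: plugins/operators/my_runner_operator.py
   from __future__ import annotations

   from typing import Callable

   from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory
   from airflow.models.baseoperator import BaseOperator
   from airflow.providers_manager import ProviderInfo, ProvidersManager
   from airflow.utils.context import Context


   class MyRunnerOperator(BaseOperator):
       def __init__(self, *, command=None, env=None, **kwargs):
           super().__init__(**kwargs)

           # Use None defaults so instances never share one mutable list/dict.
           self.command = command or []
           self.env = env or {}

       def execute(self, context: Context):
           # kinda like DockerOperator or run an ECS Task
           ...
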
   
   class MyRunnerDecoratedOperator(DecoratedOperator, MyRunnerOperator):
       custom_operator_name: str = "@task.runner"
   
   
   def runner_task(
       python_callable: Callable | None = None,
       multiple_outputs: bool | None = None,
       **kwargs,
   ) -> TaskDecorator:
       return task_decorator_factory(
           python_callable,
           multiple_outputs=multiple_outputs,
           decorated_operator_class=MyRunnerDecoratedOperator,
           **kwargs,
       )
   
   
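   # HACK: register an in-project "provider" by writing to private
   # ProvidersManager attributes, which may change between Airflow versions.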
   pm = ProvidersManager()
   pm.initialize_providers_list()
   provider_info = {
       'package-name': 'myproj-airflow-providers',
       'name': 'My Proj',
       'description': "a description",
       'task-decorators': [
           {
               'name': 'runner',
               'class-name': 'operators.my_runner_operator.runner_task',
           },
       ],
   }
   pm._provider_schema_validator.validate(provider_info)
   pm._provider_dict['myproj-airflow-providers'] = ProviderInfo(
       '0.0.1', provider_info, "package"
   )
   pm._provider_dict = dict(sorted(pm._provider_dict.items()))
   ```
   
   This really isn't ideal, and the usage doesn't yet involve the wrapped 
function at all:
   
   ```python
   from airflow.decorators import task

   @task.runner(command=['my_exec', 'arg1', 'arg2'])
   def do_something():
       pass
   ```
   
   I'm going to play around with it some more and see if I can get the XCom 
args working, so that incoming args can be used to build `command` inside the 
function instead of in the decorator, and the runner's output can be returned 
from the function into XCom.
   
   But if not, it's going to be better to just use a factory function for my 
custom operator:
   
   ```python
   def runner(*cmd, **other_conveniences):
       # e.g. auto generate task_id
       args, kwargs = process(*cmd, **other_conveniences)
       return MyRunnerOperator(*args, **kwargs)
   ```
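
A minimal sketch of what such a factory could look like, written so it runs without Airflow installed. Everything here is an assumption for illustration: `StubRunnerOperator` is a hypothetical stand-in for `MyRunnerOperator`, `make_task_id` is a hypothetical helper playing the role of `process`, and deriving the `task_id` from the command words is only one possible convention.

```python
from __future__ import annotations

import re
from dataclasses import dataclass, field


@dataclass
class StubRunnerOperator:
    """Hypothetical stand-in for MyRunnerOperator (no Airflow needed)."""

    task_id: str
    command: list[str] = field(default_factory=list)
    env: dict[str, str] = field(default_factory=dict)


def make_task_id(cmd: tuple[str, ...]) -> str:
    """Hypothetical helper: derive a task_id from the command words."""
    # Airflow task ids may only contain alphanumerics, dashes, dots and
    # underscores, so every other run of characters becomes one underscore.
    slug = re.sub(r"[^A-Za-z0-9_.-]+", "_", "_".join(cmd)).strip("_")
    return slug or "runner"


def runner(*cmd: str, task_id: str | None = None, **kwargs) -> StubRunnerOperator:
    """Build the operator from a command, generating task_id when omitted."""
    return StubRunnerOperator(
        task_id=task_id or make_task_id(cmd),
        command=list(cmd),
        **kwargs,
    )


op = runner("my_exec", "arg1", "arg2")
print(op.task_id)  # my_exec_arg1_arg2
```

With the real operator, the stub would be swapped for `MyRunnerOperator` and the call made inside a DAG context. An explicit `task_id=...` still wins over the generated one, which matters when the same command appears twice in one DAG.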

