[ 
https://issues.apache.org/jira/browse/AIRFLOW-3620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16757909#comment-16757909
 ] 

Adam C Baker edited comment on AIRFLOW-3620 at 2/9/19 12:36 AM:
----------------------------------------------------------------

The doc example doesn't do anything with the inlets or outlets. It just shows 
that you can set them to things, but the bash command that it runs is {{echo 
1}}. What's the meaning of the datasets if the command ignores them?

Try this:

{{echo {{task.outlets[0]}}}}

The result is an error. I'd expect it to be the first file in the outlets 
dataset.
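For comparison, here is a minimal stdlib-only sketch (hypothetical values, not Airflow's actual templating code) of what appears to happen: the bash operator renders its command while {{task.outlets}} is still {{[]}}, so indexing the first outlet fails before the command ever runs.

```python
# Hypothetical stand-in for templating time in the bash operator:
# task.outlets is still [] when the command string is rendered.
outlets = []

try:
    # Equivalent of rendering "echo {{ task.outlets[0] }}"
    command = "echo " + str(outlets[0])
except IndexError:
    command = None  # rendering errors out; no command is ever produced
```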
Another way to see what I'm getting at is to look at the code I'm using to work around the issue:
{code:python}
import logging
from subprocess import Popen
from typing import Any, Sequence

from airflow.exceptions import AirflowException
from airflow.lineage.datasets import File
from airflow.operators.python_operator import PythonOperator


class BashPipeOp(PythonOperator):
    """An operator to execute a bash command as a pipe.

    The command's stdin will automatically be the file named by the first
    outlet of its upstream task(s). The stdout will be the file named by
    `out_fname`, which is also placed in the only outlet of the operator.

    :param bash_command: The command to execute, e.g. "fgrep -f list.txt"
    :param out_fname: The file that will receive the stdout of the command.
    """
    template_fields: Sequence[str] = ('bash_command',)
    template_ext: Sequence[str] = ('.sh', '.bash',)
    ui_color: str = '#f0ede4'  # the color of the BashOperator
    bash_command: str

    def __init__(self, bash_command: str, out_fname: str, **kwargs: Any) -> None:
        self.bash_command = bash_command

        def exec_bash(task, **kwargs):
            with open(task.inlets[0].name) as infile, \
                    open(task.outlets[0].name, 'w') as outfile:
                logging.info("Running command: %s", self.bash_command)
                # shell=True expects a string command, not a list
                sp = Popen(self.bash_command, stdin=infile, stdout=outfile,
                           shell=True)
                sp.wait()
                logging.info("Command exited with return code %s",
                             sp.returncode)
                if sp.returncode:
                    raise AirflowException("Bash command failed")

        super().__init__(
                python_callable=exec_bash,
                provide_context=True,
                inlets={'auto': True},
                outlets={'datasets': [File(out_fname)]},
                **kwargs)
{code}
I implemented this with the python operator because it has {{task.inlets}} and {{task.outlets}} set correctly by the time the callable runs, while the bash operator still has them set to {{[]}} when the command is templated, regardless of the values specified in the task definition.
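Outside of Airflow, the pipe pattern that {{exec_bash}} implements can be sketched with just the standard library (the file names here are hypothetical, standing in for the inlet/outlet {{File}} names):

```python
import os
import subprocess
import tempfile

with tempfile.TemporaryDirectory() as workdir:
    in_path = os.path.join(workdir, "in.txt")    # stands in for the inlet file
    out_path = os.path.join(workdir, "out.txt")  # stands in for the outlet file
    with open(in_path, "w") as f:
        f.write("alpha\nbeta\n")

    # Same shape as exec_bash: stdin comes from the inlet file,
    # stdout goes to the outlet file, the command is run by the shell.
    with open(in_path) as infile, open(out_path, "w") as outfile:
        sp = subprocess.Popen("grep beta", stdin=infile, stdout=outfile,
                              shell=True)
        sp.wait()

    with open(out_path) as f:
        result = f.read()
```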



> Inlets and outlets are always empty in bash operator templates
> --------------------------------------------------------------
>
>                 Key: AIRFLOW-3620
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3620
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: 1.10.1
>            Reporter: Adam C Baker
>            Priority: Major
>
> When creating a data pipeline where one task's input is its upstream task's 
> output, it seems the only way to automatically coordinate these is to use the 
> lineage feature and set {{inlets = \{"auto": True}}}. Doing this with a 
> PythonOperator lets one get the task's input and output data sources by 
> passing in the context and reading the {{task.inlets}} and {{task.outlets}} 
> values.
> This fails with the BashOperator. With a template including something like 
> {{{{task.inlets[0]}}}}, it throws an exception, and templating with 
> {{{{task.inlets}}}} or {{{{task.outlets}}}} always reveals these values to be 
> an empty list.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
