ksumanth opened a new issue #12068:
URL: https://github.com/apache/airflow/issues/12068


   **Apache Airflow version**: 1.10.4
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**:
   - **OS** (e.g. from /etc/os-release): Debian 9
   - **Kernel** (e.g. `uname -a`): Linux 9f23d34e2f14 4.19.76-linuxkit #1 SMP Tue May 26 11:42:35 UTC 2020 x86_64 GNU/Linux
   
   **What happened**: I installed the backport providers amazon package, both version 2020.6.24 and 2020.10.29 (the latest). The `AWSBatchOperator` does not support the Batch `parameters` option for replacing placeholder variables in the Batch command. The Airflow GitHub 1.10 source branch shows support for Batch parameters, but I looked at the installed module source of the provider package and confirmed that the installed code is out of sync with the 1.10 branch.
   
   **What you expected to happen**: I expected the `AWSBatchOperator` to send the `parameters` values to AWS so that they replace the placeholder variables in the Batch command.
   
   **How to reproduce it**: Create an AWS Batch job queue, job definition, etc., leaving the Command and Parameters fields in the job definition empty. In the DAG, add an `AWSBatchOperator` whose `overrides` include a command with placeholder variables and whose `parameters` supply the replacement values. For example:
   
    aws_job_submission = AWSBatchOperator(
        task_id='aws-batch-job-submission',
        dag=dag,
        aws_conn_id='batch_dev',
        job_name='airflow-job-submission-and-run-' + datetime.today().strftime('%Y-%m-%d'),
        job_definition='testwithparams',
        job_queue='dev-test-queue',
        overrides={'command': ['-b', 'Ref::bucket', '-v', 'Ref::testfile', '-r', 'Ref::serfile']},
        parameters={
            "bucket": "mybucket",
            "testfile": "testfile.txt",
            "serfile": "testref.ser"
        }
    )
   
   This problem occurs every time.
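   As a stopgap, the submission can bypass the operator and call boto3's `batch.submit_job` directly (e.g. from a `PythonOperator` callable), since the boto3 Batch client does accept `parameters` alongside `containerOverrides`. The sketch below only assembles the keyword arguments for that call; `build_submit_job_kwargs` is a hypothetical helper, not part of Airflow or boto3, and no AWS call is made here.

```python
from datetime import datetime


def build_submit_job_kwargs(job_name, job_queue, job_definition,
                            overrides, parameters=None):
    """Assemble keyword arguments for boto3's batch.submit_job.

    Hypothetical helper: boto3's Batch client accepts `parameters`
    alongside `containerOverrides`, so forwarding the dict lets AWS
    substitute the Ref:: placeholders in the command.
    """
    kwargs = {
        'jobName': job_name,
        'jobQueue': job_queue,
        'jobDefinition': job_definition,
        'containerOverrides': overrides,
    }
    if parameters:
        kwargs['parameters'] = parameters
    return kwargs


# Mirrors the DAG snippet above; the result would be passed as
# client.submit_job(**kwargs) inside a PythonOperator callable.
kwargs = build_submit_job_kwargs(
    job_name='airflow-job-submission-and-run-' + datetime.today().strftime('%Y-%m-%d'),
    job_queue='dev-test-queue',
    job_definition='testwithparams',
    overrides={'command': ['-b', 'Ref::bucket', '-v', 'Ref::testfile',
                           '-r', 'Ref::serfile']},
    parameters={'bucket': 'mybucket', 'testfile': 'testfile.txt',
                'serfile': 'testref.ser'},
)
```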
   
   <details>
   Here is the source code of the `AWSBatchOperator` constructor and `execute` method from the installed module. It is evident that the `parameters` option is not supported.
   
       @apply_defaults
        def __init__(self, job_name, job_definition, job_queue, overrides, max_retries=4200,
                     aws_conn_id=None, region_name=None, **kwargs):
           super(AWSBatchOperator, self).__init__(**kwargs)
   
           self.job_name = job_name
           self.aws_conn_id = aws_conn_id
           self.region_name = region_name
           self.job_definition = job_definition
           self.job_queue = job_queue
           self.overrides = overrides
           self.max_retries = max_retries
   
           self.jobId = None
           self.jobName = None
   
           self.hook = self.get_hook()
   
       def execute(self, context):
           self.log.info(
               'Running AWS Batch Job - Job definition: %s - on queue %s',
               self.job_definition, self.job_queue
           )
           self.log.info('AWSBatchOperator overrides: %s', self.overrides)
   
           self.client = self.hook.get_client_type(
               'batch',
               region_name=self.region_name
           )
   
           try:
               response = self.client.submit_job(
                   jobName=self.job_name,
                   jobQueue=self.job_queue,
                   jobDefinition=self.job_definition,
                   containerOverrides=self.overrides)
   
               self.log.info('AWS Batch Job started: %s', response)
   
               self.jobId = response['jobId']
               self.jobName = response['jobName']
   
               self._wait_for_task_ended()
   
               self._check_success_task()
   
                self.log.info('AWS Batch Job has been successfully executed: %s', response)
           except Exception as e:
               self.log.info('AWS Batch Job has failed executed')
               raise AirflowException(e)
   
   </details>
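
   A fix along the lines of the 1.10 branch would only need the constructor to accept `parameters` and `execute` to forward it to `submit_job`. The sketch below is illustrative: `AWSBatchOperatorWithParams` and `_RecordingBatchClient` are invented names, and the stand-in client replaces the real boto3 client from `hook.get_client_type` so the plumbing can be shown without AWS credentials. A real fix would subclass the installed `AWSBatchOperator`.

```python
class _RecordingBatchClient:
    """Stand-in for the boto3 Batch client; records the submit_job
    call so we can see that `parameters` reaches it."""

    def submit_job(self, **kwargs):
        self.last_call = kwargs
        return {'jobId': 'fake-id', 'jobName': kwargs['jobName']}


class AWSBatchOperatorWithParams:
    """Sketch of the missing plumbing: accept `parameters` in the
    constructor and forward it to submit_job. In a real DAG this
    would subclass the installed AWSBatchOperator and override
    execute(); the client argument stands in for
    hook.get_client_type('batch', ...)."""

    def __init__(self, job_name, job_definition, job_queue, overrides,
                 parameters=None, **kwargs):
        self.job_name = job_name
        self.job_definition = job_definition
        self.job_queue = job_queue
        self.overrides = overrides
        self.parameters = parameters or {}

    def execute(self, client):
        return client.submit_job(
            jobName=self.job_name,
            jobQueue=self.job_queue,
            jobDefinition=self.job_definition,
            containerOverrides=self.overrides,
            parameters=self.parameters,  # the argument the installed code omits
        )


# Demonstration with the stand-in client (no AWS call is made):
client = _RecordingBatchClient()
op = AWSBatchOperatorWithParams(
    job_name='demo',
    job_definition='testwithparams',
    job_queue='dev-test-queue',
    overrides={'command': ['-b', 'Ref::bucket']},
    parameters={'bucket': 'mybucket'},
)
response = op.execute(client)
```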
   

