ksumanth opened a new issue #12068:
URL: https://github.com/apache/airflow/issues/12068
**Apache Airflow version**: 1.10.4
**Environment**:
- **Cloud provider or hardware configuration**:
- **OS** (e.g. from /etc/os-release): Debian 9
- **Kernel** (e.g. `uname -a`): Linux 9f23d34e2f14 4.19.76-linuxkit #1 SMP
Tue May 26 11:42:35 UTC 2020 x86_64 GNU/Linux
**What happened**: Installed the backport providers amazon package, both
version 2020.6.24 and version 2020.10.29 (latest). The AWSBatchOperator does
not support the Batch `parameters` option used to replace placeholder
variables in the Batch command. The Airflow 1.10 source branch on GitHub shows
that the operator supports Batch parameters, and inspecting the installed
provider module confirms that the installed code is out of sync with the
Airflow 1.10 branch source.
**What you expected to happen**: I expected the AWSBatchOperator to pass the
`parameters` value through to AWS Batch so that the parameter values replace
the `Ref::` placeholder variables in the Batch command.
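For context, AWS Batch substitutes each `Ref::name` token in the container
command with the matching value from the job's `parameters` map. A minimal
Python sketch of that substitution (illustrative only; the real substitution
happens server-side in AWS Batch, and the function name here is hypothetical):

```python
def substitute_refs(command, parameters):
    """Mimic AWS Batch's 'Ref::name' placeholder substitution.

    Illustrative sketch only: AWS Batch performs this server-side when
    a job is submitted with a `parameters` map.
    """
    result = []
    for token in command:
        if token.startswith('Ref::'):
            name = token[len('Ref::'):]
            # Fall back to the literal token if no parameter matches.
            result.append(parameters.get(name, token))
        else:
            result.append(token)
    return result


command = ['-b', 'Ref::bucket', '-v', 'Ref::testfile', '-r', 'Ref::serfile']
parameters = {'bucket': 'mybucket', 'testfile': 'testfile.txt', 'serfile': 'testref.ser'}
print(substitute_refs(command, parameters))
# → ['-b', 'mybucket', '-v', 'testfile.txt', '-r', 'testref.ser']
```

Because the installed operator never sends `parameters`, none of these
substitutions happen and the literal `Ref::` tokens reach the container.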
**How to reproduce it**: Create an AWS Batch job queue, job definition, etc.,
leaving the Command and Parameters fields of the job definition empty. In the
DAG, add an AWSBatchOperator whose `overrides` include a command with
placeholder variables and whose `parameters` supply the replacement values.
For example:
```python
aws_job_submission = AWSBatchOperator(
    task_id='aws-batch-job-submission',
    dag=dag,
    aws_conn_id='batch_dev',
    job_name='airflow-job-submission-and-run-' + datetime.today().strftime('%Y-%m-%d'),
    job_definition='testwithparams',
    job_queue='dev-test-queue',
    overrides={'command': ['-b', 'Ref::bucket', '-v', 'Ref::testfile', '-r', 'Ref::serfile']},
    parameters={
        "bucket": "mybucket",
        "testfile": "testfile.txt",
        "serfile": "testref.ser"
    }
)
```
This problem occurs every time.
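Until the backport package catches up with the 1.10 branch, one possible
workaround is to call the Batch `SubmitJob` API directly with boto3, which
accepts `parameters` alongside `containerOverrides`. A sketch (the function
name and `dry_run` flag are mine, added so the request can be inspected
without AWS credentials):

```python
def submit_batch_job(job_name, job_queue, job_definition, overrides,
                     parameters, region_name=None, dry_run=False):
    """Submit an AWS Batch job directly with boto3, including `parameters`.

    Sketch of a workaround that bypasses AWSBatchOperator until the backport
    provider supports the `parameters` option.
    """
    kwargs = {
        'jobName': job_name,
        'jobQueue': job_queue,
        'jobDefinition': job_definition,
        'containerOverrides': overrides,
        'parameters': parameters,
    }
    if dry_run:
        # Return the request body without calling AWS, for inspection.
        return kwargs
    import boto3  # imported lazily so dry runs work without boto3 installed
    client = boto3.client('batch', region_name=region_name)
    return client.submit_job(**kwargs)
```

This could be wrapped in a `PythonOperator` inside the DAG; it does not give
the waiting/retry behavior of AWSBatchOperator, only the missing
`parameters` pass-through.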
<details>
Here is the source code of the AWSBatchOperator constructor and `execute`
method from the installed module; it is evident that the `parameters` option
is not supported:
```python
@apply_defaults
def __init__(self, job_name, job_definition, job_queue, overrides,
             max_retries=4200, aws_conn_id=None, region_name=None, **kwargs):
    super(AWSBatchOperator, self).__init__(**kwargs)
    self.job_name = job_name
    self.aws_conn_id = aws_conn_id
    self.region_name = region_name
    self.job_definition = job_definition
    self.job_queue = job_queue
    self.overrides = overrides
    self.max_retries = max_retries
    self.jobId = None
    self.jobName = None
    self.hook = self.get_hook()

def execute(self, context):
    self.log.info(
        'Running AWS Batch Job - Job definition: %s - on queue %s',
        self.job_definition, self.job_queue
    )
    self.log.info('AWSBatchOperator overrides: %s', self.overrides)
    self.client = self.hook.get_client_type(
        'batch',
        region_name=self.region_name
    )
    try:
        response = self.client.submit_job(
            jobName=self.job_name,
            jobQueue=self.job_queue,
            jobDefinition=self.job_definition,
            containerOverrides=self.overrides)
        self.log.info('AWS Batch Job started: %s', response)
        self.jobId = response['jobId']
        self.jobName = response['jobName']
        self._wait_for_task_ended()
        self._check_success_task()
        self.log.info('AWS Batch Job has been successfully executed: %s', response)
    except Exception as e:
        self.log.info('AWS Batch Job has failed executed')
        raise AirflowException(e)
```
</details>