feluelle commented on a change in pull request #9594:
URL: https://github.com/apache/airflow/pull/9594#discussion_r461466410



##########
File path: airflow/providers/amazon/aws/hooks/sagemaker.py
##########
@@ -786,6 +825,30 @@ def list_training_jobs(
         )
         return results
 
+    def list_processing_jobs(self, **kwargs) -> List[Dict]:  # noqa: D402
+        """
+        This method wraps boto3's list_processing_jobs(). All arguments should be provided via kwargs.
+        Note boto3 expects these in CamelCase format, for example:
+
+        .. code-block:: python
+
+            list_processing_jobs(NameContains="myjob", StatusEquals="Failed")
+
+        .. seealso::
+            https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_processing_jobs
+
+        :param kwargs: (optional) kwargs to boto3's list_processing_jobs method
+        :return: results of the list_processing_jobs request
+        """
+
+        config = {}
+        config.update(kwargs)
+        list_processing_jobs_request = partial(self.get_conn().list_processing_jobs, **config)

Review comment:
       But why can't we just pass `**kwargs`?
   
    Isn't `**kwargs` equal to `**config`?
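
    For illustration, a minimal sketch (with a hypothetical `demo` function, not from the PR) showing that copying `kwargs` into a fresh dict before splatting binds exactly the same keyword arguments:

    ```python
    from functools import partial


    def demo(**kwargs):
        config = {}
        config.update(kwargs)  # shallow copy of kwargs
        assert config == kwargs  # identical contents
        # both partials bind exactly the same keyword arguments
        return partial(dict, **config), partial(dict, **kwargs)


    first, second = demo(NameContains="myjob", StatusEquals="Failed")
    assert first() == second()
    ```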

##########
File path: airflow/providers/amazon/aws/operators/sagemaker_processing.py
##########
@@ -0,0 +1,129 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+from airflow.providers.amazon.aws.operators.sagemaker_base import SageMakerBaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class SageMakerProcessingOperator(SageMakerBaseOperator):
+    """
+    Initiate a SageMaker processing job.
+
+    This operator returns the ARN of the processing job created in Amazon SageMaker.
+
+    :param config: The configuration necessary to start a processing job (templated).
+
+        For details of the configuration parameter see :py:meth:`SageMaker.Client.create_processing_job`
+    :type config: dict
+    :param aws_conn_id: The AWS connection ID to use.
+    :type aws_conn_id: str
+    :param wait_for_completion: Whether the operator should wait until the processing job completes.
+    :type wait_for_completion: bool
+    :param print_log: if the operator should print the CloudWatch log during processing
+    :type print_log: bool
+    :param check_interval: if wait is set to be true, this is the time interval
+        in seconds at which the operator will check the status of the processing job
+    :type check_interval: int
+    :param max_ingestion_time: If wait is set to True, the operation fails if the processing job
+        doesn't finish within max_ingestion_time seconds. If you set this parameter to None,
+        the operation does not time out.
+    :type max_ingestion_time: int
+    :param action_if_job_exists: Behaviour if the job name already exists. Possible options are "increment"
+        (default) and "fail".
+    :type action_if_job_exists: str
+    """
+
+    @apply_defaults
+    def __init__(self,
+                 config,
+                 aws_conn_id,
+                 wait_for_completion=True,
+                 print_log=True,
+                 check_interval=30,
+                 max_ingestion_time=None,
+                 action_if_job_exists: str = "increment",  # TODO use typing.Literal for this in Python 3.8
+                 **kwargs):
+        super().__init__(config=config, aws_conn_id=aws_conn_id, **kwargs)
+
+        if action_if_job_exists not in ("increment", "fail"):
+            raise AirflowException(
+                "Argument action_if_job_exists accepts only 'increment' and 'fail'. "
+                f"Provided value: '{action_if_job_exists}'."
+            )
+        self.action_if_job_exists = action_if_job_exists
+        self.wait_for_completion = wait_for_completion
+        self.print_log = print_log
+        self.check_interval = check_interval
+        self.max_ingestion_time = max_ingestion_time
+        self.create_integer_fields()
+
+    def create_integer_fields(self):
+        """Set fields which should be casted to integers."""
+        if 'StoppingCondition' not in self.config:
+            self.integer_fields = [
+                ['ProcessingResources', 'ClusterConfig', 'InstanceCount'],
+                ['ProcessingResources', 'ClusterConfig', 'VolumeSizeInGB']
+            ]
+        else:
+            self.integer_fields = [
+                ['ProcessingResources', 'ClusterConfig', 'InstanceCount'],
+                ['ProcessingResources', 'ClusterConfig', 'VolumeSizeInGB'],
+                ['StoppingCondition', 'MaxRuntimeInSeconds']
+            ]

Review comment:
    ```suggestion
        self.integer_fields = [
            ['ProcessingResources', 'ClusterConfig', 'InstanceCount'],
            ['ProcessingResources', 'ClusterConfig', 'VolumeSizeInGB']
        ]
        if 'StoppingCondition' in self.config:
            self.integer_fields += [
                ['StoppingCondition', 'MaxRuntimeInSeconds']
            ]
    ```

##########
File path: tests/providers/amazon/aws/operators/test_sagemaker_processing.py
##########
@@ -0,0 +1,153 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+import mock
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.sagemaker import SageMakerHook
+from airflow.providers.amazon.aws.operators.sagemaker_processing import SageMakerProcessingOperator
+
+job_name = 'test-job-name'
+
+create_processing_params = {
+    "AppSpecification": {
+        "ContainerArguments": ["container_arg"],
+        "ContainerEntrypoint": ["container_entrypoint"],
+        "ImageUri": "{{ image_uri }}",
+    },
+    "Environment": {"{{ key }}": "{{ value }}"},
+    "ExperimentConfig": {
+        "ExperimentName": "ExperimentName",
+        "TrialComponentDisplayName": "TrialComponentDisplayName",
+        "TrialName": "TrialName",
+    },
+    "ProcessingInputs": [
+        {
+            "InputName": "AnalyticsInputName",
+            "S3Input": {
+                "LocalPath": "{{ Local Path }}",
+                "S3CompressionType": "None",
+                "S3DataDistributionType": "FullyReplicated",
+                "S3DataType": "S3Prefix",
+                "S3InputMode": "File",
+                "S3Uri": "{{ S3Uri }}",
+            },
+        }
+    ],
+    "ProcessingJobName": job_name,
+    "ProcessingOutputConfig": {
+        "KmsKeyId": "KmsKeyID",
+        "Outputs": [
+            {
+                "OutputName": "AnalyticsOutputName",
+                "S3Output": {
+                    "LocalPath": "{{ Local Path }}",
+                    "S3UploadMode": "EndOfJob",
+                    "S3Uri": "{{ S3Uri }}",
+                },
+            }
+        ],
+    },
+    "ProcessingResources": {
+        "ClusterConfig": {
+            "InstanceCount": 2,
+            "InstanceType": "ml.p2.xlarge",
+            "VolumeSizeInGB": 30,
+            "VolumeKmsKeyId": "{{ kms_key }}",
+        }
+    },
+    "RoleArn": "arn:aws:iam::0122345678910:role/SageMakerPowerUser",
+    "Tags": [{"{{ key }}": "{{ value }}"}],
+}
+
+
+class TestSageMakerProcessingOperator(unittest.TestCase):
+
+    def setUp(self):
+        self.sagemaker = SageMakerProcessingOperator(
+            task_id='test_sagemaker_operator',
+            aws_conn_id='sagemaker_test_id',
+            config=create_processing_params,
+            wait_for_completion=False,
+            check_interval=5
+        )
+
+    def test_parse_config_integers(self):
+        self.sagemaker.parse_config_integers()
+        self.assertEqual(self.sagemaker.config['ProcessingResources']['ClusterConfig']['InstanceCount'],
+                         int(self.sagemaker.config['ProcessingResources']['ClusterConfig']['InstanceCount']))
+        self.assertEqual(self.sagemaker.config['ProcessingResources']['ClusterConfig']['VolumeSizeInGB'],
+                         int(self.sagemaker.config['ProcessingResources']['ClusterConfig']['VolumeSizeInGB']))
+
+    @mock.patch.object(SageMakerHook, 'get_conn')
+    @mock.patch.object(SageMakerHook, 'create_processing_job')
+    def test_execute(self, mock_processing, mock_client):
+        mock_processing.return_value = {'ProcessingJobArn': 'testarn',
+                                        'ResponseMetadata': {'HTTPStatusCode': 200}}
+        self.sagemaker.execute(None)
+        mock_processing.assert_called_once_with(create_processing_params,
+                                                wait_for_completion=False,
+                                                check_interval=5,
+                                                max_ingestion_time=None
+                                                )
+
+    @mock.patch.object(SageMakerHook, 'get_conn')
+    @mock.patch.object(SageMakerHook, 'create_processing_job')
+    def test_execute_with_failure(self, mock_processing, mock_client):
+        mock_processing.return_value = {'ProcessingJobArn': 'testarn',
+                                        'ResponseMetadata': {'HTTPStatusCode': 404}}
+        self.assertRaises(AirflowException, self.sagemaker.execute, None)
+# pylint: enable=unused-argument

Review comment:
       ```suggestion
   ```

##########
File path: tests/providers/amazon/aws/operators/test_sagemaker_processing.py
##########
@@ -0,0 +1,153 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+import mock
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.sagemaker import SageMakerHook
+from airflow.providers.amazon.aws.operators.sagemaker_processing import SageMakerProcessingOperator
+
+job_name = 'test-job-name'
+
+create_processing_params = {
+    "AppSpecification": {
+        "ContainerArguments": ["container_arg"],
+        "ContainerEntrypoint": ["container_entrypoint"],
+        "ImageUri": "{{ image_uri }}",
+    },
+    "Environment": {"{{ key }}": "{{ value }}"},
+    "ExperimentConfig": {
+        "ExperimentName": "ExperimentName",
+        "TrialComponentDisplayName": "TrialComponentDisplayName",
+        "TrialName": "TrialName",
+    },
+    "ProcessingInputs": [
+        {
+            "InputName": "AnalyticsInputName",
+            "S3Input": {
+                "LocalPath": "{{ Local Path }}",
+                "S3CompressionType": "None",
+                "S3DataDistributionType": "FullyReplicated",
+                "S3DataType": "S3Prefix",
+                "S3InputMode": "File",
+                "S3Uri": "{{ S3Uri }}",
+            },
+        }
+    ],
+    "ProcessingJobName": job_name,
+    "ProcessingOutputConfig": {
+        "KmsKeyId": "KmsKeyID",
+        "Outputs": [
+            {
+                "OutputName": "AnalyticsOutputName",
+                "S3Output": {
+                    "LocalPath": "{{ Local Path }}",
+                    "S3UploadMode": "EndOfJob",
+                    "S3Uri": "{{ S3Uri }}",
+                },
+            }
+        ],
+    },
+    "ProcessingResources": {
+        "ClusterConfig": {
+            "InstanceCount": 2,
+            "InstanceType": "ml.p2.xlarge",
+            "VolumeSizeInGB": 30,
+            "VolumeKmsKeyId": "{{ kms_key }}",
+        }
+    },
+    "RoleArn": "arn:aws:iam::0122345678910:role/SageMakerPowerUser",
+    "Tags": [{"{{ key }}": "{{ value }}"}],
+}
+
+
+class TestSageMakerProcessingOperator(unittest.TestCase):
+
+    def setUp(self):
+        self.sagemaker = SageMakerProcessingOperator(
+            task_id='test_sagemaker_operator',
+            aws_conn_id='sagemaker_test_id',
+            config=create_processing_params,
+            wait_for_completion=False,
+            check_interval=5
+        )
+
+    def test_parse_config_integers(self):
+        self.sagemaker.parse_config_integers()
+        self.assertEqual(self.sagemaker.config['ProcessingResources']['ClusterConfig']['InstanceCount'],
+                         int(self.sagemaker.config['ProcessingResources']['ClusterConfig']['InstanceCount']))
+        self.assertEqual(self.sagemaker.config['ProcessingResources']['ClusterConfig']['VolumeSizeInGB'],
+                         int(self.sagemaker.config['ProcessingResources']['ClusterConfig']['VolumeSizeInGB']))
+
+    @mock.patch.object(SageMakerHook, 'get_conn')
+    @mock.patch.object(SageMakerHook, 'create_processing_job')
+    def test_execute(self, mock_processing, mock_client):
+        mock_processing.return_value = {'ProcessingJobArn': 'testarn',
+                                        'ResponseMetadata': {'HTTPStatusCode': 200}}

Review comment:
    You can directly set the `return_value` during _patching_.
    ```suggestion
        @mock.patch.object(SageMakerHook, 'create_processing_job',
                           return_value={'ProcessingJobArn': 'testarn', 'ResponseMetadata': {'HTTPStatusCode': 200}})
        def test_execute(self, mock_processing, mock_client):
    ```
   
   More of these cases below.
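
    For instance, the failure test below could use the same pattern (a sketch of the same idea, not a required change):

    ```python
    @mock.patch.object(SageMakerHook, 'get_conn')
    @mock.patch.object(SageMakerHook, 'create_processing_job',
                       return_value={'ProcessingJobArn': 'testarn',
                                     'ResponseMetadata': {'HTTPStatusCode': 404}})
    def test_execute_with_failure(self, mock_processing, mock_client):
        self.assertRaises(AirflowException, self.sagemaker.execute, None)
    ```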

##########
File path: tests/providers/amazon/aws/operators/test_sagemaker_processing.py
##########
@@ -0,0 +1,153 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+import mock
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.sagemaker import SageMakerHook
+from airflow.providers.amazon.aws.operators.sagemaker_processing import SageMakerProcessingOperator
+
+job_name = 'test-job-name'
+
+create_processing_params = {
+    "AppSpecification": {
+        "ContainerArguments": ["container_arg"],
+        "ContainerEntrypoint": ["container_entrypoint"],
+        "ImageUri": "{{ image_uri }}",
+    },
+    "Environment": {"{{ key }}": "{{ value }}"},
+    "ExperimentConfig": {
+        "ExperimentName": "ExperimentName",
+        "TrialComponentDisplayName": "TrialComponentDisplayName",
+        "TrialName": "TrialName",
+    },
+    "ProcessingInputs": [
+        {
+            "InputName": "AnalyticsInputName",
+            "S3Input": {
+                "LocalPath": "{{ Local Path }}",
+                "S3CompressionType": "None",
+                "S3DataDistributionType": "FullyReplicated",
+                "S3DataType": "S3Prefix",
+                "S3InputMode": "File",
+                "S3Uri": "{{ S3Uri }}",
+            },
+        }
+    ],
+    "ProcessingJobName": job_name,
+    "ProcessingOutputConfig": {
+        "KmsKeyId": "KmsKeyID",
+        "Outputs": [
+            {
+                "OutputName": "AnalyticsOutputName",
+                "S3Output": {
+                    "LocalPath": "{{ Local Path }}",
+                    "S3UploadMode": "EndOfJob",
+                    "S3Uri": "{{ S3Uri }}",
+                },
+            }
+        ],
+    },
+    "ProcessingResources": {
+        "ClusterConfig": {
+            "InstanceCount": 2,
+            "InstanceType": "ml.p2.xlarge",
+            "VolumeSizeInGB": 30,
+            "VolumeKmsKeyId": "{{ kms_key }}",
+        }
+    },
+    "RoleArn": "arn:aws:iam::0122345678910:role/SageMakerPowerUser",
+    "Tags": [{"{{ key }}": "{{ value }}"}],
+}
+
+
+class TestSageMakerProcessingOperator(unittest.TestCase):
+
+    def setUp(self):
+        self.sagemaker = SageMakerProcessingOperator(
+            task_id='test_sagemaker_operator',
+            aws_conn_id='sagemaker_test_id',
+            config=create_processing_params,
+            wait_for_completion=False,
+            check_interval=5
+        )
+
+    def test_parse_config_integers(self):
+        self.sagemaker.parse_config_integers()
+        self.assertEqual(self.sagemaker.config['ProcessingResources']['ClusterConfig']['InstanceCount'],
+                         int(self.sagemaker.config['ProcessingResources']['ClusterConfig']['InstanceCount']))
+        self.assertEqual(self.sagemaker.config['ProcessingResources']['ClusterConfig']['VolumeSizeInGB'],
+                         int(self.sagemaker.config['ProcessingResources']['ClusterConfig']['VolumeSizeInGB']))

Review comment:
    The int-parsing should be tested by `SageMakerBaseOperator`, don't you think?
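
    A minimal sketch of such a base-level test (hypothetical; it assumes `SageMakerBaseOperator` can be instantiated directly and exposes `integer_fields` and `parse_config_integers()` as used in this test):

    ```python
    from airflow.providers.amazon.aws.operators.sagemaker_base import SageMakerBaseOperator


    class TestSageMakerBaseOperator(unittest.TestCase):

        def test_parse_config_integers(self):
            op = SageMakerBaseOperator(
                task_id='test_base_operator',
                aws_conn_id='sagemaker_test_id',
                config={'Resources': {'InstanceCount': '2'}},  # string on purpose
            )
            op.integer_fields = [['Resources', 'InstanceCount']]
            op.parse_config_integers()
            self.assertEqual(op.config['Resources']['InstanceCount'], 2)
    ```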

##########
File path: tests/providers/amazon/aws/operators/test_sagemaker_processing.py
##########
@@ -0,0 +1,153 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+import mock
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.sagemaker import SageMakerHook
+from airflow.providers.amazon.aws.operators.sagemaker_processing import SageMakerProcessingOperator
+
+job_name = 'test-job-name'
+
+create_processing_params = {
+    "AppSpecification": {
+        "ContainerArguments": ["container_arg"],
+        "ContainerEntrypoint": ["container_entrypoint"],
+        "ImageUri": "{{ image_uri }}",
+    },
+    "Environment": {"{{ key }}": "{{ value }}"},
+    "ExperimentConfig": {
+        "ExperimentName": "ExperimentName",
+        "TrialComponentDisplayName": "TrialComponentDisplayName",
+        "TrialName": "TrialName",
+    },
+    "ProcessingInputs": [
+        {
+            "InputName": "AnalyticsInputName",
+            "S3Input": {
+                "LocalPath": "{{ Local Path }}",
+                "S3CompressionType": "None",
+                "S3DataDistributionType": "FullyReplicated",
+                "S3DataType": "S3Prefix",
+                "S3InputMode": "File",
+                "S3Uri": "{{ S3Uri }}",
+            },
+        }
+    ],
+    "ProcessingJobName": job_name,
+    "ProcessingOutputConfig": {
+        "KmsKeyId": "KmsKeyID",
+        "Outputs": [
+            {
+                "OutputName": "AnalyticsOutputName",
+                "S3Output": {
+                    "LocalPath": "{{ Local Path }}",
+                    "S3UploadMode": "EndOfJob",
+                    "S3Uri": "{{ S3Uri }}",
+                },
+            }
+        ],
+    },
+    "ProcessingResources": {
+        "ClusterConfig": {
+            "InstanceCount": 2,
+            "InstanceType": "ml.p2.xlarge",
+            "VolumeSizeInGB": 30,
+            "VolumeKmsKeyId": "{{ kms_key }}",
+        }
+    },
+    "RoleArn": "arn:aws:iam::0122345678910:role/SageMakerPowerUser",
+    "Tags": [{"{{ key }}": "{{ value }}"}],
+}
+
+
+class TestSageMakerProcessingOperator(unittest.TestCase):
+
+    def setUp(self):
+        self.sagemaker = SageMakerProcessingOperator(
+            task_id='test_sagemaker_operator',
+            aws_conn_id='sagemaker_test_id',
+            config=create_processing_params,
+            wait_for_completion=False,
+            check_interval=5
+        )
+
+    def test_parse_config_integers(self):
+        self.sagemaker.parse_config_integers()
+        self.assertEqual(self.sagemaker.config['ProcessingResources']['ClusterConfig']['InstanceCount'],
+                         int(self.sagemaker.config['ProcessingResources']['ClusterConfig']['InstanceCount']))
+        self.assertEqual(self.sagemaker.config['ProcessingResources']['ClusterConfig']['VolumeSizeInGB'],
+                         int(self.sagemaker.config['ProcessingResources']['ClusterConfig']['VolumeSizeInGB']))

Review comment:
    What you added was the proper `self.integer_fields`. That's what you want to test.

##########
File path: airflow/providers/amazon/aws/operators/sagemaker_processing.py
##########
@@ -0,0 +1,129 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+from airflow.providers.amazon.aws.operators.sagemaker_base import SageMakerBaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class SageMakerProcessingOperator(SageMakerBaseOperator):
+    """
+    Initiate a SageMaker processing job.
+
+    This operator returns the ARN of the processing job created in Amazon SageMaker.
+
+    :param config: The configuration necessary to start a processing job (templated).
+
+        For details of the configuration parameter see :py:meth:`SageMaker.Client.create_processing_job`
+    :type config: dict
+    :param aws_conn_id: The AWS connection ID to use.
+    :type aws_conn_id: str
+    :param wait_for_completion: Whether the operator should wait until the processing job completes.
+    :type wait_for_completion: bool
+    :param print_log: if the operator should print the CloudWatch log during processing
+    :type print_log: bool
+    :param check_interval: if wait is set to be true, this is the time interval
+        in seconds at which the operator will check the status of the processing job
+    :type check_interval: int
+    :param max_ingestion_time: If wait is set to True, the operation fails if the processing job
+        doesn't finish within max_ingestion_time seconds. If you set this parameter to None,
+        the operation does not time out.
+    :type max_ingestion_time: int
+    :param action_if_job_exists: Behaviour if the job name already exists. Possible options are "increment"
+        (default) and "fail".
+    :type action_if_job_exists: str
+    """
+
+    @apply_defaults
+    def __init__(self,
+                 config,
+                 aws_conn_id,
+                 wait_for_completion=True,
+                 print_log=True,
+                 check_interval=30,
+                 max_ingestion_time=None,
+                 action_if_job_exists: str = "increment",  # TODO use typing.Literal for this in Python 3.8
+                 **kwargs):
+        super().__init__(config=config, aws_conn_id=aws_conn_id, **kwargs)
+
+        if action_if_job_exists not in ("increment", "fail"):
+            raise AirflowException(
+                "Argument action_if_job_exists accepts only 'increment' and 'fail'. "
+                f"Provided value: '{action_if_job_exists}'."
+            )
+        self.action_if_job_exists = action_if_job_exists
+        self.wait_for_completion = wait_for_completion
+        self.print_log = print_log
+        self.check_interval = check_interval
+        self.max_ingestion_time = max_ingestion_time
+        self.create_integer_fields()
+
+    def create_integer_fields(self):

Review comment:
       ```suggestion
       def _create_integer_fields(self):
   ```

##########
File path: airflow/providers/amazon/aws/operators/sagemaker_processing.py
##########
@@ -0,0 +1,129 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+from airflow.providers.amazon.aws.operators.sagemaker_base import SageMakerBaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class SageMakerProcessingOperator(SageMakerBaseOperator):
+    """
+    Initiate a SageMaker processing job.
+
+    This operator returns the ARN of the processing job created in Amazon SageMaker.
+
+    :param config: The configuration necessary to start a processing job (templated).
+
+        For details of the configuration parameter see :py:meth:`SageMaker.Client.create_processing_job`
+    :type config: dict
+    :param aws_conn_id: The AWS connection ID to use.
+    :type aws_conn_id: str
+    :param wait_for_completion: Whether the operator should wait until the processing job completes.
+    :type wait_for_completion: bool
+    :param print_log: if the operator should print the CloudWatch log during processing
+    :type print_log: bool
+    :param check_interval: if wait is set to be true, this is the time interval
+        in seconds at which the operator will check the status of the processing job
+    :type check_interval: int
+    :param max_ingestion_time: If wait is set to True, the operation fails if the processing job
+        doesn't finish within max_ingestion_time seconds. If you set this parameter to None,
+        the operation does not time out.
+    :type max_ingestion_time: int
+    :param action_if_job_exists: Behaviour if the job name already exists. Possible options are "increment"
+        (default) and "fail".
+    :type action_if_job_exists: str
+    """
+
+    @apply_defaults
+    def __init__(self,
+                 config,
+                 aws_conn_id,
+                 wait_for_completion=True,
+                 print_log=True,
+                 check_interval=30,
+                 max_ingestion_time=None,
+                 action_if_job_exists: str = "increment",  # TODO use typing.Literal for this in Python 3.8
+                 **kwargs):
+        super().__init__(config=config, aws_conn_id=aws_conn_id, **kwargs)
+
+        if action_if_job_exists not in ("increment", "fail"):
+            raise AirflowException(
+                "Argument action_if_job_exists accepts only 'increment' and 'fail'. "
+                f"Provided value: '{action_if_job_exists}'."
+            )
+        self.action_if_job_exists = action_if_job_exists
+        self.wait_for_completion = wait_for_completion
+        self.print_log = print_log
+        self.check_interval = check_interval
+        self.max_ingestion_time = max_ingestion_time
+        self.create_integer_fields()
+
+    def create_integer_fields(self):
+        """Set fields which should be casted to integers."""
+        if 'StoppingCondition' not in self.config:
+            self.integer_fields = [
+                ['ProcessingResources', 'ClusterConfig', 'InstanceCount'],
+                ['ProcessingResources', 'ClusterConfig', 'VolumeSizeInGB']
+            ]
+        else:
+            self.integer_fields = [
+                ['ProcessingResources', 'ClusterConfig', 'InstanceCount'],
+                ['ProcessingResources', 'ClusterConfig', 'VolumeSizeInGB'],
+                ['StoppingCondition', 'MaxRuntimeInSeconds']
+            ]
+
+    def expand_role(self):
+        if 'RoleArn' in self.config:
+            hook = AwsBaseHook(self.aws_conn_id, client_type='iam')
+            self.config['RoleArn'] = hook.expand_role(self.config['RoleArn'])
+
+    def execute(self, context):
+        self.preprocess_config()
+
+        processing_job_name = self.config["ProcessingJobName"]
+        processing_jobs = self.hook.list_processing_jobs(NameContains=processing_job_name)
+
+        # Check if given ProcessingJobName already exists
+        if processing_job_name in [pj["ProcessingJobName"] for pj in processing_jobs]:
+            if self.action_if_job_exists == "increment":
+                self.log.info("Found existing processing job with name '%s'.", processing_job_name)
+                new_processing_job_name = f"{processing_job_name}-{len(processing_jobs) + 1}"
+                self.config["ProcessingJobName"] = new_processing_job_name
+                self.log.info("Incremented processing job name to '%s'.", new_processing_job_name)
+            elif self.action_if_job_exists == "fail":
+                raise AirflowException(
+                    f"A SageMaker processing job with name {processing_job_name} already exists."
+                )

Review comment:
    ```suggestion
            if self.action_if_job_exists == "fail":
                raise AirflowException(
                    f"A SageMaker processing job with name {processing_job_name} already exists."
                )
            if self.action_if_job_exists == "increment":
                self.log.info("Found existing processing job with name '%s'.", processing_job_name)
                new_processing_job_name = f"{processing_job_name}-{len(processing_jobs) + 1}"
                self.config["ProcessingJobName"] = new_processing_job_name
                self.log.info("Incremented processing job name to '%s'.", new_processing_job_name)
    ```

##########
File path: airflow/providers/amazon/aws/operators/sagemaker_processing.py
##########
@@ -0,0 +1,129 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+from airflow.providers.amazon.aws.operators.sagemaker_base import SageMakerBaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class SageMakerProcessingOperator(SageMakerBaseOperator):
+    """
+    Initiate a SageMaker processing job.
+
+    This operator returns the ARN of the processing job created in Amazon SageMaker.
+
+    :param config: The configuration necessary to start a processing job (templated).
+
+        For details of the configuration parameter see :py:meth:`SageMaker.Client.create_processing_job`
+    :type config: dict
+    :param aws_conn_id: The AWS connection ID to use.
+    :type aws_conn_id: str
+    :param wait_for_completion: Whether the operator should wait until the processing job completes.
+    :type wait_for_completion: bool
+    :param print_log: if the operator should print the CloudWatch log during processing
+    :type print_log: bool
+    :param check_interval: if wait is set to be true, this is the time interval
+        in seconds at which the operator will check the status of the processing job
+    :type check_interval: int
+    :param max_ingestion_time: If wait is set to True, the operation fails if the processing job
+        doesn't finish within max_ingestion_time seconds. If you set this parameter to None,
+        the operation does not time out.
+    :type max_ingestion_time: int
+    :param action_if_job_exists: Behaviour if the job name already exists. Possible options are "increment"
+        (default) and "fail".
+    :type action_if_job_exists: str
+    """
+
+    @apply_defaults
+    def __init__(self,
+                 config,
+                 aws_conn_id,
+                 wait_for_completion=True,
+                 print_log=True,
+                 check_interval=30,
+                 max_ingestion_time=None,
+                 action_if_job_exists: str = "increment",  # TODO use typing.Literal for this in Python 3.8
+                 **kwargs):
+        super().__init__(config=config, aws_conn_id=aws_conn_id, **kwargs)
+
+        if action_if_job_exists not in ("increment", "fail"):
+            raise AirflowException(
+                "Argument action_if_job_exists accepts only 'increment' and 'fail'. "
+                f"Provided value: '{action_if_job_exists}'."
+            )
+        self.action_if_job_exists = action_if_job_exists
+        self.wait_for_completion = wait_for_completion
+        self.print_log = print_log
+        self.check_interval = check_interval
+        self.max_ingestion_time = max_ingestion_time
+        self.create_integer_fields()
+
+    def create_integer_fields(self):
+        """Set fields which should be casted to integers."""
+        if 'StoppingCondition' not in self.config:
+            self.integer_fields = [
+                ['ProcessingResources', 'ClusterConfig', 'InstanceCount'],
+                ['ProcessingResources', 'ClusterConfig', 'VolumeSizeInGB']
+            ]
+        else:
+            self.integer_fields = [
+                ['ProcessingResources', 'ClusterConfig', 'InstanceCount'],
+                ['ProcessingResources', 'ClusterConfig', 'VolumeSizeInGB'],
+                ['StoppingCondition', 'MaxRuntimeInSeconds']
+            ]
+
+    def expand_role(self):
+        if 'RoleArn' in self.config:
+            hook = AwsBaseHook(self.aws_conn_id, client_type='iam')
+            self.config['RoleArn'] = hook.expand_role(self.config['RoleArn'])
+
+    def execute(self, context):
+        self.preprocess_config()
+
+        processing_job_name = self.config["ProcessingJobName"]
+        processing_jobs = self.hook.list_processing_jobs(NameContains=processing_job_name)
+
+        # Check if given ProcessingJobName already exists
+        if processing_job_name in [pj["ProcessingJobName"] for pj in processing_jobs]:
+            if self.action_if_job_exists == "increment":
+                self.log.info("Found existing processing job with name '%s'.", processing_job_name)
+                new_processing_job_name = f"{processing_job_name}-{len(processing_jobs) + 1}"
+                self.config["ProcessingJobName"] = new_processing_job_name
+                self.log.info("Incremented processing job name to '%s'.", new_processing_job_name)
+            elif self.action_if_job_exists == "fail":
+                raise AirflowException(
+                    f"A SageMaker processing job with name {processing_job_name} already exists."
+                )
+
+        self.log.info("Creating SageMaker processing job %s.", self.config["ProcessingJobName"])
+        response = self.hook.create_processing_job(
+            self.config,
+            wait_for_completion=self.wait_for_completion,
+            check_interval=self.check_interval,
+            max_ingestion_time=self.max_ingestion_time
+        )
+        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
+            raise AirflowException('Sagemaker Processing Job creation failed: %s' % response)
+        else:
+            return {
+                'Processing': self.hook.describe_processing_job(
+                    self.config['ProcessingJobName']
+                )
+            }

Review comment:
       ```suggestion
           return {
               'Processing': self.hook.describe_processing_job(
                   self.config['ProcessingJobName']
               )
           }
   ```

##########
File path: airflow/providers/amazon/aws/operators/sagemaker_processing.py
##########
@@ -0,0 +1,129 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+from airflow.providers.amazon.aws.operators.sagemaker_base import SageMakerBaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class SageMakerProcessingOperator(SageMakerBaseOperator):
+    """
+    Initiate a SageMaker processing job.
+
+    This operator returns the ARN of the processing job created in Amazon SageMaker.
+
+    :param config: The configuration necessary to start a processing job (templated).
+
+        For details of the configuration parameter see :py:meth:`SageMaker.Client.create_processing_job`
+    :type config: dict
+    :param aws_conn_id: The AWS connection ID to use.
+    :type aws_conn_id: str
+    :param wait_for_completion: Whether the operator should wait until the processing job completes.
+    :type wait_for_completion: bool
+    :param print_log: if the operator should print the CloudWatch log during processing
+    :type print_log: bool
+    :param check_interval: if wait is set to be true, this is the time interval
+        in seconds at which the operator will check the status of the processing job
+    :type check_interval: int
+    :param max_ingestion_time: If wait is set to True, the operation fails if the processing job
+        doesn't finish within max_ingestion_time seconds. If you set this parameter to None,
+        the operation does not time out.
+    :type max_ingestion_time: int
+    :param action_if_job_exists: Behaviour if the job name already exists. Possible options are "increment"
+        (default) and "fail".
+    :type action_if_job_exists: str
+    """
+
+    @apply_defaults
+    def __init__(self,
+                 config,
+                 aws_conn_id,
+                 wait_for_completion=True,
+                 print_log=True,
+                 check_interval=30,
+                 max_ingestion_time=None,
+                 action_if_job_exists: str = "increment",  # TODO use typing.Literal for this in Python 3.8
+                 **kwargs):
+        super().__init__(config=config, aws_conn_id=aws_conn_id, **kwargs)
+
+        if action_if_job_exists not in ("increment", "fail"):
+            raise AirflowException(
+                "Argument action_if_job_exists accepts only 'increment' and 'fail'. "
+                f"Provided value: '{action_if_job_exists}'."
+            )
+        self.action_if_job_exists = action_if_job_exists
+        self.wait_for_completion = wait_for_completion
+        self.print_log = print_log
+        self.check_interval = check_interval
+        self.max_ingestion_time = max_ingestion_time
+        self.create_integer_fields()
+
+    def create_integer_fields(self):

Review comment:
       It is a _private_ method, isn't it?
   ```suggestion
       def _create_integer_fields(self):
   ```
   
   You don't want it to be called from outside this context, correct?

##########
File path: airflow/providers/amazon/aws/operators/sagemaker_processing.py
##########
@@ -0,0 +1,129 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+from airflow.providers.amazon.aws.operators.sagemaker_base import SageMakerBaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class SageMakerProcessingOperator(SageMakerBaseOperator):
+    """
+    Initiate a SageMaker processing job.
+
+    This operator returns the ARN of the processing job created in Amazon SageMaker.
+
+    :param config: The configuration necessary to start a processing job (templated).
+
+        For details of the configuration parameter see :py:meth:`SageMaker.Client.create_processing_job`
+    :type config: dict
+    :param aws_conn_id: The AWS connection ID to use.
+    :type aws_conn_id: str
+    :param wait_for_completion: Whether the operator should wait until the processing job completes.
+    :type wait_for_completion: bool
+    :param print_log: if the operator should print the CloudWatch log during processing
+    :type print_log: bool
+    :param check_interval: if wait is set to be true, this is the time interval
+        in seconds at which the operator will check the status of the processing job
+    :type check_interval: int
+    :param max_ingestion_time: If wait is set to True, the operation fails if the processing job
+        doesn't finish within max_ingestion_time seconds. If you set this parameter to None,
+        the operation does not time out.
+    :type max_ingestion_time: int
+    :param action_if_job_exists: Behaviour if the job name already exists. Possible options are "increment"
+        (default) and "fail".
+    :type action_if_job_exists: str
+    """
+
+    @apply_defaults
+    def __init__(self,
+                 config,
+                 aws_conn_id,
+                 wait_for_completion=True,
+                 print_log=True,
+                 check_interval=30,
+                 max_ingestion_time=None,
+                 action_if_job_exists: str = "increment",  # TODO use typing.Literal for this in Python 3.8
+                 **kwargs):
+        super().__init__(config=config, aws_conn_id=aws_conn_id, **kwargs)
+
+        if action_if_job_exists not in ("increment", "fail"):
+            raise AirflowException(
+                "Argument action_if_job_exists accepts only 'increment' and 'fail'. "
+                f"Provided value: '{action_if_job_exists}'."
+            )
+        self.action_if_job_exists = action_if_job_exists
+        self.wait_for_completion = wait_for_completion
+        self.print_log = print_log
+        self.check_interval = check_interval
+        self.max_ingestion_time = max_ingestion_time
+        self.create_integer_fields()
+
+    def create_integer_fields(self):
+        """Set fields which should be casted to integers."""
+        if 'StoppingCondition' not in self.config:
+            self.integer_fields = [
+                ['ProcessingResources', 'ClusterConfig', 'InstanceCount'],
+                ['ProcessingResources', 'ClusterConfig', 'VolumeSizeInGB']
+            ]
+        else:
+            self.integer_fields = [
+                ['ProcessingResources', 'ClusterConfig', 'InstanceCount'],
+                ['ProcessingResources', 'ClusterConfig', 'VolumeSizeInGB'],
+                ['StoppingCondition', 'MaxRuntimeInSeconds']
+            ]
+
+    def expand_role(self):
+        if 'RoleArn' in self.config:
+            hook = AwsBaseHook(self.aws_conn_id, client_type='iam')
+            self.config['RoleArn'] = hook.expand_role(self.config['RoleArn'])
+
+    def execute(self, context):
+        self.preprocess_config()
+
+        processing_job_name = self.config["ProcessingJobName"]
+        processing_jobs = self.hook.list_processing_jobs(NameContains=processing_job_name)
+
+        # Check if given ProcessingJobName already exists
+        if processing_job_name in [pj["ProcessingJobName"] for pj in processing_jobs]:
+            if self.action_if_job_exists == "increment":
+                self.log.info("Found existing processing job with name '%s'.", processing_job_name)
+                new_processing_job_name = f"{processing_job_name}-{len(processing_jobs) + 1}"
+                self.config["ProcessingJobName"] = new_processing_job_name
+                self.log.info("Incremented processing job name to '%s'.", new_processing_job_name)
+            elif self.action_if_job_exists == "fail":
+                raise AirflowException(
+                    f"A SageMaker processing job with name {processing_job_name} already exists."
+                )

Review comment:
    You can turn around the `if`, `elif` and first check for failures and stop there before continuing.
   
    ```suggestion
            if self.action_if_job_exists == "fail":
                raise AirflowException(
                    f"A SageMaker processing job with name {processing_job_name} already exists."
                )
            if self.action_if_job_exists == "increment":
                self.log.info("Found existing processing job with name '%s'.", processing_job_name)
                new_processing_job_name = f"{processing_job_name}-{len(processing_jobs) + 1}"
                self.config["ProcessingJobName"] = new_processing_job_name
                self.log.info("Incremented processing job name to '%s'.", new_processing_job_name)
    ```

##########
File path: airflow/providers/amazon/aws/operators/sagemaker_processing.py
##########
@@ -0,0 +1,129 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+from airflow.providers.amazon.aws.operators.sagemaker_base import SageMakerBaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class SageMakerProcessingOperator(SageMakerBaseOperator):
+    """
+    Initiate a SageMaker processing job.
+
+    This operator returns the ARN of the processing job created in Amazon SageMaker.
+
+    :param config: The configuration necessary to start a processing job (templated).
+
+        For details of the configuration parameter see :py:meth:`SageMaker.Client.create_processing_job`
+    :type config: dict
+    :param aws_conn_id: The AWS connection ID to use.
+    :type aws_conn_id: str
+    :param wait_for_completion: Whether the operator should wait until the processing job completes.
+    :type wait_for_completion: bool
+    :param print_log: if the operator should print the CloudWatch log during processing
+    :type print_log: bool
+    :param check_interval: if wait is set to be true, this is the time interval
+        in seconds at which the operator will check the status of the processing job
+    :type check_interval: int
+    :param max_ingestion_time: If wait is set to True, the operation fails if the processing job
+        doesn't finish within max_ingestion_time seconds. If you set this parameter to None,
+        the operation does not time out.
+    :type max_ingestion_time: int
+    :param action_if_job_exists: Behaviour if the job name already exists. Possible options are "increment"
+        (default) and "fail".
+    :type action_if_job_exists: str
+    """
+
+    @apply_defaults
+    def __init__(self,
+                 config,
+                 aws_conn_id,
+                 wait_for_completion=True,
+                 print_log=True,
+                 check_interval=30,
+                 max_ingestion_time=None,
+                 action_if_job_exists: str = "increment",  # TODO use typing.Literal for this in Python 3.8
+                 **kwargs):
+        super().__init__(config=config, aws_conn_id=aws_conn_id, **kwargs)
+
+        if action_if_job_exists not in ("increment", "fail"):
+            raise AirflowException(
+                "Argument action_if_job_exists accepts only 'increment' and 'fail'. "
+                f"Provided value: '{action_if_job_exists}'."
+            )
+        self.action_if_job_exists = action_if_job_exists
+        self.wait_for_completion = wait_for_completion
+        self.print_log = print_log
+        self.check_interval = check_interval
+        self.max_ingestion_time = max_ingestion_time
+        self.create_integer_fields()
+
+    def create_integer_fields(self):
+        """Set fields which should be casted to integers."""
+        if 'StoppingCondition' not in self.config:
+            self.integer_fields = [
+                ['ProcessingResources', 'ClusterConfig', 'InstanceCount'],
+                ['ProcessingResources', 'ClusterConfig', 'VolumeSizeInGB']
+            ]
+        else:
+            self.integer_fields = [
+                ['ProcessingResources', 'ClusterConfig', 'InstanceCount'],
+                ['ProcessingResources', 'ClusterConfig', 'VolumeSizeInGB'],
+                ['StoppingCondition', 'MaxRuntimeInSeconds']
+            ]
+
+    def expand_role(self):
+        if 'RoleArn' in self.config:
+            hook = AwsBaseHook(self.aws_conn_id, client_type='iam')
+            self.config['RoleArn'] = hook.expand_role(self.config['RoleArn'])
+
+    def execute(self, context):
+        self.preprocess_config()
+
+        processing_job_name = self.config["ProcessingJobName"]
+        processing_jobs = self.hook.list_processing_jobs(NameContains=processing_job_name)
+
+        # Check if given ProcessingJobName already exists
+        if processing_job_name in [pj["ProcessingJobName"] for pj in processing_jobs]:
+            if self.action_if_job_exists == "increment":
+                self.log.info("Found existing processing job with name '%s'.", processing_job_name)
+                new_processing_job_name = f"{processing_job_name}-{len(processing_jobs) + 1}"
+                self.config["ProcessingJobName"] = new_processing_job_name
+                self.log.info("Incremented processing job name to '%s'.", new_processing_job_name)
+            elif self.action_if_job_exists == "fail":
+                raise AirflowException(
+                    f"A SageMaker processing job with name {processing_job_name} already exists."
+                )
+
+        self.log.info("Creating SageMaker processing job %s.", self.config["ProcessingJobName"])
+        response = self.hook.create_processing_job(
+            self.config,
+            wait_for_completion=self.wait_for_completion,
+            check_interval=self.check_interval,
+            max_ingestion_time=self.max_ingestion_time
+        )
+        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
+            raise AirflowException('Sagemaker Processing Job creation failed: %s' % response)
+        else:
+            return {
+                'Processing': self.hook.describe_processing_job(
+                    self.config['ProcessingJobName']
+                )
+            }

Review comment:
       You will only get to this point if it didn't fail until here.
   
   ```suggestion
           return {
               'Processing': self.hook.describe_processing_job(
                   self.config['ProcessingJobName']
               )
           }
   ```

##########
File path: tests/providers/amazon/aws/operators/test_sagemaker_processing.py
##########
@@ -0,0 +1,153 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+import mock
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.sagemaker import SageMakerHook
+from airflow.providers.amazon.aws.operators.sagemaker_processing import SageMakerProcessingOperator
+
+job_name = 'test-job-name'
+
+create_processing_params = {
+    "AppSpecification": {
+        "ContainerArguments": ["container_arg"],
+        "ContainerEntrypoint": ["container_entrypoint"],
+        "ImageUri": "{{ image_uri }}",
+    },
+    "Environment": {"{{ key }}": "{{ value }}"},
+    "ExperimentConfig": {
+        "ExperimentName": "ExperimentName",
+        "TrialComponentDisplayName": "TrialComponentDisplayName",
+        "TrialName": "TrialName",
+    },
+    "ProcessingInputs": [
+        {
+            "InputName": "AnalyticsInputName",
+            "S3Input": {
+                "LocalPath": "{{ Local Path }}",
+                "S3CompressionType": "None",
+                "S3DataDistributionType": "FullyReplicated",
+                "S3DataType": "S3Prefix",
+                "S3InputMode": "File",
+                "S3Uri": "{{ S3Uri }}",
+            },
+        }
+    ],
+    "ProcessingJobName": job_name,
+    "ProcessingOutputConfig": {
+        "KmsKeyId": "KmsKeyID",
+        "Outputs": [
+            {
+                "OutputName": "AnalyticsOutputName",
+                "S3Output": {
+                    "LocalPath": "{{ Local Path }}",
+                    "S3UploadMode": "EndOfJob",
+                    "S3Uri": "{{ S3Uri }}",
+                },
+            }
+        ],
+    },
+    "ProcessingResources": {
+        "ClusterConfig": {
+            "InstanceCount": 2,
+            "InstanceType": "ml.p2.xlarge",
+            "VolumeSizeInGB": 30,
+            "VolumeKmsKeyId": "{{ kms_key }}",
+        }
+    },
+    "RoleArn": "arn:aws:iam::0122345678910:role/SageMakerPowerUser",
+    "Tags": [{"{{ key }}": "{{ value }}"}],
+}
+
+
+class TestSageMakerProcessingOperator(unittest.TestCase):
+
+    def setUp(self):
+        self.sagemaker = SageMakerProcessingOperator(
+            task_id='test_sagemaker_operator',
+            aws_conn_id='sagemaker_test_id',
+            config=create_processing_params,
+            wait_for_completion=False,
+            check_interval=5
+        )
+
+    def test_parse_config_integers(self):
+        self.sagemaker.parse_config_integers()
+        self.assertEqual(self.sagemaker.config['ProcessingResources']['ClusterConfig']['InstanceCount'],
+                         int(self.sagemaker.config['ProcessingResources']['ClusterConfig']['InstanceCount']))
+        self.assertEqual(self.sagemaker.config['ProcessingResources']['ClusterConfig']['VolumeSizeInGB'],
+                         int(self.sagemaker.config['ProcessingResources']['ClusterConfig']['VolumeSizeInGB']))

Review comment:
    Instead of calling `self.sagemaker.parse_config_integers()` directly, can you test that `self.integer_fields` has the values you expect?
   
    You can use `from parameterized import parameterized` to run a test with different arguments.
   
   Like so..
   ```suggestion
       @parameterized.expand([
           ([
               ['ProcessingResources', 'ClusterConfig', 'InstanceCount'],
               ['ProcessingResources', 'ClusterConfig', 'VolumeSizeInGB']
           ],),
           ([
               ['ProcessingResources', 'ClusterConfig', 'InstanceCount'],
               ['ProcessingResources', 'ClusterConfig', 'VolumeSizeInGB'],
               ['StoppingCondition', 'MaxRuntimeInSeconds']
           ],),
       ])
       def test_integer_fields_are_set(self, expected_fields):
           assert self.sagemaker.integer_fields == expected_fields
   ```
   
    But you need to make sure that in one case you pass `StoppingCondition` to config and in the other case don't. You would probably need to restructure the test a bit for that; see the sketch below.
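
    One possible shape for that (a sketch; it builds the operator per case instead of using the shared `setUp` instance, and the `'3600'` stopping condition is illustrative):

    ```python
    @parameterized.expand([
        (False, [
            ['ProcessingResources', 'ClusterConfig', 'InstanceCount'],
            ['ProcessingResources', 'ClusterConfig', 'VolumeSizeInGB'],
        ]),
        (True, [
            ['ProcessingResources', 'ClusterConfig', 'InstanceCount'],
            ['ProcessingResources', 'ClusterConfig', 'VolumeSizeInGB'],
            ['StoppingCondition', 'MaxRuntimeInSeconds'],
        ]),
    ])
    def test_integer_fields_are_set(self, with_stopping_condition, expected_fields):
        config = dict(create_processing_params)  # shallow copy of the shared config
        if with_stopping_condition:
            config['StoppingCondition'] = {'MaxRuntimeInSeconds': '3600'}
        operator = SageMakerProcessingOperator(
            task_id='test_sagemaker_operator',
            aws_conn_id='sagemaker_test_id',
            config=config,
            wait_for_completion=False,
            check_interval=5,
        )
        assert operator.integer_fields == expected_fields
    ```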




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

