felipearcaro-buildium opened a new issue, #62375:
URL: https://github.com/apache/airflow/issues/62375

   ### Apache Airflow Provider(s)
   
   salesforce
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-salesforce 5.12.2
   
   ### Apache Airflow version
   
   2.11.0
   
   ### Operating System
   
   Windows 11
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   Jinja templates are not rendered when used to pass data from a previous task to `SalesforceBulkOperator`:
   
   ```python
   def get_salesforce_records():
       return [
           {
               'Id': 1,
               'Name': 'example1',
               'Status': 'Active',
           },
           {
               'Id': 2,
               'Name': 'example2',
               'Status': 'Active',
           }
       ]
   
   get_records = PythonOperator(
       task_id='get_records',
       python_callable=get_salesforce_records
   )
   
   upsert_to_salesforce = SalesforceBulkOperator(
       task_id='upsert_to_salesforce',
       object_name='Account',
       operation='insert',
       payload="{{ ti.xcom_pull(task_ids='get_records') }}" 
   )
   ```
   
   The `payload` parameter receives the literal string `{{ ti.xcom_pull(task_ids='get_records') }}` instead of the actual XCom value, causing the operation to fail.
   
   ### What you think should happen instead
   
   The Jinja template should be evaluated at runtime, similar to how other 
Airflow operators work. The operator should support templating for key 
parameters: `payload`, `object_name`, `operation`, and `external_id_field`.
   
   ### How to reproduce
   
   Create a DAG containing the following tasks and trigger a run:
   
   ```python
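   from airflow.operators.python import PythonOperator
   from airflow.providers.salesforce.operators.bulk import SalesforceBulkOperator
   
   def get_salesforce_records():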
       return [
           {
               'Id': 1,
               'Name': 'example1',
               'Status': 'Active',
           },
           {
               'Id': 2,
               'Name': 'example2',
               'Status': 'Active',
           }
       ]
   
   get_records = PythonOperator(
       task_id='get_records',
       python_callable=get_salesforce_records
   )
   
   upsert_to_salesforce = SalesforceBulkOperator(
       task_id='upsert_to_salesforce',
       object_name='Account',
       operation='insert',
       payload="{{ ti.xcom_pull(task_ids='get_records') }}" 
   )
   ```
   
   ### Anything else
   
   When I added `template_fields` to my custom operator (extending `SalesforceBulkOperator`), I noticed that templated parameters are rendered as strings, not as their original Python types. For example, a list of dicts pulled from XCom arrives as its string representation. This required additional parsing logic in the `execute()` method:
   
   ```python
   import ast
   import json
   
   # Method of the custom operator that extends SalesforceBulkOperator
   def execute(self, context):
       # Convert the rendered template string back to a Python object
       if isinstance(self.payload, str):
           try:
               # Try ast.literal_eval first: it handles the Python string
               # representation with single quotes,
               # e.g., "[{'key': 'value'}]" from XCom
               self.payload = ast.literal_eval(self.payload)
           except (ValueError, SyntaxError):
               # Fall back to json.loads for valid JSON strings (with double quotes)
               self.payload = json.loads(self.payload)
       
       return super().execute(context)
   ```
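
The parsing step above can be tried outside Airflow. The following is a minimal, standalone sketch, assuming the default (non-native) rendering turns the XCom value into its `str()` form. The helper name `parse_rendered_payload` is hypothetical and is not part of the Salesforce provider.

```python
import ast
import json
from typing import Any


def parse_rendered_payload(value: Any) -> Any:
    """Turn a rendered template string back into a Python object.

    Hypothetical helper for illustration; not part of the provider.
    """
    if not isinstance(value, str):
        # Already a native object (passed directly rather than via a template).
        return value
    try:
        # Python repr with single quotes, e.g. "[{'Id': 1}]".
        return ast.literal_eval(value)
    except (ValueError, SyntaxError):
        # Valid JSON that is not a Python literal, e.g. '[{"Active": true}]'.
        return json.loads(value)


records = [{"Id": 1, "Name": "example1"}, {"Id": 2, "Name": "example2"}]

# Assumption: string rendering of the XCom value is equivalent to str(records).
rendered = str(records)

parsed = parse_rendered_payload(rendered)
print(parsed == records)
```

A plain string that is neither a Python literal nor JSON raises `json.JSONDecodeError` here, so a caller may want to handle that case explicitly. Setting `render_template_as_native_obj=True` on the DAG is another possible route to native types, though it affects every templated field in that DAG and still requires `payload` to be listed in `template_fields`.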
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


