felipearcaro-buildium opened a new issue, #62375:
URL: https://github.com/apache/airflow/issues/62375
### Apache Airflow Provider(s)
salesforce
### Versions of Apache Airflow Providers
apache-airflow-providers-salesforce 5.12.2
### Apache Airflow version
2.11.0
### Operating System
Windows 11
### Deployment
Astronomer
### Deployment details
_No response_
### What happened
When using Jinja templating to pass data from a previous task to
`SalesforceBulkOperator`:
```python
def get_salesforce_records():
return [
{
'Id': 1,
'Name': 'example1',
'Status': 'Active',
},
{
'Id': 2,
'Name': 'example2',
'Status': 'Active',
}
]
get_records = PythonOperator(
task_id='get_records',
python_callable=get_salesforce_records
)
upsert_to_salesforce = SalesforceBulkOperator(
task_id='upsert_to_salesforce',
object_name='Account',
operation='insert',
payload="{{ ti.xcom_pull(task_ids='get_records') }}"
)
```
The payload parameter receives the literal string `{{
ti.xcom_pull(task_ids='upstream_task') }}` instead of the actual XCom value,
causing the operation to fail.
### What you think should happen instead
The Jinja template should be evaluated at runtime, similar to how other
Airflow operators work. The operator should support templating for key
parameters: `payload`, `object_name`, `operation`, and `external_id_field`.
### How to reproduce
Create a generic DAG and create the following tasks:
```python
def get_salesforce_records():
return [
{
'Id': 1,
'Name': 'example1',
'Status': 'Active',
},
{
'Id': 2,
'Name': 'example2',
'Status': 'Active',
}
]
get_records = PythonOperator(
task_id='get_records',
python_callable=get_salesforce_records
)
upsert_to_salesforce = SalesforceBulkOperator(
task_id='upsert_to_salesforce',
object_name='Account',
operation='insert',
payload="{{ ti.xcom_pull(task_ids='get_records') }}"
)
```
### Anything else
When I added `template_fields` to my custom operator (extending
`SalesforceBulkOperator`), I noticed that templated parameters are evaluated as
strings, not as their original Python types. This required additional parsing
logic in the `execute()` method:
```python
def execute(self, context):
# Convert the templated string back to a Python object
if isinstance(self.payload, str):
try:
# Try ast.literal_eval first - handles Python string
representation with single quotes
# e.g., "[{'key': 'value'}]" from XCom
self.payload = ast.literal_eval(self.payload)
except (ValueError, SyntaxError):
# Fallback to json.loads for valid JSON strings (with double
quotes)
self.payload = json.loads(self.payload)
return super().execute(context)
```
### Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]