Copilot commented on code in PR #64519:
URL: https://github.com/apache/airflow/pull/64519#discussion_r3025325393
##########
providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py:
##########
@@ -84,6 +84,42 @@ def _validate_inputs(self) -> None:
f"Available operations are {self.available_operations}."
)
+ def _log_result_failures(self, result: list) -> None:
+ """
+ Log a warning for every record in the Salesforce Bulk API result that
has ``success=False``.
+
+ Salesforce's Bulk API does not raise when individual records are
rejected — it signals them
+ via ``success=False`` in the result list. Without this, those
failures are completely
+ invisible unless the caller manually inspects the XCom value.
+ """
+ failed = [r for r in result if not r.get("success")]
+ total = len(result)
+
+ if failed:
+ self.log.warning(
+ "Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
+ self.operation,
+ self.object_name,
+ len(failed),
+ total,
+ )
+ for idx, record in enumerate(failed):
+ for error in record.get("errors", []):
+ self.log.warning(
+ "Record failure %d — status: %s | message: %s |
fields: %s",
+ idx,
+ error.get("statusCode"),
+ error.get("message"),
+ error.get("fields"),
+ )
Review Comment:
The logged `idx` is the index within the filtered `failed` list, not the
original record index in the result/payload. This can mislead debugging (e.g.,
the first failure logs as 0 even if it was record 57). Iterate with
`enumerate(result)` and log the original index for each failure (or include
both original index and a failure counter).
##########
providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py:
##########
@@ -84,6 +84,42 @@ def _validate_inputs(self) -> None:
f"Available operations are {self.available_operations}."
)
+ def _log_result_failures(self, result: list) -> None:
+ """
+ Log a warning for every record in the Salesforce Bulk API result that
has ``success=False``.
+
+ Salesforce's Bulk API does not raise when individual records are
rejected — it signals them
+ via ``success=False`` in the result list. Without this, those
failures are completely
+ invisible unless the caller manually inspects the XCom value.
+ """
+ failed = [r for r in result if not r.get("success")]
+ total = len(result)
+
+ if failed:
+ self.log.warning(
+ "Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
+ self.operation,
+ self.object_name,
+ len(failed),
+ total,
+ )
+ for idx, record in enumerate(failed):
+ for error in record.get("errors", []):
+ self.log.warning(
+ "Record failure %d — status: %s | message: %s |
fields: %s",
Review Comment:
The logged `idx` is the index within the filtered `failed` list, not the
original record index in the result/payload. This can mislead debugging (e.g.,
the first failure logs as 0 even if it was record 57). Iterate with
`enumerate(result)` and log the original index for each failure (or include
both original index and a failure counter).
```suggestion
total = len(result)
failed_indices = [idx for idx, r in enumerate(result) if not
r.get("success")]
if failed_indices:
self.log.warning(
"Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
self.operation,
self.object_name,
len(failed_indices),
total,
)
for idx in failed_indices:
record = result[idx]
for error in record.get("errors", []):
self.log.warning(
"Record failure at result index %d — status: %s |
message: %s | fields: %s",
```
##########
providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py:
##########
@@ -84,6 +84,42 @@ def _validate_inputs(self) -> None:
f"Available operations are {self.available_operations}."
)
+ def _log_result_failures(self, result: list) -> None:
+ """
+ Log a warning for every record in the Salesforce Bulk API result that
has ``success=False``.
+
+ Salesforce's Bulk API does not raise when individual records are
rejected — it signals them
+ via ``success=False`` in the result list. Without this, those
failures are completely
+ invisible unless the caller manually inspects the XCom value.
+ """
+ failed = [r for r in result if not r.get("success")]
+ total = len(result)
+
+ if failed:
+ self.log.warning(
+ "Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
+ self.operation,
+ self.object_name,
+ len(failed),
+ total,
+ )
+ for idx, record in enumerate(failed):
+ for error in record.get("errors", []):
+ self.log.warning(
+ "Record failure %d — status: %s | message: %s |
fields: %s",
+ idx,
+ error.get("statusCode"),
+ error.get("message"),
+ error.get("fields"),
+ )
+ else:
+ self.log.info(
+ "Salesforce Bulk API %s on %s completed: %d record(s)
processed successfully.",
+ self.operation,
+ self.object_name,
+ total,
+ )
Review Comment:
The PR description says the change is limited to making failures visible via
warnings (no new policy/behavior beyond that). This adds an info log line on
full success, which is additional behavior and may increase log volume. Either
update the PR description to reflect this, or change this branch to be silent /
debug-level to align with the stated scope.
##########
providers/salesforce/tests/unit/salesforce/operators/test_bulk.py:
##########
@@ -271,3 +271,87 @@ def test_execute_salesforce_bulk_hard_delete(self,
mock_get_conn):
batch_size=batch_size,
use_serial=use_serial,
)
+
+
+
@patch("airflow.providers.salesforce.operators.bulk.SalesforceHook.get_conn")
+ def test_log_result_failures_warns_on_failed_records(self, mock_get_conn):
+ """
+ When records come back with success=False, _log_result_failures should
emit warnings
+ containing the Salesforce status code and message.
+ """
+ failed_result = [
+ {"success": True, "created": True, "id": "001xx0000001AAA",
"errors": []},
+ {
+ "success": False,
+ "created": False,
+ "id": None,
+ "errors": [
+ {
+ "statusCode": "INVALID_FIELD",
+ "message": "No such column \'Bad_Field\'",
+ "fields": [],
+ }
+ ],
+ },
+ ]
+ mock_get_conn.return_value.bulk.__getattr__("Account").insert = Mock(
+ return_value=failed_result
+ )
+
+ operator = SalesforceBulkOperator(
+ task_id="bulk_insert_with_failure",
+ operation="insert",
+ object_name="Account",
+ payload=[{"Name": "OK"}, {"Bad_Field": "x"}],
+ )
+
+ with pytest.raises(Exception):
+ pass
+
+ result = operator.execute(context={})
+ assert result is None
Review Comment:
The test names/docstrings claim warnings are emitted (or not emitted), but
the tests currently don’t assert on logs at all. Also, `with
pytest.raises(Exception): pass` does not validate behavior. Its body raises
nothing, so pytest fails the test with `DID NOT RAISE`, and the test cannot pass
as written. Use `caplog` (or `assertLogs`) to assert that warnings are produced
for failed records (including statusCode/message) and that no warnings are
produced on full success. Remove the `pytest.raises` block.
##########
providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py:
##########
@@ -84,6 +84,42 @@ def _validate_inputs(self) -> None:
f"Available operations are {self.available_operations}."
)
+ def _log_result_failures(self, result: list) -> None:
+ """
+ Log a warning for every record in the Salesforce Bulk API result that
has ``success=False``.
+
+ Salesforce's Bulk API does not raise when individual records are
rejected — it signals them
+ via ``success=False`` in the result list. Without this, those
failures are completely
+ invisible unless the caller manually inspects the XCom value.
+ """
+ failed = [r for r in result if not r.get("success")]
+ total = len(result)
+
+ if failed:
+ self.log.warning(
+ "Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
+ self.operation,
+ self.object_name,
+ len(failed),
+ total,
+ )
+ for idx, record in enumerate(failed):
+ for error in record.get("errors", []):
+ self.log.warning(
+ "Record failure %d — status: %s | message: %s |
fields: %s",
+ idx,
+ error.get("statusCode"),
+ error.get("message"),
+ error.get("fields"),
+ )
Review Comment:
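This creates an additional list (`failed`) that can be sizable (default
batch sizes can be large), effectively duplicating references and increasing
memory churn. Consider iterating `result` once, counting failures and logging
per failure as you go (or collecting only minimal data needed for the summary)
to avoid allocating a second list. Note that the suggestion below logs the
summary line after the per-record details rather than before. It also still
numbers failures by their position among failures. To address the
original-index comment as well, iterate with `enumerate(result)` and log that
index instead.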
```suggestion
total = len(result)
failed_count = 0
failure_index = 0
for record in result:
if not record.get("success"):
for error in record.get("errors", []):
self.log.warning(
"Record failure %d — status: %s | message: %s |
fields: %s",
failure_index,
error.get("statusCode"),
error.get("message"),
error.get("fields"),
)
failed_count += 1
failure_index += 1
if failed_count:
self.log.warning(
"Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
self.operation,
self.object_name,
failed_count,
total,
)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]