Copilot commented on code in PR #64519:
URL: https://github.com/apache/airflow/pull/64519#discussion_r3025325393


##########
providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py:
##########
@@ -84,6 +84,42 @@ def _validate_inputs(self) -> None:
                 f"Available operations are {self.available_operations}."
             )
 
+    def _log_result_failures(self, result: list) -> None:
+        """
+        Log a warning for every record in the Salesforce Bulk API result that has ``success=False``.
+
+        Salesforce's Bulk API does not raise when individual records are rejected — it signals them
+        via ``success=False`` in the result list.  Without this, those failures are completely
+        invisible unless the caller manually inspects the XCom value.
+        """
+        failed = [r for r in result if not r.get("success")]
+        total = len(result)
+
+        if failed:
+            self.log.warning(
+                "Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
+                self.operation,
+                self.object_name,
+                len(failed),
+                total,
+            )
+            for idx, record in enumerate(failed):
+                for error in record.get("errors", []):
+                    self.log.warning(
+                        "Record failure %d — status: %s | message: %s | fields: %s",
+                        idx,
+                        error.get("statusCode"),
+                        error.get("message"),
+                        error.get("fields"),
+                    )

Review Comment:
   The logged `idx` is the index within the filtered `failed` list, not the 
original record index in the result/payload. This can mislead debugging (e.g., 
the first failure logs as 0 even if it was record 57). Iterate with 
`enumerate(result)` and log the original index for each failure (or include 
both original index and a failure counter).



##########
providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py:
##########
@@ -84,6 +84,42 @@ def _validate_inputs(self) -> None:
                 f"Available operations are {self.available_operations}."
             )
 
+    def _log_result_failures(self, result: list) -> None:
+        """
+        Log a warning for every record in the Salesforce Bulk API result that has ``success=False``.
+
+        Salesforce's Bulk API does not raise when individual records are rejected — it signals them
+        via ``success=False`` in the result list.  Without this, those failures are completely
+        invisible unless the caller manually inspects the XCom value.
+        """
+        failed = [r for r in result if not r.get("success")]
+        total = len(result)
+
+        if failed:
+            self.log.warning(
+                "Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
+                self.operation,
+                self.object_name,
+                len(failed),
+                total,
+            )
+            for idx, record in enumerate(failed):
+                for error in record.get("errors", []):
+                    self.log.warning(
+                        "Record failure %d — status: %s | message: %s | fields: %s",

Review Comment:
   The logged `idx` is the index within the filtered `failed` list, not the 
original record index in the result/payload. This can mislead debugging (e.g., 
the first failure logs as 0 even if it was record 57). Iterate with 
`enumerate(result)` and log the original index for each failure (or include 
both original index and a failure counter).
   ```suggestion
           total = len(result)
           failed_indices = [idx for idx, r in enumerate(result) if not r.get("success")]
   
           if failed_indices:
               self.log.warning(
                   "Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
                   self.operation,
                   self.object_name,
                   len(failed_indices),
                   total,
               )
               for idx in failed_indices:
                   record = result[idx]
                   for error in record.get("errors", []):
                       self.log.warning(
                           "Record failure at result index %d — status: %s | message: %s | fields: %s",
   ```



##########
providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py:
##########
@@ -84,6 +84,42 @@ def _validate_inputs(self) -> None:
                 f"Available operations are {self.available_operations}."
             )
 
+    def _log_result_failures(self, result: list) -> None:
+        """
+        Log a warning for every record in the Salesforce Bulk API result that has ``success=False``.
+
+        Salesforce's Bulk API does not raise when individual records are rejected — it signals them
+        via ``success=False`` in the result list.  Without this, those failures are completely
+        invisible unless the caller manually inspects the XCom value.
+        """
+        failed = [r for r in result if not r.get("success")]
+        total = len(result)
+
+        if failed:
+            self.log.warning(
+                "Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
+                self.operation,
+                self.object_name,
+                len(failed),
+                total,
+            )
+            for idx, record in enumerate(failed):
+                for error in record.get("errors", []):
+                    self.log.warning(
+                        "Record failure %d — status: %s | message: %s | fields: %s",
+                        idx,
+                        error.get("statusCode"),
+                        error.get("message"),
+                        error.get("fields"),
+                    )
+        else:
+            self.log.info(
+                "Salesforce Bulk API %s on %s completed: %d record(s) processed successfully.",
+                self.operation,
+                self.object_name,
+                total,
+            )

Review Comment:
   PR description says the change is limited to making failures visible via 
warnings (no new policy/behavior beyond that). This adds an info log line on 
full success, which is additional behavior and may increase log volume. Either 
update the PR description to reflect this, or change this branch to be silent / 
debug-level to align with the stated scope.
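
A minimal sketch of the debug-level option, written as a standalone function rather than the operator method. `log_bulk_summary` and the logger name are hypothetical; only the message formats are taken from the diff. It assumes task logs run at INFO, so a DEBUG line on the success path adds no volume.

```python
import logging

# Hypothetical logger name; the operator itself would use ``self.log``.
log = logging.getLogger("salesforce_bulk_scope_sketch")


def log_bulk_summary(operation, object_name, failed_count, total, logger=log):
    """Warn when any record failed; stay at DEBUG on full success."""
    if failed_count:
        logger.warning(
            "Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
            operation,
            object_name,
            failed_count,
            total,
        )
    else:
        # DEBUG keeps the success path out of INFO-level logs.
        logger.debug(
            "Salesforce Bulk API %s on %s completed: %d record(s) processed successfully.",
            operation,
            object_name,
            total,
        )
```

With this shape the only behavior visible at the default level is the failure warning, which matches the scope stated in the PR description.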



##########
providers/salesforce/tests/unit/salesforce/operators/test_bulk.py:
##########
@@ -271,3 +271,87 @@ def test_execute_salesforce_bulk_hard_delete(self, mock_get_conn):
             batch_size=batch_size,
             use_serial=use_serial,
         )
+
+
+    @patch("airflow.providers.salesforce.operators.bulk.SalesforceHook.get_conn")
+    def test_log_result_failures_warns_on_failed_records(self, mock_get_conn):
+        """
+        When records come back with success=False, _log_result_failures should emit warnings
+        containing the Salesforce status code and message.
+        """
+        failed_result = [
+            {"success": True, "created": True, "id": "001xx0000001AAA", "errors": []},
+            {
+                "success": False,
+                "created": False,
+                "id": None,
+                "errors": [
+                    {
+                        "statusCode": "INVALID_FIELD",
+                        "message": "No such column \'Bad_Field\'",
+                        "fields": [],
+                    }
+                ],
+            },
+        ]
+        mock_get_conn.return_value.bulk.__getattr__("Account").insert = Mock(
+            return_value=failed_result
+        )
+
+        operator = SalesforceBulkOperator(
+            task_id="bulk_insert_with_failure",
+            operation="insert",
+            object_name="Account",
+            payload=[{"Name": "OK"}, {"Bad_Field": "x"}],
+        )
+
+        with pytest.raises(Exception):
+            pass
+
+        result = operator.execute(context={})
+        assert result is None

Review Comment:
   The test names/docstrings claim warnings are emitted (or not emitted), but 
the tests currently don’t assert on logs at all, and `with 
pytest.raises(Exception): pass` is a no-op that doesn’t validate behavior. Use 
`caplog` (or `assertLogs`) to assert that warnings are produced for failed 
records (including statusCode/message) and that no warnings are produced on 
full success; remove the no-op `pytest.raises` block.
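
One way the log assertions could look, sketched with `unittest`'s `assertLogs` so it runs without Airflow or a Salesforce connection. `log_result_failures` and the logger name are hypothetical stand-ins for the operator's `_log_result_failures` and `self.log`; the real test would call the operator with the mocked hook instead.

```python
import logging
import unittest

# Hypothetical logger; the real operator logs through ``self.log``.
logger = logging.getLogger("bulk_operator_sketch")


def log_result_failures(result):
    """Hypothetical stand-in for the operator's ``_log_result_failures``."""
    for idx, record in enumerate(result):
        if not record.get("success"):
            for error in record.get("errors", []):
                logger.warning(
                    "Record failure at result index %d: status: %s | message: %s",
                    idx,
                    error.get("statusCode"),
                    error.get("message"),
                )


class _ListHandler(logging.Handler):
    """Collects formatted messages so a test can assert that none were emitted."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


class TestLogResultFailures(unittest.TestCase):
    def test_warns_on_failed_records(self):
        result = [
            {"success": True, "errors": []},
            {
                "success": False,
                "errors": [
                    {"statusCode": "INVALID_FIELD", "message": "No such column 'Bad_Field'", "fields": []}
                ],
            },
        ]
        with self.assertLogs(logger, level="WARNING") as captured:
            log_result_failures(result)
        joined = "\n".join(captured.output)
        self.assertIn("INVALID_FIELD", joined)
        self.assertIn("No such column 'Bad_Field'", joined)

    def test_no_warning_on_full_success(self):
        handler = _ListHandler()
        logger.addHandler(handler)
        try:
            log_result_failures([{"success": True, "errors": []}])
        finally:
            logger.removeHandler(handler)
        self.assertEqual(handler.messages, [])
```

With pytest, the same checks would typically go through the `caplog` fixture; the handler-based negative test here only avoids depending on a particular Python or pytest version.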



##########
providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py:
##########
@@ -84,6 +84,42 @@ def _validate_inputs(self) -> None:
                 f"Available operations are {self.available_operations}."
             )
 
+    def _log_result_failures(self, result: list) -> None:
+        """
+        Log a warning for every record in the Salesforce Bulk API result that has ``success=False``.
+
+        Salesforce's Bulk API does not raise when individual records are rejected — it signals them
+        via ``success=False`` in the result list.  Without this, those failures are completely
+        invisible unless the caller manually inspects the XCom value.
+        """
+        failed = [r for r in result if not r.get("success")]
+        total = len(result)
+
+        if failed:
+            self.log.warning(
+                "Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
+                self.operation,
+                self.object_name,
+                len(failed),
+                total,
+            )
+            for idx, record in enumerate(failed):
+                for error in record.get("errors", []):
+                    self.log.warning(
+                        "Record failure %d — status: %s | message: %s | fields: %s",
+                        idx,
+                        error.get("statusCode"),
+                        error.get("message"),
+                        error.get("fields"),
+                    )

Review Comment:
   This creates an additional list (`failed`) that can be sizable (default 
batch sizes can be large), effectively duplicating references and increasing 
memory churn. Consider iterating `result` once, counting failures and logging 
per failure as you go (or collecting only minimal data needed for the summary) 
to avoid allocating a second list.
   ```suggestion
           total = len(result)
           failed_count = 0
           failure_index = 0
   
           for record in result:
               if not record.get("success"):
                   for error in record.get("errors", []):
                       self.log.warning(
                           "Record failure %d — status: %s | message: %s | fields: %s",
                           failure_index,
                           error.get("statusCode"),
                           error.get("message"),
                           error.get("fields"),
                       )
                   failed_count += 1
                   failure_index += 1
   
           if failed_count:
               self.log.warning(
                   "Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
                   self.operation,
                   self.object_name,
                   failed_count,
                   total,
               )
   ```
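
The single-pass idea can also keep the original record index, which the suggestion above drops in favor of a failure counter. A rough sketch as a standalone function follows; the name `log_result_failures`, its parameters, and the logger name are hypothetical, and the message wording is only illustrative.

```python
import logging

# Hypothetical logger name; the operator itself would use ``self.log``.
log = logging.getLogger("salesforce_bulk_single_pass_sketch")


def log_result_failures(result, operation, object_name, logger=log):
    """Walk ``result`` once, log each failure, and return the failure count."""
    failed_count = 0
    for idx, record in enumerate(result):
        if record.get("success"):
            continue
        failed_count += 1
        for error in record.get("errors", []):
            # ``idx`` is the position in the original result list;
            # ``failed_count`` doubles as a running failure counter.
            logger.warning(
                "Record failure #%d at result index %d: status: %s | message: %s | fields: %s",
                failed_count,
                idx,
                error.get("statusCode"),
                error.get("message"),
                error.get("fields"),
            )
    if failed_count:
        logger.warning(
            "Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
            operation,
            object_name,
            failed_count,
            len(result),
        )
    return failed_count
```

As in the suggestion, the summary line comes after the per-record warnings, because the total is only known once the loop finishes. No second list is allocated.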


