feluelle commented on a change in pull request #9482:
URL: https://github.com/apache/airflow/pull/9482#discussion_r449463417



##########
File path: airflow/api_connexion/endpoints/xcom_endpoint.py
##########
@@ -89,15 +111,78 @@ def get_xcom_entry(
     return xcom_collection_item_schema.dump(query_object)
 
 
-def patch_xcom_entry():
+@provide_session
+def patch_xcom_entry(
+    session: Session,
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    xcom_key: str,
+    update_mask: Optional[List[str]] = None
+):
     """
     Update an XCom entry
     """
-    raise NotImplementedError("Not implemented yet.")
+    dag_run = session.query(DR).filter(DR.run_id == dag_run_id,
+                                       DR.dag_id == dag_id).first()
+    if not dag_run:
+        raise NotFound(f'DAGRun with dag_id:{dag_id} and run_id:{dag_run_id} 
not found')
+
+    xcom_entry = session.query(XCom).filter(XCom.dag_id == dag_id,
+                                            XCom.task_id == task_id,
+                                            XCom.key == xcom_key).first()
+
+    if not xcom_entry:
+        raise NotFound("XCom not found")
+    try:
+        body = xcom_schema.load(request.json, partial=("dag_id", "task_id",
+                                                       "key", 
"execution_date"))
+    except ValidationError as err:
+        raise BadRequest(detail=err.messages.get('_schema', 
[str(err.messages)])[0])
+    data = body.data
+    # Check that other attributes are not being updated
+    for field in data:
+        if field != 'value' and data[field] != getattr(xcom_entry, field):
+            raise BadRequest(detail=f"'{field}' cannot be updated")
+    if update_mask:
+        update_mask = [i.strip() for i in update_mask]
+        data_ = {}
+        for field in update_mask:
+            if field in data and field != 'value':
+                raise BadRequest(detail=f"{field} cannot be updated")
+            elif field in data and field == 'value':
+                data_[field] = data[field]
+            else:
+                raise BadRequest(detail=f"Unknown field '{field}' in 
update_mask")
+        data = data_
+    data['value'] = XCom.serialize_value(data['value'])
+    xcom_entry.value = data['value']
+    session.add(xcom_entry)
+    session.commit()
+    return xcom_schema.dump(xcom_entry)

Review comment:
       > [...] then there will be no change on db level.
   
   Which is correct. So I am in favor of removing the manual commit from there.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to