turbaszek commented on a change in pull request #9482:
URL: https://github.com/apache/airflow/pull/9482#discussion_r449455360
##########
File path: airflow/api_connexion/endpoints/xcom_endpoint.py
##########
@@ -89,15 +111,78 @@ def get_xcom_entry(
return xcom_collection_item_schema.dump(query_object)
-def patch_xcom_entry():
+@provide_session
+def patch_xcom_entry(
+ session: Session,
+ dag_id: str,
+ dag_run_id: str,
+ task_id: str,
+ xcom_key: str,
+ update_mask: Optional[List[str]] = None
+):
"""
Update an XCom entry
"""
- raise NotImplementedError("Not implemented yet.")
+ dag_run = session.query(DR).filter(DR.run_id == dag_run_id,
+ DR.dag_id == dag_id).first()
+ if not dag_run:
+ raise NotFound(f'DAGRun with dag_id:{dag_id} and run_id:{dag_run_id}
not found')
+
+ xcom_entry = session.query(XCom).filter(XCom.dag_id == dag_id,
+ XCom.task_id == task_id,
+ XCom.key == xcom_key).first()
+
+ if not xcom_entry:
+ raise NotFound("XCom not found")
+ try:
+ body = xcom_schema.load(request.json, partial=("dag_id", "task_id",
+ "key",
"execution_date"))
+ except ValidationError as err:
+ raise BadRequest(detail=err.messages.get('_schema',
[str(err.messages)])[0])
+ data = body.data
+ # Check that other attributes are not being updated
+ for field in data:
+ if field != 'value' and data[field] != getattr(xcom_entry, field):
+ raise BadRequest(detail=f"'{field}' cannot be updated")
+ if update_mask:
+ update_mask = [i.strip() for i in update_mask]
+ data_ = {}
+ for field in update_mask:
+ if field in data and field != 'value':
+ raise BadRequest(detail=f"{field} cannot be updated")
+ elif field in data and field == 'value':
+ data_[field] = data[field]
+ else:
+ raise BadRequest(detail=f"Unknown field '{field}' in
update_mask")
+ data = data_
+ data['value'] = XCom.serialize_value(data['value'])
+ xcom_entry.value = data['value']
+ session.add(xcom_entry)
+ session.commit()
+ return xcom_schema.dump(xcom_entry)
Review comment:
The commit is already performed by `@provide_session`, so I'm wondering whether
we need another, explicit one here. I think relying on the commit from
`provide_session` has the advantage that if building the response fails, nothing
is changed at the DB level. WDYT? @mik-laj @feluelle @potiuk
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]