github-actions[bot] commented on code in PR #63062:
URL: https://github.com/apache/doris/pull/63062#discussion_r3205870180


##########
be/src/udf/python/python_server.py:
##########
@@ -1607,6 +1612,31 @@ def _get_udaf_state_manager(
 
         return self.udaf_state_managers[func_key]
 
+    def _clear_udaf_state_cache_by_function_id(self, function_id: int) -> int:
+        """
+        Clear UDAF managers for a dropped function id.
+
+        DROP FUNCTION cache cleanup is asynchronous. The runtime key still includes
+        function_id for correctness, while this action releases old states and class
+        objects after the drop task reaches this Python process.
+        """
+        prefix = f"{function_id}:"
+        cleared = 0
+
+        with self.udaf_managers_lock:
+            keys_to_remove = [
+                key for key in self.udaf_state_managers if key.startswith(prefix)
+            ]
+            for key in keys_to_remove:
+                manager = self.udaf_state_managers.pop(key)
+                manager.states.clear()
+                cleared += 1

Review Comment:
   Clearing `manager.states` can invalidate in-flight UDAF queries. The DROP 
cleanup task is submitted asynchronously after FE removes the function, while 
an already-started query can still have a Flight exchange using this same 
`UDAFStateManager` for the old function id. If this action runs between that 
query's CREATE/ACCUMULATE and later SERIALIZE/FINALIZE/DESTROY calls, those 
operations will find their `place_id` entries already removed and fail with a
`KeyError`, which surfaces as a failed UDAF result. Since adding `function_id` to
the key already prevents a recreated function from reusing the old class, cleanup
should detach the manager from `udaf_state_managers` without clearing its
`states` (active exchanges may still hold a reference to it), or add explicit lifecycle/ref-count 
coordination. Also consider returning the manager from 
`_get_udaf_state_manager()` while still under the lock so a concurrent pop 
cannot occur between lookup and return.
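A minimal sketch of both suggestions (detach without clearing, and return the manager while still holding the lock), assuming a simplified, hypothetical `UDAFStateManager` that holds only a `states` dict keyed by `place_id`, and registry keys of the form `"<function_id>:<name>"`. Only the `"<function_id>:"` prefix comes from the PR; the rest of the key layout, the class `UDAFManagerRegistry`, and the method signatures are illustrative assumptions, not the PR's actual code. Because a Python object stays alive while any reference to it exists, an exchange that already obtained the manager can still run SERIALIZE/FINALIZE/DESTROY after cleanup has detached it.

```python
import threading


class UDAFStateManager:
    """Hypothetical stand-in: maps place_id -> aggregate state."""

    def __init__(self):
        self.states = {}


class UDAFManagerRegistry:
    """Hypothetical registry mirroring the udaf_state_managers cache."""

    def __init__(self):
        self.udaf_managers_lock = threading.Lock()
        self.udaf_state_managers = {}

    def get_udaf_state_manager(self, func_key):
        # Look up (or create) and return while still holding the lock, so a
        # concurrent cleanup cannot pop the entry between lookup and return.
        with self.udaf_managers_lock:
            manager = self.udaf_state_managers.get(func_key)
            if manager is None:
                manager = UDAFStateManager()
                self.udaf_state_managers[func_key] = manager
            return manager

    def clear_udaf_state_cache_by_function_id(self, function_id):
        # Detach every manager whose key starts with "<function_id>:", but do
        # not clear its states: in-flight exchanges holding a reference keep
        # working, and the manager becomes collectable once the last
        # reference is dropped.
        prefix = f"{function_id}:"
        with self.udaf_managers_lock:
            keys_to_remove = [
                key for key in self.udaf_state_managers if key.startswith(prefix)
            ]
            for key in keys_to_remove:
                del self.udaf_state_managers[key]
        return len(keys_to_remove)


if __name__ == "__main__":
    registry = UDAFManagerRegistry()
    # An in-flight query obtains the manager (key layout assumed) and
    # accumulates into place_id 7.
    manager = registry.get_udaf_state_manager("42:my_udaf")
    manager.states[7] = 10
    # The asynchronous DROP FUNCTION cleanup arrives mid-query.
    registry.clear_udaf_state_cache_by_function_id(42)
    # FINALIZE on the old manager still finds its state.
    print(manager.states[7])  # prints 10
```

This relies on ordinary reference semantics rather than explicit ref-counting, so it does not bound how long a detached manager lingers if an exchange never releases its reference; that case is where explicit lifecycle coordination would still help.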



##########
be/src/udf/python/python_server.cpp:
##########
@@ -413,7 +414,19 @@ Status PythonServerManager::clear_module_cache(const std::string& location) {
     }
 
     std::string body = fmt::format(R"({{"location": "{}"}})", location);
+    return _broadcast_action_to_processes("clear_module_cache", body,
+                                          fmt::format("location={}", location));
+}
+
+void PythonServerManager::clear_udaf_state_cache(int64_t function_id) {
+    std::string body = fmt::format(R"({{"function_id": {}}})", function_id);
+    THROW_IF_ERROR(_broadcast_action_to_processes("clear_udaf_state_cache", body,
+                                                  fmt::format("function_id={}", function_id)));

Review Comment:
   This can crash the BE from a DROP FUNCTION cleanup task. 
`_broadcast_action_to_processes()` returns an error whenever an active Python 
process fails `DoAction`/`Next`, and `THROW_IF_ERROR` converts that into a 
`doris::Exception`. The caller is `clean_udf_cache_callback()`, which is run by 
`TaskWorkerPool` via `_callback(task)` without any catch boundary, so an 
uncaught exception terminates the worker thread/process instead of reporting a 
best-effort cache cleanup failure. The existing module-cache cleanup path logs 
and continues; this should return `Status` and be handled with a warning (or 
use the existing cleanup-style `WARN_IF_ERROR`) rather than throwing out of the 
task callback.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
