This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-ai.git


The following commit(s) were added to refs/heads/main by this push:
     new f6b547c  refactor(llm): change QPS -> RPM for timer decorator (#241)
f6b547c is described below

commit f6b547c81fbf7bcdd5bf8349d4842cfebd0b3eff
Author: Ethereal-O <91931223+etherea...@users.noreply.github.com>
AuthorDate: Tue May 20 19:47:41 2025 +0800

    refactor(llm): change QPS -> RPM for timer decorator (#241)
    
    What did I do:
    1. Change the function record_qps to record_rpm, which outputs RPM
    information instead of qps.
    2. Replace the position where record_qps is used with record_rpm to
    ensure runtime accuracy.
    3. Testing: Tested using the example of hugegraph_1lm.demo.rag_demo.app
    to correctly calculate RPM.
    4. Testing: Simulate unexpected situations, such as incorrect
    configuration items, to ensure that this function does not report
    errors.
    
    
    close #229
    
    ---------
    
    Co-authored-by: imbajin <j...@apache.org>
---
 .../src/hugegraph_llm/operators/graph_rag_task.py  |  4 +--
 .../operators/gremlin_generate_task.py             |  4 +--
 .../operators/kg_construction_task.py              |  4 +--
 .../src/hugegraph_llm/utils/decorators.py          | 36 +++++++++++++++-------
 4 files changed, 31 insertions(+), 17 deletions(-)

diff --git a/hugegraph-llm/src/hugegraph_llm/operators/graph_rag_task.py 
b/hugegraph-llm/src/hugegraph_llm/operators/graph_rag_task.py
index 9f3d64d..65c95db 100644
--- a/hugegraph-llm/src/hugegraph_llm/operators/graph_rag_task.py
+++ b/hugegraph-llm/src/hugegraph_llm/operators/graph_rag_task.py
@@ -31,7 +31,7 @@ from hugegraph_llm.operators.index_op.semantic_id_query 
import SemanticIdQuery
 from hugegraph_llm.operators.index_op.vector_index_query import 
VectorIndexQuery
 from hugegraph_llm.operators.llm_op.answer_synthesize import AnswerSynthesize
 from hugegraph_llm.operators.llm_op.keyword_extract import KeywordExtract
-from hugegraph_llm.utils.decorators import log_time, log_operator_time, 
record_qps
+from hugegraph_llm.utils.decorators import log_time, log_operator_time, 
record_rpm
 from hugegraph_llm.config import prompt, huge_settings
 
 
@@ -235,7 +235,7 @@ class RAGPipeline:
         return self
 
     @log_time("total time")
-    @record_qps
+    @record_rpm
     def run(self, **kwargs) -> Dict[str, Any]:
         """
         Execute all operators in the pipeline in sequence.
diff --git a/hugegraph-llm/src/hugegraph_llm/operators/gremlin_generate_task.py 
b/hugegraph-llm/src/hugegraph_llm/operators/gremlin_generate_task.py
index 95ce59f..70f3d27 100644
--- a/hugegraph-llm/src/hugegraph_llm/operators/gremlin_generate_task.py
+++ b/hugegraph-llm/src/hugegraph_llm/operators/gremlin_generate_task.py
@@ -24,7 +24,7 @@ from hugegraph_llm.operators.hugegraph_op.schema_manager 
import SchemaManager
 from hugegraph_llm.operators.index_op.build_gremlin_example_index import 
BuildGremlinExampleIndex
 from hugegraph_llm.operators.index_op.gremlin_example_index_query import 
GremlinExampleIndexQuery
 from hugegraph_llm.operators.llm_op.gremlin_generate import 
GremlinGenerateSynthesize
-from hugegraph_llm.utils.decorators import log_time, log_operator_time, 
record_qps
+from hugegraph_llm.utils.decorators import log_time, log_operator_time, 
record_rpm
 
 
 class GremlinGenerator:
@@ -69,7 +69,7 @@ class GremlinGenerator:
         return self
 
     @log_time("total time")
-    @record_qps
+    @record_rpm
     def run(self, **kwargs):
         context = kwargs
         for operator in self.operators:
diff --git a/hugegraph-llm/src/hugegraph_llm/operators/kg_construction_task.py 
b/hugegraph-llm/src/hugegraph_llm/operators/kg_construction_task.py
index 2fb5966..a736751 100644
--- a/hugegraph-llm/src/hugegraph_llm/operators/kg_construction_task.py
+++ b/hugegraph-llm/src/hugegraph_llm/operators/kg_construction_task.py
@@ -31,7 +31,7 @@ from hugegraph_llm.operators.index_op.build_vector_index 
import BuildVectorIndex
 from hugegraph_llm.operators.llm_op.disambiguate_data import DisambiguateData
 from hugegraph_llm.operators.llm_op.info_extract import InfoExtract
 from hugegraph_llm.operators.llm_op.property_graph_extract import 
PropertyGraphExtract
-from hugegraph_llm.utils.decorators import log_time, log_operator_time, 
record_qps
+from hugegraph_llm.utils.decorators import log_time, log_operator_time, 
record_rpm
 from pyhugegraph.client import PyHugeClient
 
 
@@ -97,7 +97,7 @@ class KgBuilder:
         return self
 
     @log_time("total time")
-    @record_qps
+    @record_rpm
     def run(self, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
         for operator in self.operators:
             context = self._run_operator(operator, context)
diff --git a/hugegraph-llm/src/hugegraph_llm/utils/decorators.py 
b/hugegraph-llm/src/hugegraph_llm/utils/decorators.py
index 7ffda08..b07de6f 100644
--- a/hugegraph-llm/src/hugegraph_llm/utils/decorators.py
+++ b/hugegraph-llm/src/hugegraph_llm/utils/decorators.py
@@ -15,8 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import time
 import asyncio
+import time
 from functools import wraps
 from typing import Optional, Any, Callable
 
@@ -74,22 +74,37 @@ def log_operator_time(func: Callable) -> Callable:
             log.debug("Operator %s finished in %.2f seconds", 
operator.__class__.__name__, op_time)
             # log.debug("Current context:\n%s", result)
         return result
+
     return wrapper
 
 
-def record_qps(func: Callable) -> Callable:
+def record_rpm(func: Callable) -> Callable:
     @wraps(func)
-    def wrapper(*args: Any, **kwargs: Any) -> Any:
+    async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
+        start = time.perf_counter()
+        result = await func(*args, **kwargs)
+        call_count = result.get("call_count", 0)
+        elapsed_time = time.perf_counter() - start
+        rpm = (call_count / elapsed_time * 60) if elapsed_time > 0 else 0
+        if rpm >= 1:
+            log.debug("%s RPM: %.1f/min", args[0].__class__.__name__, rpm)
+        return result
+
+    @wraps(func)
+    def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
         start = time.perf_counter()
         result = func(*args, **kwargs)
         call_count = result.get("call_count", 0)
-        qps = call_count / (time.perf_counter() - start)
-        if qps >= 0.10:
-            log.debug("%s QPS: %.2f/s", args[0].__class__.__name__, qps)
-        else:
-            log.debug("%s QPS: %f/s", args[0].__class__.__name__, qps)
+        elapsed_time = time.perf_counter() - start
+        rpm = (call_count / elapsed_time * 60) if elapsed_time > 0 else 0
+        if rpm >= 1:
+            log.debug("%s RPM: %.1f/min", args[0].__class__.__name__, rpm)
         return result
-    return wrapper
+
+    if asyncio.iscoroutinefunction(func):
+        return async_wrapper
+    return sync_wrapper
+
 
 def with_task_id(func: Callable) -> Callable:
     def wrapper(*args: Any, **kwargs: Any) -> Any:
@@ -99,11 +114,10 @@ def with_task_id(func: Callable) -> Callable:
 
         # Store the original return value
         result = func(*args, **kwargs)
-
         # Add the task_id to the function's context
         if hasattr(result, "__closure__") and result.__closure__:
             # If it's a closure, we can add the task_id to its context
             setattr(result, "task_id", task_id)
-
         return result
+
     return wrapper

Reply via email to