This is an automated email from the ASF dual-hosted git repository. jin pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-ai.git
The following commit(s) were added to refs/heads/main by this push: new f6b547c refactor(llm): change QPS -> RPM for timer decorator (#241) f6b547c is described below commit f6b547c81fbf7bcdd5bf8349d4842cfebd0b3eff Author: Ethereal-O <91931223+etherea...@users.noreply.github.com> AuthorDate: Tue May 20 19:47:41 2025 +0800 refactor(llm): change QPS -> RPM for timer decorator (#241) What did I do: 1. Change the function record_qps to record_rpm, which outputs RPM information instead of QPS. 2. Replace the position where record_qps is used with record_rpm to ensure runtime accuracy. 3. Testing: Tested using the example of hugegraph_llm.demo.rag_demo.app to correctly calculate RPM. 4. Testing: Simulate unexpected situations, such as incorrect configuration items, to ensure that this function does not report errors. close #229 --------- Co-authored-by: imbajin <j...@apache.org> --- .../src/hugegraph_llm/operators/graph_rag_task.py | 4 +-- .../operators/gremlin_generate_task.py | 4 +-- .../operators/kg_construction_task.py | 4 +-- .../src/hugegraph_llm/utils/decorators.py | 36 +++++++++++++++------- 4 files changed, 31 insertions(+), 17 deletions(-) diff --git a/hugegraph-llm/src/hugegraph_llm/operators/graph_rag_task.py b/hugegraph-llm/src/hugegraph_llm/operators/graph_rag_task.py index 9f3d64d..65c95db 100644 --- a/hugegraph-llm/src/hugegraph_llm/operators/graph_rag_task.py +++ b/hugegraph-llm/src/hugegraph_llm/operators/graph_rag_task.py @@ -31,7 +31,7 @@ from hugegraph_llm.operators.index_op.semantic_id_query import SemanticIdQuery from hugegraph_llm.operators.index_op.vector_index_query import VectorIndexQuery from hugegraph_llm.operators.llm_op.answer_synthesize import AnswerSynthesize from hugegraph_llm.operators.llm_op.keyword_extract import KeywordExtract -from hugegraph_llm.utils.decorators import log_time, log_operator_time, record_qps +from hugegraph_llm.utils.decorators import log_time, log_operator_time, record_rpm from hugegraph_llm.config import prompt, 
huge_settings @@ -235,7 +235,7 @@ class RAGPipeline: return self @log_time("total time") - @record_qps + @record_rpm def run(self, **kwargs) -> Dict[str, Any]: """ Execute all operators in the pipeline in sequence. diff --git a/hugegraph-llm/src/hugegraph_llm/operators/gremlin_generate_task.py b/hugegraph-llm/src/hugegraph_llm/operators/gremlin_generate_task.py index 95ce59f..70f3d27 100644 --- a/hugegraph-llm/src/hugegraph_llm/operators/gremlin_generate_task.py +++ b/hugegraph-llm/src/hugegraph_llm/operators/gremlin_generate_task.py @@ -24,7 +24,7 @@ from hugegraph_llm.operators.hugegraph_op.schema_manager import SchemaManager from hugegraph_llm.operators.index_op.build_gremlin_example_index import BuildGremlinExampleIndex from hugegraph_llm.operators.index_op.gremlin_example_index_query import GremlinExampleIndexQuery from hugegraph_llm.operators.llm_op.gremlin_generate import GremlinGenerateSynthesize -from hugegraph_llm.utils.decorators import log_time, log_operator_time, record_qps +from hugegraph_llm.utils.decorators import log_time, log_operator_time, record_rpm class GremlinGenerator: @@ -69,7 +69,7 @@ class GremlinGenerator: return self @log_time("total time") - @record_qps + @record_rpm def run(self, **kwargs): context = kwargs for operator in self.operators: diff --git a/hugegraph-llm/src/hugegraph_llm/operators/kg_construction_task.py b/hugegraph-llm/src/hugegraph_llm/operators/kg_construction_task.py index 2fb5966..a736751 100644 --- a/hugegraph-llm/src/hugegraph_llm/operators/kg_construction_task.py +++ b/hugegraph-llm/src/hugegraph_llm/operators/kg_construction_task.py @@ -31,7 +31,7 @@ from hugegraph_llm.operators.index_op.build_vector_index import BuildVectorIndex from hugegraph_llm.operators.llm_op.disambiguate_data import DisambiguateData from hugegraph_llm.operators.llm_op.info_extract import InfoExtract from hugegraph_llm.operators.llm_op.property_graph_extract import PropertyGraphExtract -from hugegraph_llm.utils.decorators import log_time, 
log_operator_time, record_qps +from hugegraph_llm.utils.decorators import log_time, log_operator_time, record_rpm from pyhugegraph.client import PyHugeClient @@ -97,7 +97,7 @@ class KgBuilder: return self @log_time("total time") - @record_qps + @record_rpm def run(self, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: for operator in self.operators: context = self._run_operator(operator, context) diff --git a/hugegraph-llm/src/hugegraph_llm/utils/decorators.py b/hugegraph-llm/src/hugegraph_llm/utils/decorators.py index 7ffda08..b07de6f 100644 --- a/hugegraph-llm/src/hugegraph_llm/utils/decorators.py +++ b/hugegraph-llm/src/hugegraph_llm/utils/decorators.py @@ -15,8 +15,8 @@ # specific language governing permissions and limitations # under the License. -import time import asyncio +import time from functools import wraps from typing import Optional, Any, Callable @@ -74,22 +74,37 @@ def log_operator_time(func: Callable) -> Callable: log.debug("Operator %s finished in %.2f seconds", operator.__class__.__name__, op_time) # log.debug("Current context:\n%s", result) return result + return wrapper -def record_qps(func: Callable) -> Callable: +def record_rpm(func: Callable) -> Callable: @wraps(func) - def wrapper(*args: Any, **kwargs: Any) -> Any: + async def async_wrapper(*args: Any, **kwargs: Any) -> Any: + start = time.perf_counter() + result = await func(*args, **kwargs) + call_count = result.get("call_count", 0) + elapsed_time = time.perf_counter() - start + rpm = (call_count / elapsed_time * 60) if elapsed_time > 0 else 0 + if rpm >= 1: + log.debug("%s RPM: %.1f/min", args[0].__class__.__name__, rpm) + return result + + @wraps(func) + def sync_wrapper(*args: Any, **kwargs: Any) -> Any: start = time.perf_counter() result = func(*args, **kwargs) call_count = result.get("call_count", 0) - qps = call_count / (time.perf_counter() - start) - if qps >= 0.10: - log.debug("%s QPS: %.2f/s", args[0].__class__.__name__, qps) - else: - log.debug("%s QPS: %f/s", 
args[0].__class__.__name__, qps) + elapsed_time = time.perf_counter() - start + rpm = (call_count / elapsed_time * 60) if elapsed_time > 0 else 0 + if rpm >= 1: + log.debug("%s RPM: %.1f/min", args[0].__class__.__name__, rpm) return result - return wrapper + + if asyncio.iscoroutinefunction(func): + return async_wrapper + return sync_wrapper + def with_task_id(func: Callable) -> Callable: def wrapper(*args: Any, **kwargs: Any) -> Any: @@ -99,11 +114,10 @@ def with_task_id(func: Callable) -> Callable: # Store the original return value result = func(*args, **kwargs) - # Add the task_id to the function's context if hasattr(result, "__closure__") and result.__closure__: # If it's a closure, we can add the task_id to its context setattr(result, "task_id", task_id) - return result + return wrapper