This is an automated email from the ASF dual-hosted git repository. jin pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-ai.git
The following commit(s) were added to refs/heads/main by this push: new 4c97cd6 feat(llm): automatic backup graph data timely (#151) 4c97cd6 is described below commit 4c97cd65999e551aaf0ee5493b179a2b1a91713e Author: SoJGooo <102796027+mrjs...@users.noreply.github.com> AuthorDate: Thu Jan 2 19:07:31 2025 +0800 feat(llm): automatic backup graph data timely (#151) TODO: Add a button for user to manually run it --------- Co-authored-by: imbajin <j...@apache.org> --- .gitattributes | 1 + hugegraph-llm/requirements.txt | 1 + .../src/hugegraph_llm/demo/rag_demo/app.py | 33 +----- .../src/hugegraph_llm/demo/rag_demo/other_block.py | 118 +++++++++++++++++++++ .../pyhugegraph/api/schema_manage/edge_label.py | 9 ++ 5 files changed, 131 insertions(+), 31 deletions(-) diff --git a/.gitattributes b/.gitattributes index 333c08e..948dfc7 100644 --- a/.gitattributes +++ b/.gitattributes @@ -9,6 +9,7 @@ apache-release.sh export-ignore *.docx export-ignore # ignored directory +hugegraph-ml/ export-ignore .github/ export-ignore .idea/ export-ignore style/ export-ignore diff --git a/hugegraph-llm/requirements.txt b/hugegraph-llm/requirements.txt index 9cc8c01..5f369f8 100644 --- a/hugegraph-llm/requirements.txt +++ b/hugegraph-llm/requirements.txt @@ -15,3 +15,4 @@ pyarrow~=17.0.0 # TODO: a temporary dependency for pandas, figure out why Import pandas~=2.2.2 openpyxl~=3.1.5 pydantic-settings~=2.6.1 +apscheduler~=3.10.4 diff --git a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/app.py b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/app.py index e0508dd..bcc1198 100644 --- a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/app.py +++ b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/app.py @@ -15,10 +15,7 @@ # specific language governing permissions and limitations # under the License. - import argparse -import asyncio -from contextlib import asynccontextmanager import gradio as gr import uvicorn @@ -37,15 +34,16 @@ from hugegraph_llm.demo.rag_demo.configs_block import ( apply_graph_config, ) from hugegraph_llm.demo.rag_demo.other_block import create_other_block +from hugegraph_llm.demo.rag_demo.other_block import lifespan from hugegraph_llm.demo.rag_demo.rag_block import create_rag_block, rag_answer from hugegraph_llm.demo.rag_demo.text2gremlin_block import create_text2gremlin_block, graph_rag_recall from hugegraph_llm.demo.rag_demo.vector_graph_block import create_vector_graph_block from hugegraph_llm.resources.demo.css import CSS -from hugegraph_llm.utils.graph_index_utils import update_vid_embedding from hugegraph_llm.utils.log import log sec = HTTPBearer() + def authenticate(credentials: HTTPAuthorizationCredentials = Depends(sec)): correct_token = admin_settings.user_token if credentials.credentials != correct_token: @@ -57,33 +55,6 @@ def authenticate(credentials: HTTPAuthorizationCredentials = Depends(sec)): headers={"WWW-Authenticate": "Bearer"}, ) - # TODO: move the logic to a separate file -async def timely_update_vid_embedding(): - while True: - try: - await asyncio.to_thread(update_vid_embedding) - log.info("rebuild_vid_index timely executed successfully.") - except asyncio.CancelledError as ce: - log.info("Periodic task has been cancelled due to: %s", ce) - break - except Exception as e: - log.error("Failed to execute rebuild_vid_index: %s", e, exc_info=True) - raise Exception("Failed to execute rebuild_vid_index") from e - await asyncio.sleep(3600) - - -@asynccontextmanager -async def lifespan(app: FastAPI): # pylint: disable=W0621 - log.info("Starting periodic task...") - task = asyncio.create_task(timely_update_vid_embedding()) - yield - - log.info("Stopping periodic task...") - task.cancel() - try: - await task - except asyncio.CancelledError: - log.info("Periodic task has been cancelled.") # pylint: disable=C0301 def init_rag_ui() -> gr.Interface: diff --git a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/other_block.py b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/other_block.py index a156442..143da2b 100644 --- a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/other_block.py +++ b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/other_block.py @@ -15,9 +15,28 @@ # specific language governing permissions and limitations # under the License. +import asyncio +import json +import os +import shutil +from contextlib import asynccontextmanager +from datetime import datetime + import gradio as gr +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.cron import CronTrigger +from fastapi import FastAPI +from hugegraph_llm.config import huge_settings, resource_path +from hugegraph_llm.utils.graph_index_utils import update_vid_embedding from hugegraph_llm.utils.hugegraph_utils import init_hg_test_data, run_gremlin_query +from hugegraph_llm.utils.log import log +from pyhugegraph.client import PyHugeClient + +MAX_BACKUP_DIRS = 7 +MAX_VERTICES = 100000 +MAX_EDGES = 200000 +BACKUP_DIR = str(os.path.join(resource_path, huge_settings.graph_name, "backup")) def create_other_block(): @@ -35,3 +54,102 @@ def create_other_block(): out = gr.Textbox(label="Init Graph Demo Result", show_copy_button=True) btn = gr.Button("(BETA) Init HugeGraph test data (🚧)") btn.click(fn=init_hg_test_data, inputs=inp, outputs=out) # pylint: disable=no-member + + +def create_dir_safely(path): + if not os.path.exists(path): + os.makedirs(path) + +# TODO: move the logic to a separate file +def backup_data(): + try: + client = PyHugeClient( + huge_settings.graph_ip, + huge_settings.graph_port, + huge_settings.graph_name, + huge_settings.graph_user, + huge_settings.graph_pwd, + huge_settings.graph_space, + ) + + create_dir_safely(BACKUP_DIR) + + date_str = datetime.now().strftime("%Y%m%d_%H%M%S") + backup_subdir = os.path.join(BACKUP_DIR, f"{date_str}") + create_dir_safely(backup_subdir) + + files = { + "vertices.json": f"g.V().limit({MAX_VERTICES})", + "edges.json": f"g.E().id().limit({MAX_EDGES})", + "schema.json": client.schema().getSchema() + } + + for filename, query in files.items(): + with open(os.path.join(backup_subdir, filename), "w", encoding="utf-8") as f: + data = client.gremlin().exec(query)["data"] if "schema" not in filename else query + json.dump(data, f) + + log.info("Backup completed successfully in %s.", backup_subdir) + manage_backup_retention() + except Exception as e: + log.critical("Backup failed: %s", e, exc_info=True) + raise Exception("Failed to execute backup") from e + + +def manage_backup_retention(): + try: + backup_dirs = [ + os.path.join(BACKUP_DIR, d) + for d in os.listdir(BACKUP_DIR) + if os.path.isdir(os.path.join(BACKUP_DIR, d)) + ] + backup_dirs.sort(key=os.path.getctime) + + while len(backup_dirs) > MAX_BACKUP_DIRS: + old_backup = backup_dirs.pop(0) + shutil.rmtree(old_backup) + log.info("Deleted old backup: %s", old_backup) + except Exception as e: + log.error("Failed to manage backup retention: %s", e, exc_info=True) + raise Exception("Failed to manage backup retention") from e + + +async def timely_update_vid_embedding(): + while True: + try: + await asyncio.to_thread(update_vid_embedding) + log.info("rebuild_vid_index timely executed successfully.") + except asyncio.CancelledError as ce: + log.info("Periodic task has been cancelled due to: %s", ce) + break + except Exception as e: + log.error("Failed to execute rebuild_vid_index: %s", e, exc_info=True) + raise Exception("Failed to execute rebuild_vid_index") from e + await asyncio.sleep(3600) + + +@asynccontextmanager +async def lifespan(app: FastAPI): # pylint: disable=W0621 + log.info("Starting background scheduler...") + scheduler = AsyncIOScheduler() + scheduler.add_job( + backup_data, + trigger=CronTrigger(hour=14, minute=16), + id="daily_backup", + replace_existing=True + ) + scheduler.start() + + log.info("Starting vid embedding update task...") + embedding_task = asyncio.create_task(timely_update_vid_embedding()) + yield + + log.info("Stopping vid embedding update task...") + embedding_task.cancel() + try: + await embedding_task + except asyncio.CancelledError: + log.info("Vid embedding update task cancelled.") + + log.info("Shutting down background scheduler...") + scheduler.shutdown() diff --git a/hugegraph-python-client/src/pyhugegraph/api/schema_manage/edge_label.py b/hugegraph-python-client/src/pyhugegraph/api/schema_manage/edge_label.py index 636a564..93f2180 100644 --- a/hugegraph-python-client/src/pyhugegraph/api/schema_manage/edge_label.py +++ b/hugegraph-python-client/src/pyhugegraph/api/schema_manage/edge_label.py @@ -87,6 +87,15 @@ class EdgeLabel(HugeParamsBase): self._parameter_holder.set("not_exist", False) return self + @decorator_params + def enableLabelIndex(self, flag) -> "EdgeLabel": + """ + Set whether to enable label indexing. If enabled, you can use `edge_labels[label]` to access the edge's label. + Default is False. + """ + self._parameter_holder.set("enable_label_index", flag) + return self + @decorator_create def create(self): dic = self._parameter_holder.get_dic()