This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-ai.git


The following commit(s) were added to refs/heads/main by this push:
     new 4c97cd6  feat(llm): automatic backup graph data timely (#151)
4c97cd6 is described below

commit 4c97cd65999e551aaf0ee5493b179a2b1a91713e
Author: SoJGooo <102796027+mrjs...@users.noreply.github.com>
AuthorDate: Thu Jan 2 19:07:31 2025 +0800

    feat(llm): automatic backup graph data timely (#151)
    
    TODO:
    Add a button for user to manually run it
    
    ---------
    
    Co-authored-by: imbajin <j...@apache.org>
---
 .gitattributes                                     |   1 +
 hugegraph-llm/requirements.txt                     |   1 +
 .../src/hugegraph_llm/demo/rag_demo/app.py         |  33 +-----
 .../src/hugegraph_llm/demo/rag_demo/other_block.py | 118 +++++++++++++++++++++
 .../pyhugegraph/api/schema_manage/edge_label.py    |   9 ++
 5 files changed, 131 insertions(+), 31 deletions(-)

diff --git a/.gitattributes b/.gitattributes
index 333c08e..948dfc7 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -9,6 +9,7 @@ apache-release.sh export-ignore
 *.docx export-ignore
 
 # ignored directory
+hugegraph-ml/ export-ignore
 .github/ export-ignore
 .idea/ export-ignore
 style/ export-ignore
diff --git a/hugegraph-llm/requirements.txt b/hugegraph-llm/requirements.txt
index 9cc8c01..5f369f8 100644
--- a/hugegraph-llm/requirements.txt
+++ b/hugegraph-llm/requirements.txt
@@ -15,3 +15,4 @@ pyarrow~=17.0.0 # TODO: a temporary dependency for pandas, figure out why Import
 pandas~=2.2.2
 openpyxl~=3.1.5
 pydantic-settings~=2.6.1
+apscheduler~=3.10.4
diff --git a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/app.py b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/app.py
index e0508dd..bcc1198 100644
--- a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/app.py
+++ b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/app.py
@@ -15,10 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-
 import argparse
-import asyncio
-from contextlib import asynccontextmanager
 
 import gradio as gr
 import uvicorn
@@ -37,15 +34,16 @@ from hugegraph_llm.demo.rag_demo.configs_block import (
     apply_graph_config,
 )
 from hugegraph_llm.demo.rag_demo.other_block import create_other_block
+from hugegraph_llm.demo.rag_demo.other_block import lifespan
 from hugegraph_llm.demo.rag_demo.rag_block import create_rag_block, rag_answer
 from hugegraph_llm.demo.rag_demo.text2gremlin_block import create_text2gremlin_block, graph_rag_recall
 from hugegraph_llm.demo.rag_demo.vector_graph_block import create_vector_graph_block
 from hugegraph_llm.resources.demo.css import CSS
-from hugegraph_llm.utils.graph_index_utils import update_vid_embedding
 from hugegraph_llm.utils.log import log
 
 sec = HTTPBearer()
 
+
 def authenticate(credentials: HTTPAuthorizationCredentials = Depends(sec)):
     correct_token = admin_settings.user_token
     if credentials.credentials != correct_token:
@@ -57,33 +55,6 @@ def authenticate(credentials: HTTPAuthorizationCredentials = Depends(sec)):
             headers={"WWW-Authenticate": "Bearer"},
         )
 
- # TODO: move the logic to a separate file
-async def timely_update_vid_embedding():
-    while True:
-        try:
-            await asyncio.to_thread(update_vid_embedding)
-            log.info("rebuild_vid_index timely executed successfully.")
-        except asyncio.CancelledError as ce:
-            log.info("Periodic task has been cancelled due to: %s", ce)
-            break
-        except Exception as e:
-            log.error("Failed to execute rebuild_vid_index: %s", e, exc_info=True)
-            raise Exception("Failed to execute rebuild_vid_index") from e
-        await asyncio.sleep(3600)
-
-
-@asynccontextmanager
-async def lifespan(app: FastAPI):  # pylint: disable=W0621
-    log.info("Starting periodic task...")
-    task = asyncio.create_task(timely_update_vid_embedding())
-    yield
-
-    log.info("Stopping periodic task...")
-    task.cancel()
-    try:
-        await task
-    except asyncio.CancelledError:
-        log.info("Periodic task has been cancelled.")
 
 # pylint: disable=C0301
 def init_rag_ui() -> gr.Interface:
diff --git a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/other_block.py b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/other_block.py
index a156442..143da2b 100644
--- a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/other_block.py
+++ b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/other_block.py
@@ -15,9 +15,28 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import asyncio
+import json
+import os
+import shutil
+from contextlib import asynccontextmanager
+from datetime import datetime
+
 import gradio as gr
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+from apscheduler.triggers.cron import CronTrigger
+from fastapi import FastAPI
 
+from hugegraph_llm.config import huge_settings, resource_path
+from hugegraph_llm.utils.graph_index_utils import update_vid_embedding
 from hugegraph_llm.utils.hugegraph_utils import init_hg_test_data, run_gremlin_query
+from hugegraph_llm.utils.log import log
+from pyhugegraph.client import PyHugeClient
+
+MAX_BACKUP_DIRS = 7
+MAX_VERTICES = 100000
+MAX_EDGES = 200000
+BACKUP_DIR = str(os.path.join(resource_path, huge_settings.graph_name, "backup"))
 
 
 def create_other_block():
@@ -35,3 +54,102 @@ def create_other_block():
             out = gr.Textbox(label="Init Graph Demo Result", show_copy_button=True)
         btn = gr.Button("(BETA) Init HugeGraph test data (🚧)")
         btn.click(fn=init_hg_test_data, inputs=inp, outputs=out)  # pylint: disable=no-member
+
+
+def create_dir_safely(path):
+    if not os.path.exists(path):
+        os.makedirs(path)
+
+# TODO: move the logic to a separate file
+def backup_data():
+    try:
+        client = PyHugeClient(
+            huge_settings.graph_ip,
+            huge_settings.graph_port,
+            huge_settings.graph_name,
+            huge_settings.graph_user,
+            huge_settings.graph_pwd,
+            huge_settings.graph_space,
+        )
+
+        create_dir_safely(BACKUP_DIR)
+
+        date_str = datetime.now().strftime("%Y%m%d_%H%M%S")
+        backup_subdir = os.path.join(BACKUP_DIR, f"{date_str}")
+        create_dir_safely(backup_subdir)
+
+        files = {
+            "vertices.json": f"g.V().limit({MAX_VERTICES})",
+            "edges.json": f"g.E().id().limit({MAX_EDGES})",
+            "schema.json": client.schema().getSchema()
+        }
+
+        for filename, query in files.items():
+            with open(os.path.join(backup_subdir, filename), "w", encoding="utf-8") as f:
+                data = client.gremlin().exec(query)["data"] if "schema" not in filename else query
+                json.dump(data, f)
+
+        log.info("Backup completed successfully in %s.", backup_subdir)
+        manage_backup_retention()
+    except Exception as e:
+        log.critical("Backup failed: %s", e, exc_info=True)
+        raise Exception("Failed to execute backup") from e
+
+
+def manage_backup_retention():
+    try:
+        backup_dirs = [
+            os.path.join(BACKUP_DIR, d)
+            for d in os.listdir(BACKUP_DIR)
+            if os.path.isdir(os.path.join(BACKUP_DIR, d))
+        ]
+        backup_dirs.sort(key=os.path.getctime)
+
+        while len(backup_dirs) > MAX_BACKUP_DIRS:
+            old_backup = backup_dirs.pop(0)
+            shutil.rmtree(old_backup)
+            log.info("Deleted old backup: %s", old_backup)
+    except Exception as e:
+        log.error("Failed to manage backup retention: %s", e, exc_info=True)
+        raise Exception("Failed to manage backup retention") from e
+
+
+async def timely_update_vid_embedding():
+    while True:
+        try:
+            await asyncio.to_thread(update_vid_embedding)
+            log.info("rebuild_vid_index timely executed successfully.")
+        except asyncio.CancelledError as ce:
+            log.info("Periodic task has been cancelled due to: %s", ce)
+            break
+        except Exception as e:
+            log.error("Failed to execute rebuild_vid_index: %s", e, exc_info=True)
+            raise Exception("Failed to execute rebuild_vid_index") from e
+        await asyncio.sleep(3600)
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):  # pylint: disable=W0621
+    log.info("Starting background scheduler...")
+    scheduler = AsyncIOScheduler()
+    scheduler.add_job(
+        backup_data,
+        trigger=CronTrigger(hour=14, minute=16),
+        id="daily_backup",
+        replace_existing=True
+    )
+    scheduler.start()
+
+    log.info("Starting vid embedding update task...")
+    embedding_task = asyncio.create_task(timely_update_vid_embedding())
+    yield
+
+    log.info("Stopping vid embedding update task...")
+    embedding_task.cancel()
+    try:
+        await embedding_task
+    except asyncio.CancelledError:
+        log.info("Vid embedding update task cancelled.")
+
+    log.info("Shutting down background scheduler...")
+    scheduler.shutdown()
diff --git a/hugegraph-python-client/src/pyhugegraph/api/schema_manage/edge_label.py b/hugegraph-python-client/src/pyhugegraph/api/schema_manage/edge_label.py
index 636a564..93f2180 100644
--- a/hugegraph-python-client/src/pyhugegraph/api/schema_manage/edge_label.py
+++ b/hugegraph-python-client/src/pyhugegraph/api/schema_manage/edge_label.py
@@ -87,6 +87,15 @@ class EdgeLabel(HugeParamsBase):
             self._parameter_holder.set("not_exist", False)
         return self
 
+    @decorator_params
+    def enableLabelIndex(self, flag) -> "EdgeLabel":
+        """
+        Set whether to enable label indexing. If enabled, you can use `edge_labels[label]` to access the edge's label.
+        Default is False.
+        """
+        self._parameter_holder.set("enable_label_index", flag)
+        return self
+
     @decorator_create
     def create(self):
         dic = self._parameter_holder.get_dic()

Reply via email to