This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-ai.git
The following commit(s) were added to refs/heads/main by this push:
     new 820bfb2  refactor(llm): add a button to backup data & count together (#153)
820bfb2 is described below

commit 820bfb27ec4d2068ec43ccd1f980d2e3fed9392c
Author: SoJGooo <102796027+mrjs...@users.noreply.github.com>
AuthorDate: Fri Jan 3 18:56:48 2025 +0800

    refactor(llm): add a button to backup data & count together (#153)

    1. add a button to backup data
    2. refactor backup_data
    3. refactor timely_update_vid_embedding
    4. return count & graph elements together

    ---------

    Co-authored-by: imbajin <j...@apache.org>
---
 .gitattributes                                     |  1 +
 hugegraph-llm/.gitignore                           |  1 +
 .../src/hugegraph_llm/demo/rag_demo/other_block.py | 94 ++--------------------
 .../src/hugegraph_llm/demo/rag_demo/rag_block.py   |  1 +
 .../demo/rag_demo/vector_graph_block.py            | 15 ++++
 .../src/hugegraph_llm/utils/hugegraph_utils.py     | 67 ++++++++++++++-
 6 files changed, 92 insertions(+), 87 deletions(-)

diff --git a/.gitattributes b/.gitattributes
index 948dfc7..4a365f0 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -14,3 +14,4 @@ hugegraph-ml/ export-ignore
 .idea/ export-ignore
 style/ export-ignore
 scripts/ export-ignore
+hugegraph-llm/src/hugegraph_llm/resources/backup-graph-data-4020/ export-ignore
diff --git a/hugegraph-llm/.gitignore b/hugegraph-llm/.gitignore
index 4de6eba..1740bd2 100644
--- a/hugegraph-llm/.gitignore
+++ b/hugegraph-llm/.gitignore
@@ -1,2 +1,3 @@
 src/hugegraph_llm/resources/demo/questions_answers.xlsx
 src/hugegraph_llm/resources/demo/questions.xlsx
+src/hugegraph_llm/resources/backup-graph-data-4020/
diff --git a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/other_block.py b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/other_block.py
index 143da2b..da10f50 100644
--- a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/other_block.py
+++ b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/other_block.py
@@ -16,27 +16,16 @@
 # under the License.
 
 import asyncio
-import json
-import os
-import shutil
 from contextlib import asynccontextmanager
-from datetime import datetime
 
 import gradio as gr
 from apscheduler.schedulers.asyncio import AsyncIOScheduler
 from apscheduler.triggers.cron import CronTrigger
 from fastapi import FastAPI
 
-from hugegraph_llm.config import huge_settings, resource_path
-from hugegraph_llm.utils.graph_index_utils import update_vid_embedding
-from hugegraph_llm.utils.hugegraph_utils import init_hg_test_data, run_gremlin_query
+from hugegraph_llm.utils.hugegraph_utils import init_hg_test_data, run_gremlin_query, backup_data
 from hugegraph_llm.utils.log import log
-from pyhugegraph.client import PyHugeClient
-
-MAX_BACKUP_DIRS = 7
-MAX_VERTICES = 100000
-MAX_EDGES = 200000
-BACKUP_DIR = str(os.path.join(resource_path, huge_settings.graph_name, "backup"))
+from hugegraph_llm.demo.rag_demo.vector_graph_block import timely_update_vid_embedding
 
 
 def create_other_block():
@@ -48,6 +37,11 @@ def create_other_block():
         btn.click(fn=run_gremlin_query, inputs=[inp], outputs=out)  # pylint: disable=no-member
 
     gr.Markdown("---")
+    with gr.Row():
+        inp = []
+        out = gr.Textbox(label="Backup Graph Manually (Auto backup at 1:00 AM everyday)", show_copy_button=True)
+        btn = gr.Button("Backup Graph Data")
+        btn.click(fn=backup_data, inputs=inp, outputs=out)  # pylint: disable=no-member
     with gr.Accordion("Init HugeGraph test data (🚧)", open=False):
         with gr.Row():
             inp = []
@@ -56,85 +50,13 @@ def create_other_block():
             btn.click(fn=init_hg_test_data, inputs=inp, outputs=out)  # pylint: disable=no-member
 
 
-def create_dir_safely(path):
-    if not os.path.exists(path):
-        os.makedirs(path)
-
-
-# TODO: move the logic to a separate file
-def backup_data():
-    try:
-        client = PyHugeClient(
-            huge_settings.graph_ip,
-            huge_settings.graph_port,
-            huge_settings.graph_name,
-            huge_settings.graph_user,
-            huge_settings.graph_pwd,
-            huge_settings.graph_space,
-        )
-
-        create_dir_safely(BACKUP_DIR)
-
-        date_str = datetime.now().strftime("%Y%m%d_%H%M%S")
-        backup_subdir = os.path.join(BACKUP_DIR, f"{date_str}")
-        create_dir_safely(backup_subdir)
-
-        files = {
-            "vertices.json": f"g.V().limit({MAX_VERTICES})",
-            "edges.json": f"g.E().id().limit({MAX_EDGES})",
-            "schema.json": client.schema().getSchema()
-        }
-
-        for filename, query in files.items():
-            with open(os.path.join(backup_subdir, filename), "w", encoding="utf-8") as f:
-                data = client.gremlin().exec(query)["data"] if "schema" not in filename else query
-                json.dump(data, f)
-
-        log.info("Backup completed successfully in %s.", backup_subdir)
-        manage_backup_retention()
-    except Exception as e:
-        log.critical("Backup failed: %s", e, exc_info=True)
-        raise Exception("Failed to execute backup") from e
-
-
-def manage_backup_retention():
-    try:
-        backup_dirs = [
-            os.path.join(BACKUP_DIR, d)
-            for d in os.listdir(BACKUP_DIR)
-            if os.path.isdir(os.path.join(BACKUP_DIR, d))
-        ]
-        backup_dirs.sort(key=os.path.getctime)
-
-        while len(backup_dirs) > MAX_BACKUP_DIRS:
-            old_backup = backup_dirs.pop(0)
-            shutil.rmtree(old_backup)
-            log.info("Deleted old backup: %s", old_backup)
-    except Exception as e:
-        log.error("Failed to manage backup retention: %s", e, exc_info=True)
-        raise Exception("Failed to manage backup retention") from e
-
-
-async def timely_update_vid_embedding():
-    while True:
-        try:
-            await asyncio.to_thread(update_vid_embedding)
-            log.info("rebuild_vid_index timely executed successfully.")
-        except asyncio.CancelledError as ce:
-            log.info("Periodic task has been cancelled due to: %s", ce)
-            break
-        except Exception as e:
-            log.error("Failed to execute rebuild_vid_index: %s", e, exc_info=True)
-            raise Exception("Failed to execute rebuild_vid_index") from e
-        await asyncio.sleep(3600)
-
-
 @asynccontextmanager
 async def lifespan(app: FastAPI):  # pylint: disable=W0621
     log.info("Starting background scheduler...")
     scheduler = AsyncIOScheduler()
     scheduler.add_job(
         backup_data,
-        trigger=CronTrigger(hour=14, minute=16),
+        trigger=CronTrigger(hour=1, minute=0),
         id="daily_backup",
         replace_existing=True
    )
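
For readers skimming the diff: the lifespan hunk above only shows the job registration, with the cron time moved from the ad-hoc 14:16 test slot to 1:00 AM. A minimal self-contained sketch of the same FastAPI-lifespan plus AsyncIOScheduler pattern follows; `do_backup` is an illustrative stand-in for backup_data(), and the start/shutdown placement is an assumption, since those lines are outside the hunk:

    # Sketch of the lifespan + AsyncIOScheduler pattern; run with: uvicorn module:app
    from contextlib import asynccontextmanager

    from apscheduler.schedulers.asyncio import AsyncIOScheduler
    from apscheduler.triggers.cron import CronTrigger
    from fastapi import FastAPI


    def do_backup():
        # Stand-in for backup_data(); the real job dumps vertices/edges/schema.
        print("backup triggered")


    @asynccontextmanager
    async def lifespan(app: FastAPI):
        scheduler = AsyncIOScheduler()
        scheduler.add_job(
            do_backup,
            trigger=CronTrigger(hour=1, minute=0),  # fires daily at 01:00
            id="daily_backup",
            replace_existing=True,
        )
        scheduler.start()
        try:
            yield  # the app serves requests while the scheduler runs
        finally:
            scheduler.shutdown()  # stop the scheduler together with the app


    app = FastAPI(lifespan=lifespan)

AsyncIOScheduler runs its jobs on the application's asyncio event loop, so no extra thread or process is needed for the cron job.
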
diff --git a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/rag_block.py b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/rag_block.py
index c10f84b..822b080 100644
--- a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/rag_block.py
+++ b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/rag_block.py
@@ -198,6 +198,7 @@ def create_rag_block():
         "Graph-only Answer",
         "Graph-Vector Answer",
     ]
+    # FIXME: "demo" might conflict with the graph name, it should be modified.
     answers_path = os.path.join(resource_path, "demo", "questions_answers.xlsx")
     questions_path = os.path.join(resource_path, "demo", "questions.xlsx")
     questions_template_path = os.path.join(resource_path, "demo", "questions_template.xlsx")
diff --git a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/vector_graph_block.py b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/vector_graph_block.py
index 1e48e21..a78835f 100644
--- a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/vector_graph_block.py
+++ b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/vector_graph_block.py
@@ -17,6 +17,7 @@
 
 # pylint: disable=E1101
+import asyncio
 
 import gradio as gr
 
 from hugegraph_llm.config import prompt
@@ -28,6 +29,7 @@ from hugegraph_llm.utils.graph_index_utils import (
     import_graph_data,
 )
 from hugegraph_llm.utils.vector_index_utils import clean_vector_index, build_vector_index, get_vector_index_info
+from hugegraph_llm.utils.log import log
 
 
 def store_prompt(doc, schema, example_prompt):
     # update env variables: doc, schema and example_prompt
@@ -139,3 +141,16 @@ def create_vector_graph_block():
         tab_upload_text.select(fn=on_tab_select, inputs=[input_file, input_text], outputs=[input_file, input_text])
 
     return input_text, input_schema, info_extract_template
+
+async def timely_update_vid_embedding():
+    while True:
+        try:
+            await asyncio.to_thread(update_vid_embedding)
+            log.info("rebuild_vid_index timely executed successfully.")
+        except asyncio.CancelledError as ce:
+            log.info("Periodic task has been cancelled due to: %s", ce)
+            break
+        except Exception as e:
+            log.error("Failed to execute rebuild_vid_index: %s", e, exc_info=True)
+            raise Exception("Failed to execute rebuild_vid_index") from e
+        await asyncio.sleep(3600)
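
The timely_update_vid_embedding() coroutine added above runs the blocking update_vid_embedding() in a worker thread via asyncio.to_thread (Python 3.9+) and sleeps an hour between rounds; cancelling the task breaks the loop cleanly. A runnable sketch of that start/cancel lifecycle, with a hypothetical rebuild_index() stand-in and demo-scale intervals:

    import asyncio


    def rebuild_index():
        # Hypothetical stand-in for the blocking update_vid_embedding().
        print("index rebuilt")


    async def periodic_rebuild(interval_s: float = 3600):
        while True:
            try:
                await asyncio.to_thread(rebuild_index)  # run off the event loop
            except asyncio.CancelledError:
                break  # cancelled mid-rebuild: exit the loop
            await asyncio.sleep(interval_s)


    async def main():
        task = asyncio.create_task(periodic_rebuild(interval_s=0.1))
        await asyncio.sleep(0.35)   # let a few rounds run
        task.cancel()               # injects CancelledError into the task
        await asyncio.gather(task, return_exceptions=True)


    asyncio.run(main())

In the application this task is presumably created during the FastAPI lifespan shown earlier and cancelled on shutdown; the sketch compresses that lifecycle into a few hundred milliseconds.
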
diff --git a/hugegraph-llm/src/hugegraph_llm/utils/hugegraph_utils.py b/hugegraph-llm/src/hugegraph_llm/utils/hugegraph_utils.py
index 7dc69cc..53fccdd 100644
--- a/hugegraph-llm/src/hugegraph_llm/utils/hugegraph_utils.py
+++ b/hugegraph-llm/src/hugegraph_llm/utils/hugegraph_utils.py
@@ -14,11 +14,21 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
 import json
+import os
+import shutil
+from datetime import datetime
 
-from hugegraph_llm.config import huge_settings
+from hugegraph_llm.config import huge_settings, resource_path
+from hugegraph_llm.utils.log import log
 from pyhugegraph.client import PyHugeClient
 
+MAX_BACKUP_DIRS = 7
+MAX_VERTICES = 100000
+MAX_EDGES = 200000
+BACKUP_DIR = str(os.path.join(resource_path, "backup-graph-data-4020", huge_settings.graph_name))
+
 
 def run_gremlin_query(query, fmt=True):
     res = get_hg_client().gremlin().exec(query)
@@ -81,3 +91,58 @@ def init_hg_test_data():
 def clean_hg_data():
     client = get_hg_client()
     client.graphs().clear_graph_all_data()
+
+
+def create_dir_safely(path):
+    if not os.path.exists(path):
+        os.makedirs(path)
+
+
+def backup_data():
+    try:
+        client = get_hg_client()
+
+        create_dir_safely(BACKUP_DIR)
+
+        date_str = datetime.now().strftime("%Y%m%d")
+        backup_subdir = os.path.join(BACKUP_DIR, f"{date_str}")
+        create_dir_safely(backup_subdir)
+
+
+        files = {
+            "vertices.json": f"g.V().limit({MAX_VERTICES})"
+                             f".aggregate('vertices').count().as('count').select('count','vertices')",
+            "edges.json": f"g.E().limit({MAX_EDGES}).aggregate('edges').count().as('count').select('count','edges')",
+            "schema.json": client.schema().getSchema()
+        }
+
+        for filename, query in files.items():
+            with open(os.path.join(backup_subdir, filename), "w", encoding="utf-8") as f:
+                data = client.gremlin().exec(query)["data"] if "schema" not in filename else query
+                json.dump(data, f, ensure_ascii=False)
+
+        log.info("Backup completed successfully in %s.", backup_subdir)
+        del_info = manage_backup_retention()
+        return f"Backup completed successfully in {backup_subdir} \n{del_info}"
+    except Exception as e:  # pylint: disable=W0718
+        log.critical("Backup failed: %s", e, exc_info=True)
+        raise Exception("Failed to execute backup") from e
+
+
+def manage_backup_retention():
+    try:
+        backup_dirs = [
+            os.path.join(BACKUP_DIR, d)
+            for d in os.listdir(BACKUP_DIR)
+            if os.path.isdir(os.path.join(BACKUP_DIR, d))
+        ]
+        backup_dirs.sort(key=os.path.getctime)
+        if len(backup_dirs) > MAX_BACKUP_DIRS:
+            old_backup = backup_dirs.pop(0)
+            shutil.rmtree(old_backup)
+            log.info("Deleted old backup: %s", old_backup)
+            return f"Deleted old backup: {old_backup}"
+        return f"The current number of backup files <= {MAX_BACKUP_DIRS}, so no files are deleted"
+    except Exception as e:  # pylint: disable=W0718
+        log.error("Failed to manage backup retention: %s", e, exc_info=True)
+        raise Exception("Failed to manage backup retention") from e
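
A note on item 4 of the commit message ("return count & graph elements together"): in the reworked queries, aggregate('vertices') stores the limited traversers in a side-effect collection, count() reduces the stream to a single number, and select('count','vertices') emits one map carrying both, so a single round trip now returns the elements and their count. Assuming the Gremlin endpoint wraps that map as the sole entry of the response's "data" list (the value backup_data() dumps to disk), a backup file could be read back as sketched below; the file path is illustrative:

    import json

    # Read a vertices.json written by backup_data(); the dumped "data" field
    # is expected to be a one-element list holding the count/elements map.
    with open("vertices.json", "r", encoding="utf-8") as f:
        data = json.load(f)

    record = data[0]  # single map produced by select('count','vertices')
    print("vertex count:", record["count"])
    print("sample vertex:", record["vertices"][0] if record["vertices"] else None)
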