This is an automated email from the ASF dual-hosted git repository.
freeoneplus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-mcp-server.git
The following commit(s) were added to refs/heads/master by this push:
new fb5e864 [improvement] Add bucket information to the output of
analyze_table_storage (#33)
fb5e864 is described below
commit fb5e864a245b6d736406055864f0a044f656358f
Author: ivin <[email protected]>
AuthorDate: Tue Jul 29 14:04:36 2025 +0800
[improvement] Add bucket information to the output of analyze_table_storage
(#33)
---
doris_mcp_server/utils/data_quality_tools.py | 60 ++++++++++++++++++++++++++--
1 file changed, 57 insertions(+), 3 deletions(-)
diff --git a/doris_mcp_server/utils/data_quality_tools.py b/doris_mcp_server/utils/data_quality_tools.py
index 9058f4f..3984bc7 100644
--- a/doris_mcp_server/utils/data_quality_tools.py
+++ b/doris_mcp_server/utils/data_quality_tools.py
@@ -25,7 +25,7 @@ import time
import math
import statistics
from datetime import datetime, timedelta
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, cast
from collections import Counter, defaultdict
from .db import DorisConnectionManager
@@ -376,6 +376,54 @@ class DataQualityTools:
except Exception as e:
logger.warning(f"Failed to get table partitions: {str(e)}")
return []
+
+    async def _get_table_bucket_info(
+        self, connection, table_name: str, db_name: Optional[str] = None
+    ) -> Optional[Dict]:
+        """Parse the bucketing (DISTRIBUTED BY) clause from the table's DDL.
+
+        Returns {"type": "HASH", "columns": [...], "bucket_num": str} or
+        {"type": "RANDOM", "bucket_num": str}, where bucket_num is "AUTO" or
+        a digit string; None if the DDL or clause is missing or an error occurs.
+        """
+        import re  # local import: keeps this helper independent of module imports
+        try:
+            ddl_statement = await self._get_table_ddl(connection, table_name, db_name)
+            if not ddl_statement:
+                logger.warning(f"Could not retrieve DDL for table {table_name}.")
+                return None
+            # Case-insensitive and whitespace-tolerant; group 1 (the HASH column
+            # list) is None for RANDOM distribution, group 2 is the bucket count.
+            pattern = r"DISTRIBUTED\s+BY\s+(?:HASH\s*\(([^)]*)\)|RANDOM)\s+BUCKETS\s+(\d+|AUTO)"
+            match = re.search(pattern, ddl_statement, re.IGNORECASE)
+            if match is None:
+                return None
+            columns, buckets = match.group(1), match.group(2).upper()
+            if columns is None:
+                return {"type": "RANDOM", "bucket_num": buckets}
+            column_list = [c.strip().strip("`") for c in columns.split(",") if c.strip()]
+            return {"type": "HASH", "columns": column_list, "bucket_num": buckets}
+        except Exception as e:
+            logger.warning(f"Failed to get table buckets: {str(e)}")
+            return None
+
+    async def _get_table_ddl(
+        self, connection, table_name: str, db_name: Optional[str]
+    ) -> Optional[str]:
+        """Return the SHOW CREATE TABLE text for a table, or None on failure.
+
+        Each dot-separated name part is backtick-quoted (inner backticks
+        doubled) so caller-supplied names cannot inject SQL into the query.
+        """
+        try:
+            name = f"{db_name}.{table_name}" if db_name else table_name
+            parts = (p.strip().strip("`").replace("`", "``") for p in name.split("."))
+            target = ".".join(f"`{p}`" for p in parts)
+            result = await connection.execute(f"SHOW CREATE TABLE {target}")
+            return result.data[0].get("Create Table") if result.data else None
+        except Exception as e:
+            logger.warning(f"Error getting DDL for table {table_name}: {e}")
+            return None
async def _get_table_size_info(self, connection, table_name: str) ->
Dict[str, Any]:
"""Get table size information"""
@@ -1007,8 +1055,14 @@ class DataQualityTools:
partition_analysis["balance_score"] = 0.0
else:
partition_analysis["balance_score"] = 1.0 if
len(row_counts) == 1 else 0.0
+
+ # Get bucket information
+ bucket_info = await self._get_table_bucket_info(connection,
table_name, db_name)
- return partition_analysis
+ return {
+ "partition_analysis": partition_analysis,
+ "bucket_analysis": bucket_info
+ }
except Exception as e:
logger.warning(f"Failed to analyze physical distribution:
{str(e)}")
@@ -1057,4 +1111,4 @@ class DataQualityTools:
except Exception as e:
logger.warning(f"Failed to analyze storage info: {str(e)}")
- return {"error": str(e)}
\ No newline at end of file
+ return {"error": str(e)}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]