This is an automated email from the ASF dual-hosted git repository.

gaoxihui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git


The following commit(s) were added to refs/heads/master by this push:
     new 5feced5e fix: fix the issue with the AI analysis logs (#618)
5feced5e is described below

commit 5feced5e07c17d3a360f00fd50f1b359e346b2ed
Author: Xue <[email protected]>
AuthorDate: Thu Nov 27 17:01:20 2025 +0800

    fix: fix the issue with the AI analysis logs (#618)
    
    * fix:remove the global update from the downgrade strategy
    
    * fix:modified the AI call for compressing the conversation history
    
    * fix:modified the configuration for log compression and some descriptions 
of the issues
    
    * fix:modify the location of the Redis configuration
    
    * fix:fix version
    
    * fix:fix dependency
    
    * fix:add a description of the source file for opening
    
    ---------
    
    Co-authored-by: xueshan <[email protected]>
    Co-authored-by: wtt <[email protected]>
---
 ozhera-log/log-manager/pom.xml                     |  11 +-
 .../ozhera/log/manager/config/redis/CachePool.java |  28 +
 .../ozhera/log/manager/config/redis/Node.java      |  26 +
 .../ozhera/log/manager/config/redis/NodeImpl.java  |  66 +++
 .../log/manager/config/redis/RedisCachePool.java   |  71 +++
 .../manager/config/redis/RedisCachePoolImpl.java   | 270 ++++++++++
 .../manager/config/redis/RedisClientFactory.java   | 158 ++++++
 .../ozhera/log/manager/model/bo/LogAiMessage.java  |  60 +++
 .../manager/service/bot/ContentSimplifyBot.java    |  86 +++-
 .../log/manager/service/bot/LogAnalysisBot.java    | 123 ++++-
 .../service/impl/MilogAiAnalysisServiceImpl.java   | 564 +++++++++++++++------
 .../nacos/ManagerLevelFilterConfigListener.java    |  37 +-
 .../src/main/resources/config.properties           |   9 +
 .../src/main/resources/config/open.properties      |  10 +
 ozhera-log/pom.xml                                 |  11 +
 15 files changed, 1316 insertions(+), 214 deletions(-)

diff --git a/ozhera-log/log-manager/pom.xml b/ozhera-log/log-manager/pom.xml
index b02a601e..395744dd 100644
--- a/ozhera-log/log-manager/pom.xml
+++ b/ozhera-log/log-manager/pom.xml
@@ -28,7 +28,7 @@ http://www.apache.org/licenses/LICENSE-2.0
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>log-manager</artifactId>
-    <version>2.2.7-SNAPSHOT</version>
+    <version>2.2.8-SNAPSHOT</version>
 
     <properties>
         <maven.compiler.source>21</maven.compiler.source>
@@ -302,6 +302,15 @@ http://www.apache.org/licenses/LICENSE-2.0
             <artifactId>kafka-clients</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>redis.clients</groupId>
+            <artifactId>jedis</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.knuddels</groupId>
+            <artifactId>jtokkit</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/CachePool.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/CachePool.java
new file mode 100644
index 00000000..f240c2c0
--- /dev/null
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/CachePool.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ozhera.log.manager.config.redis;
+
+import java.util.List;
+
+public interface CachePool {
+    String getName();
+
+    List<Node> getNodes();
+}
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/Node.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/Node.java
new file mode 100644
index 00000000..1a28f0a6
--- /dev/null
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/Node.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ozhera.log.manager.config.redis;
+
+public interface Node {
+    String getHostname();
+
+    int getPort();
+}
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/NodeImpl.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/NodeImpl.java
new file mode 100644
index 00000000..efe3335d
--- /dev/null
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/NodeImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.ozhera.log.manager.config.redis;
+
+public class NodeImpl implements Node {
+
+
+    private String hostname;
+    private int port;
+
+    public NodeImpl() {
+    }
+
+    public String getHostname() {
+        return this.hostname;
+    }
+
+    public NodeImpl setHostname(String hostname) {
+        this.hostname = hostname;
+        return this;
+    }
+
+    public int getPort() {
+        return this.port;
+    }
+
+    public NodeImpl setPort(int port) {
+        this.port = port;
+        return this;
+    }
+
+    public boolean equals(Object obj) {
+        if (obj != null && obj instanceof Node) {
+            Node anNode = (Node) obj;
+            return this.hostname.equals(anNode.getHostname()) && this.port == 
anNode.getPort();
+        } else {
+            return false;
+        }
+    }
+
+    public String toString() {
+        return this.hostname + ":" + this.port;
+    }
+
+    public int hashCode() {
+        throw new UnsupportedOperationException("hashCode not designed");
+    }
+}
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/RedisCachePool.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/RedisCachePool.java
new file mode 100644
index 00000000..693089d0
--- /dev/null
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/RedisCachePool.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ozhera.log.manager.config.redis;
+
+import org.apache.ozhera.log.cache.util.CachePool;
+
+
+public interface RedisCachePool extends CachePool {
+
+    /**
+     * @return the maxTotal of the current {@link CachePool}
+     */
+    int getMaxTotal();
+
+    /**
+     * @return the maxIdle of the current {@link CachePool}
+     */
+    int getMaxIdle();
+
+    /**
+     * @return the minIdle of the current {@link CachePool}
+     */
+    int getMinIdle();
+
+    /**
+     * @return the maxWaitMillis of the current {@link CachePool}
+     */
+    long getMaxWaitMillis();
+
+    /**
+     * @return the connectionTimeout of the current {@link CachePool}
+     */
+    int getConnTimeout();
+
+    /**
+     * @return the socketTimeout of the current {@link CachePool}
+     */
+    int getSocketTimeout();
+
+    /**
+     * @return the minEvictableIdleTimeMillis of the current {@link CachePool}
+     */
+    long getMinEvictableIdleTimeMillis();
+
+    /**
+     * @return the maxAttempts of the current {@link CachePool}
+     */
+    int getMaxAttempts();
+
+    /**
+     * @return the password of the current {@link CachePool}
+     */
+    String getPassword();
+}
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/RedisCachePoolImpl.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/RedisCachePoolImpl.java
new file mode 100644
index 00000000..7c6ad4d3
--- /dev/null
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/RedisCachePoolImpl.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ozhera.log.manager.config.redis;
+
+import org.apache.ozhera.log.cache.util.CachePool;
+import org.apache.ozhera.log.cache.util.Node;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+public class RedisCachePoolImpl implements RedisCachePool{
+
+    private static Comparator<Node> nodeComparator =
+            new Comparator<Node>() {
+                @Override
+                public int compare(Node n1, Node n2) {
+                    int cmp = n1.getHostname().compareTo(n2.getHostname());
+                    if (cmp != 0) {
+                        return cmp;
+                    }
+                    if (n1.getPort() < n2.getPort()) {
+                        return -1;
+                    } else if (n1.getPort() > n2.getPort()) {
+                        return 1;
+                    } else {
+                        return 0;
+                    }
+                }
+            };
+
+    private String name;
+    private int maxTotal;
+    private int maxIdle;
+    private int minIdle;
+    private int connTimeout;
+    private int socketTimeout;
+    private long maxWaitMillis;
+    private int maxAttempts;
+    private String password;
+    private long minEvictableIdleTimeMillis;
+    private List<Node> nodes = new ArrayList<>();
+
+    public RedisCachePoolImpl() {
+    }
+
+    public RedisCachePoolImpl(
+            String name,
+            int maxTotal,
+            int maxIdle,
+            int minIdle,
+            long maxWaitMillis,
+            int connTimeout,
+            int socketTimeout,
+            long minEvictableIdleTimeMillis,
+            int maxAttempts,
+            String password) {
+        this.name = name;
+        this.maxTotal = maxTotal;
+        this.maxIdle = maxIdle;
+        this.minIdle = minIdle;
+        this.maxWaitMillis = maxWaitMillis;
+        this.connTimeout = connTimeout;
+        this.socketTimeout = socketTimeout;
+        this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
+        this.maxAttempts = maxAttempts;
+        this.password = password;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public List<Node> getNodes() {
+        return nodes;
+    }
+
+    @Override
+    public int getMaxTotal() {
+        return maxTotal;
+    }
+
+    public void setMaxTotal(int maxTotal) {
+        this.maxTotal = maxTotal;
+    }
+
+    @Override
+    public int getMaxIdle() {
+        return maxIdle;
+    }
+
+    public void setMaxIdle(int maxIdle) {
+        this.maxIdle = maxIdle;
+    }
+
+    @Override
+    public int getMinIdle() {
+        return minIdle;
+    }
+
+    public void setMinIdle(int minIdle) {
+        this.minIdle = minIdle;
+    }
+
+    public long getMaxWaitMillis() {
+        return maxWaitMillis;
+    }
+
+    public void setMaxWaitMillis(long maxWaitMillis) {
+        this.maxWaitMillis = maxWaitMillis;
+    }
+
+    @Override
+    public int getConnTimeout() {
+        return connTimeout;
+    }
+
+    public void setConnTimeout(int connTimeout) {
+        this.connTimeout = connTimeout;
+    }
+
+    @Override
+    public int getSocketTimeout() {
+        return socketTimeout;
+    }
+
+    public void setSocketTimeout(int socketTimeout) {
+        this.socketTimeout = socketTimeout;
+    }
+
+    public long getMinEvictableIdleTimeMillis() {
+        return minEvictableIdleTimeMillis;
+    }
+
+    public void setMinEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) 
{
+        this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
+    }
+
+    @Override
+    public int getMaxAttempts() {
+        return maxAttempts;
+    }
+
+    public void setMaxAttempts(int maxAttempts) {
+        this.maxAttempts = maxAttempts;
+    }
+
+    @Override
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    /**
+     * add a node to pool
+     *
+     * @param node
+     */
+    public void addNode(Node node) {
+        if (node == null) {
+            throw new NullPointerException();
+        }
+        for (Node n : nodes) {
+            // Reject a node whose hostname and port both match an existing one.
+            // Weights are not considered, so equals() cannot be used for this check.
+            if (n.getHostname().equals(node.getHostname()) && n.getPort() == 
node.getPort()) {
+                throw new IllegalArgumentException(
+                        "Duplicated Node: " + node.toString() + ", currently 
in List is: " + n.toString());
+            }
+        }
+        nodes.add(node);
+    }
+
+    /**
+     * Compare whether the two groups of nodes are the same. As long as the 
data are the same, it doesn't matter about the order.
+     *
+     * @param nodes1
+     * @param nodes2
+     * @return
+     */
+    private boolean compareNodes(List<Node> nodes1, List<Node> nodes2) {
+        if (nodes1.size() == nodes2.size()) {
+            Node[] type = new Node[]{};
+            Node[] nodea1 = nodes1.toArray(type);
+            Node[] nodea2 = nodes2.toArray(type);
+            Arrays.sort(nodea1, nodeComparator);
+            Arrays.sort(nodea2, nodeComparator);
+            return Arrays.equals(nodea1, nodea2);
+        } else {
+            return false;
+        }
+    }
+
+    public int hashCode() {
+        throw new UnsupportedOperationException("hashCode not designed");
+    }
+
+    public boolean equals(Object anObject) {
+        if (anObject != null && anObject instanceof CachePool) {
+            RedisCachePool anPool = (RedisCachePool) anObject;
+            if (this.name.equals(anPool.getName())) {
+                return this.getMaxTotal() == anPool.getMaxTotal()
+                        && this.getMaxIdle() == anPool.getMaxIdle()
+                        && this.getMinIdle() == anPool.getMinIdle()
+                        && this.getMaxWaitMillis() == anPool.getMaxWaitMillis()
+                        && this.getConnTimeout() == anPool.getConnTimeout()
+                        && this.getSocketTimeout() == anPool.getSocketTimeout()
+                        && this.getMinEvictableIdleTimeMillis() == 
anPool.getMinEvictableIdleTimeMillis()
+                        && this.getMaxAttempts() == anPool.getMaxAttempts()
+                        && this.getPassword().equals(anPool.getPassword())
+                        && compareNodes(this.nodes, anPool.getNodes());
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return "RedisCachePoolImpl{"
+                + "name='"
+                + name
+                + '\''
+                + ", maxTotal="
+                + maxTotal
+                + ", maxIdle="
+                + maxIdle
+                + ", minIdle="
+                + minIdle
+                + ", maxWaitMillis="
+                + maxWaitMillis
+                + ", connTimeout="
+                + connTimeout
+                + ", socketTimeout="
+                + socketTimeout
+                + ", minEvictableIdleTimeMillis="
+                + minEvictableIdleTimeMillis
+                + ", maxAttempts="
+                + maxAttempts
+                + ", nodes="
+                + nodes
+                + '}';
+    }
+}
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/RedisClientFactory.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/RedisClientFactory.java
new file mode 100644
index 00000000..2ec9be5d
--- /dev/null
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/RedisClientFactory.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ozhera.log.manager.config.redis;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
+import org.apache.ozhera.log.cache.util.Node;
+import org.apache.ozhera.log.cache.util.NodeImpl;
+import org.apache.ozhera.log.common.Config;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPoolConfig;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Slf4j
+public class RedisClientFactory {
+    private static final ConcurrentHashMap<String, JedisCluster> 
jedisClusterMap =
+            new ConcurrentHashMap<>();
+
+    /**
+     * init when application starts
+     *
+     * <p>if we have more redis pool config, use zk storing these configs and 
change this init method
+     */
+    public static void init() {
+        RedisCachePool redisCachePool = getRedisCachePool();
+        if (jedisClusterMap.containsKey(redisCachePool.getName())) {
+            return;
+        }
+        JedisCluster jedisCluster = createJedisCluster(redisCachePool);
+        jedisClusterMap.put(redisCachePool.getName(), jedisCluster);
+    }
+
+    public static JedisCluster getJedisCluster() {
+        return getJedisCluster(Config.ins().get("redis.pool.name", ""));
+    }
+
+    public static JedisCluster getJedisCluster(String redisPoolName) {
+        JedisCluster jedisCluster = jedisClusterMap.get(redisPoolName);
+        if (jedisCluster == null) {
+//            throw new IllegalArgumentException(String.format("redis pool {} 
not config", redisPoolName));
+        }
+        return jedisCluster;
+    }
+
+    /**
+     * create jedis cluster
+     *
+     * @param redisCachePool
+     * @return
+     */
+    private static JedisCluster createJedisCluster(RedisCachePool 
redisCachePool) {
+        JedisPoolConfig poolConfig = new JedisPoolConfig();
+        poolConfig.setMaxTotal(redisCachePool.getMaxTotal());
+        poolConfig.setMaxIdle(redisCachePool.getMaxIdle());
+        poolConfig.setMinIdle(redisCachePool.getMinIdle());
+        poolConfig.setMaxWaitMillis(redisCachePool.getMaxWaitMillis());
+        
poolConfig.setMinEvictableIdleTimeMillis(redisCachePool.getMinEvictableIdleTimeMillis());
+        poolConfig.setBlockWhenExhausted(true);
+        poolConfig.setTestWhileIdle(false);
+        poolConfig.setTestOnReturn(false);
+        Set<HostAndPort> hostAndPortSet = new HashSet<>();
+        for (Node node : redisCachePool.getNodes()) {
+            hostAndPortSet.add(new HostAndPort(node.getHostname(), 
node.getPort()));
+        }
+
+        JedisCluster jedisCluster;
+        if (StringUtils.isNotEmpty(redisCachePool.getPassword())) {
+            jedisCluster =
+                    new JedisCluster(
+                            hostAndPortSet,
+                            redisCachePool.getConnTimeout(),
+                            redisCachePool.getSocketTimeout(),
+                            redisCachePool.getMaxAttempts(),
+                            redisCachePool.getPassword(),
+                            poolConfig);
+        } else {
+            jedisCluster =
+                    new JedisCluster(
+                            hostAndPortSet,
+                            redisCachePool.getSocketTimeout(),
+                            redisCachePool.getMaxAttempts(),
+                            poolConfig);
+        }
+        log.info("init redis cluster success, poolName:{}", 
redisCachePool.getName());
+        return jedisCluster;
+    }
+
+    private static RedisCachePoolImpl getRedisCachePool() {
+        String poolName = Config.ins().get("redis.pool.name", "");
+        String maxTotalStr = Config.ins().get("redis.pool.max.total", "50");
+        String maxIdleStr = Config.ins().get("redis.pool.max.idle", "30");
+        String minIdleStr = Config.ins().get("redis.pool.min.idle", "10");
+        String maxWaitMillisStr = Config.ins().get("redis.pool.max.wait", 
"1000");
+        String minEvictableIdleTimeMillisStr =
+                Config.ins().get("redis.pool.min.evictable.idle.time", 
"600000");
+        String connTimeoutStr = Config.ins().get("redis.connection.timeout", 
"3000");
+        String socketTimeoutStr = Config.ins().get("redis.socket.timeout", 
"1000");
+        String maxAttemptsStr = Config.ins().get("redis.max.attempts", "2");
+        String addressesStr = Config.ins().get("redis.addresses", "");
+        Validate.notBlank(
+                addressesStr,
+                String.format("redis.addresses cannot be 
blank,redis.pool.name=%s", poolName));
+        String[] addresses = addressesStr.split(",");
+        String password = Config.ins().get("redis.password", "");
+
+        if (StringUtils.isAnyEmpty(poolName, addressesStr, password)) {
+            log.error("Failed to init redis cluster");
+        }
+
+
+        RedisCachePoolImpl pool =
+                new RedisCachePoolImpl(
+                        poolName,
+                        Integer.parseInt(maxTotalStr),
+                        Integer.parseInt(maxIdleStr),
+                        Integer.parseInt(minIdleStr),
+                        Long.parseLong(maxWaitMillisStr),
+                        Integer.parseInt(connTimeoutStr),
+                        Integer.parseInt(socketTimeoutStr),
+                        Long.parseLong(minEvictableIdleTimeMillisStr),
+                        Integer.parseInt(maxAttemptsStr),
+                        password);
+        for (String address : addresses) {
+            pool.addNode(parseNode(address));
+        }
+
+        return pool;
+    }
+
+    private static Node parseNode(String address) {
+        String[] ss1 = address.split(":");
+        String hostname = ss1[0];
+        int port = Integer.parseInt(ss1[1]);
+        return new NodeImpl().setHostname(hostname).setPort(port);
+    }
+}
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/model/bo/LogAiMessage.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/model/bo/LogAiMessage.java
new file mode 100644
index 00000000..95b2fe0d
--- /dev/null
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/model/bo/LogAiMessage.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ozhera.log.manager.model.bo;
+
+import lombok.Data;
+
+@Data
+public class LogAiMessage {
+
+    public enum Role{
+        system,
+        user,
+        assistant,
+    }
+
+    private Role role;
+    private String content;
+
+    public static LogAiMessage user(String content) {
+        LogAiMessage aiMessage = new LogAiMessage();
+        aiMessage.setRole(Role.user);
+        aiMessage.setContent(content);
+        return aiMessage;
+    }
+
+    public static LogAiMessage system(String content) {
+        LogAiMessage aiMessage = new LogAiMessage();
+        aiMessage.setRole(Role.system);
+        aiMessage.setContent(content);
+        return aiMessage;
+    }
+
+    public static LogAiMessage assistant(String content) {
+        LogAiMessage aiMessage = new LogAiMessage();
+        aiMessage.setRole(Role.assistant);
+        aiMessage.setContent(content);
+        return aiMessage;
+    }
+
+    public String getContent(){
+        return this.content;
+    }
+}
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/bot/ContentSimplifyBot.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/bot/ContentSimplifyBot.java
index fb802e0c..b8d12777 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/bot/ContentSimplifyBot.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/bot/ContentSimplifyBot.java
@@ -18,21 +18,27 @@
  */
 package org.apache.ozhera.log.manager.service.bot;
 
-import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 import com.xiaomi.youpin.docean.anno.Service;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import run.mone.hive.Environment;
+import run.mone.hive.llm.CustomConfig;
 import run.mone.hive.llm.LLM;
 import run.mone.hive.llm.LLMProvider;
 import run.mone.hive.roles.Role;
 import run.mone.hive.schema.AiMessage;
 import run.mone.hive.schema.Message;
+import run.mone.hive.schema.MetaKey;
+import run.mone.hive.schema.MetaValue;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 @Service
+@Slf4j
 public class ContentSimplifyBot extends Role {
     private String baseText = """
             你是一个对话摘要压缩助手,接下来我会提供一段对话历史,格式是一个 JSON 列表,列表中的每一项是一个对象,包含三个字段:
@@ -45,10 +51,14 @@ public class ContentSimplifyBot extends Role {
             
             1. **保持输出数据结构与输入一致**:输出仍然是一个 JSON 列表,每一项仍然包含 "time" 、"user" 和 
"bot" 三个字段。
             2. **尽可能减少轮数**:若多轮对话围绕同一问题展开,可以合并为一轮,但必须保留语义完整,并且时间选择为多轮中最后一轮的时间。
-            3. 
**对于一些无关的信息,或者没有什么用的信息直接去除,一定在保留核心关键信息的情况下尽可能的压缩,至少保证压缩后的字符为压缩前的30%往下
+            3. **对于一些无关的信息,或者没有什么用的信息直接去除,一定在保留核心关键信息的情况下尽可能的压缩
             4. **如果每轮的数据中存在原始的日志信息,那么对于日志信息不要进行压缩,需要保持原样
             5. **不得添加任何非对话内容**,例如“压缩后的内容如下”、“总结如下”等。
             6. **输出必须是一个合法的 JSON 列表,结构和字段不变**。
+            7. **尽可能的根据当前内容的token数压缩到指定的目标token数**。
+            
+            【当前内容token数】: {{currentTokenCount}}
+            【内容压缩目标token数】: {{targetTokenCount}}
             
             下面是原始对话历史,请进行压缩(注意格式):
             {{original_text}}
@@ -59,29 +69,85 @@ public class ContentSimplifyBot extends Role {
         setEnvironment(new Environment());
     }
 
+    private static final int MAX_RETRY_COUNT = 3;
+    private static final int INITIAL_RETRY_DELAY_MS = 1000;
+    private static final int MAX_RETRY_DELAY_MS = 5000;
+
     @Override
     public CompletableFuture<Message> run() {
         Message msg = this.rc.getNews().poll();
         String content = msg.getContent();
         String text = baseText.replace("{{original_text}}", content);
+
+        Map<MetaKey, MetaValue> meta = msg.getMeta();
+        MetaKey currentKey = 
MetaKey.builder().key("currentCount").desc("currentCount").build();
+        String current = meta.get(currentKey).getValue().toString();
+
+        MetaKey targetKey = 
MetaKey.builder().key("targetCount").desc("targetCount").build();
+        String target = meta.get(targetKey).getValue().toString();
+
+        text = text.replace("{{currentTokenCount}}", current);
+        text = text.replace("{{targetTokenCount}}", target);
+
         JsonObject req = getReq(llm, text);
         List<AiMessage> messages = new ArrayList<>();
         messages.add(AiMessage.builder().jsonContent(req).build());
-        String result = llm.syncChat(this, messages);
 
+        int retryCount = 0;
+        String result = null;
+        Exception lastException = null;
+
+        while (retryCount <= MAX_RETRY_COUNT) {
+            try {
+                if (retryCount > 0) {
+                    log.info("Attempt the {} th call to LLM", retryCount);
+                    int delayMs = Math.min(INITIAL_RETRY_DELAY_MS * (1 << 
(retryCount - 1)), MAX_RETRY_DELAY_MS);
+                    log.info("Wait {} ms and try again", delayMs);
+                    Thread.sleep(delayMs);
+                }
+
+                CustomConfig customConfig = new CustomConfig();
+                customConfig.setModel("gpt-5");
+                customConfig.addCustomHeader(CustomConfig.X_MODEL_PROVIDER_ID, 
"azure_openai");
+
+                StringBuilder responseBuilder = new StringBuilder();
+                llm.call(messages, "你是一个ai日志分析的对话压缩助手", 
customConfig).doOnNext(x -> {
+                    responseBuilder.append(x);
+                }).blockLast();
+                result = responseBuilder.toString().trim();
+
+                if (StringUtils.isNotBlank(result)) {
+                    break;
+                }
+            } catch (Exception e) {
+                lastException = e;
+                log.warn("LLM call failed (retry {}/{}): {}", retryCount, 
MAX_RETRY_COUNT, e.getMessage());
+
+                if (e.getMessage() != null && e.getMessage().contains("429")) {
+                    retryCount++;
+                    continue;
+                } else {
+                    log.error("LLM call failed due to a non-429 error. No 
further retries will be made: {}", e.getMessage());
+                    break;
+                }
+            }
+
+            retryCount++;
+        }
+
+        if (result == null) {
+            String errorMsg = lastException != null ? 
lastException.getMessage() : "unknown error";
+            log.error("After {} attempts, the LLM still failed to be called: 
{}", MAX_RETRY_COUNT, errorMsg);
+            result = "";
+        }
         return 
CompletableFuture.completedFuture(Message.builder().content(result).build());
     }
 
     private JsonObject getReq(LLM llm, String text) {
         JsonObject req = new JsonObject();
-        if (llm.getConfig().getLlmProvider() == LLMProvider.CLAUDE_COMPANY) {
+        if (llm.getConfig().getLlmProvider() == LLMProvider.MIFY_GATEWAY) {
             req.addProperty("role", "user");
-            JsonArray contentJsons = new JsonArray();
-            JsonObject obj1 = new JsonObject();
-            obj1.addProperty("type", "text");
-            obj1.addProperty("text", text);
-            contentJsons.add(obj1);
-            req.add("content", contentJsons);
+            req.addProperty("content", text);
         }
         return req;
     }
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/bot/LogAnalysisBot.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/bot/LogAnalysisBot.java
index b578ade3..c92c9913 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/bot/LogAnalysisBot.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/bot/LogAnalysisBot.java
@@ -18,10 +18,16 @@
  */
 package org.apache.ozhera.log.manager.service.bot;
 
-import com.google.gson.JsonArray;
+import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import com.xiaomi.youpin.docean.anno.Service;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.manager.model.bo.BotQAParam;
+import org.apache.ozhera.log.manager.model.bo.LogAiMessage;
+import org.nutz.log.Log;
 import run.mone.hive.Environment;
+import run.mone.hive.llm.CustomConfig;
 import run.mone.hive.llm.LLM;
 import run.mone.hive.llm.LLMProvider;
 import run.mone.hive.roles.Role;
@@ -33,6 +39,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 @Service
+@Slf4j
 public class LogAnalysisBot extends Role {
     private static String baseText = """
             ## 角色
@@ -56,7 +63,6 @@ public class LogAnalysisBot extends Role {
             - 可扩展性强:随着日志数据量和复杂性的增长,能够持续适应并提供有效分析。
             
             下面是你需要分析的日志内容或用户问题:
-            {{log_text}}
             """;
 
     public LogAnalysisBot() {
@@ -64,31 +70,112 @@ public class LogAnalysisBot extends Role {
         setEnvironment(new Environment());
     }
 
+    private static final int MAX_RETRY_COUNT = 3;
+    private static final int INITIAL_RETRY_DELAY_MS = 1000;
+    private static final int MAX_RETRY_DELAY_MS = 5000;
+
+    private static final Gson GSON = new Gson();
+
     @Override
     public CompletableFuture<Message> run() {
         Message msg = this.rc.getNews().poll();
         String content = msg.getContent();
-        String text = baseText.replace("{{log_text}}", content);
-        JsonObject req = getReq(llm, text);
-        List<AiMessage> messages = new ArrayList<>();
-        messages.add(AiMessage.builder().jsonContent(req).build());
-        String result = llm.syncChat(this, messages);
 
+        List<LogAiMessage> reqList = getReq(llm, content);
+
+        if (reqList == null || reqList.isEmpty()) {
+            return null;
+        }
+        List<AiMessage> messages = reqList.stream().map(m -> {
+            AiMessage aiMessage = new AiMessage();
+            LogAiMessage.Role role = m.getRole();
+            aiMessage.setRole(role.name());
+            aiMessage.setContent(m.getContent());
+            return aiMessage;
+        }).toList();
+
+        int retryCount = 0;
+        String result = null;
+        Exception lastException = null;
+
+        while (retryCount <= MAX_RETRY_COUNT) {
+            try {
+                if (retryCount > 0) {
+                    log.info("Attempt the {} th call to LLM", retryCount);
+                    int delayMs = Math.min(INITIAL_RETRY_DELAY_MS * (1 << 
(retryCount - 1)), MAX_RETRY_DELAY_MS);
+                    log.info("Wait {} ms and try again", delayMs);
+                    Thread.sleep(delayMs);
+                }
+
+                CustomConfig customConfig = new CustomConfig();
+                customConfig.setModel("gpt-5");
+                customConfig.addCustomHeader(CustomConfig.X_MODEL_PROVIDER_ID, 
"azure_openai");
+
+                StringBuilder responseBuilder = new StringBuilder();
+                llm.call(messages, "你是一个ai日志分析助手", customConfig).doOnNext(x -> 
{
+                    responseBuilder.append(x);
+                }).blockLast();
+                result = responseBuilder.toString().trim();
+
+                if (StringUtils.isNotBlank(result)) {
+                    break;
+                }
+            } catch (Exception e) {
+                lastException = e;
+                log.warn("LLM call failed (retry {}/{}): {}", retryCount, 
MAX_RETRY_COUNT, e.getMessage());
+
+                if (e.getMessage() != null && e.getMessage().contains("429")) {
+                    retryCount++;
+                    continue;
+                } else {
+                    log.error("LLM call failed due to a non-429 error. No 
further retries will be made: {}", e.getMessage());
+                    break;
+                }
+            }
+
+            retryCount++;
+        }
+
+        if (result == null) {
+            String errorMsg = lastException != null ? 
lastException.getMessage() : "unknown error";
+            log.error("After {} attempts, the LLM still failed to be called: 
{}", MAX_RETRY_COUNT, errorMsg);
+            result = "";
+        }
         return 
CompletableFuture.completedFuture(Message.builder().content(result).build());
     }
 
-    private JsonObject getReq(LLM llm, String text) {
-        JsonObject req = new JsonObject();
-        if (llm.getConfig().getLlmProvider() == LLMProvider.CLAUDE_COMPANY) {
-            req.addProperty("role", "user");
-            JsonArray contentJsons = new JsonArray();
-            JsonObject obj1 = new JsonObject();
-            obj1.addProperty("type", "text");
-            obj1.addProperty("text", text);
-            contentJsons.add(obj1);
-            req.add("content", contentJsons);
+    private List<LogAiMessage> getReq(LLM llm, String content) {
+        if(llm.getConfig().getLlmProvider() == LLMProvider.MIFY_GATEWAY){
+            List<LogAiMessage> logAiMessages = initMessageList();
+            BotQAParam botQAParam = GSON.fromJson(content, BotQAParam.class);
+            if (botQAParam.getHistoryConversation() != null && 
!botQAParam.getHistoryConversation().isEmpty()){
+                botQAParam.getHistoryConversation().forEach(history -> {
+                    if (history.getUser() != null && 
!history.getUser().isBlank()){
+                        LogAiMessage userMessage = 
LogAiMessage.user(history.getUser());
+                        logAiMessages.add(userMessage);
+                    }
+                    if (history.getBot() != null && 
!history.getBot().isBlank()){
+                        LogAiMessage assistantMessage = 
LogAiMessage.assistant(history.getBot());
+                        logAiMessages.add(assistantMessage);
+                    }
+
+                });
+            }
+            String latestQuestion = botQAParam.getLatestQuestion();
+            LogAiMessage userMessage = LogAiMessage.user(latestQuestion);
+            logAiMessages.add(userMessage);
+            return logAiMessages;
         }
-        return req;
+        return null;
     }
 
+    private List<LogAiMessage> initMessageList(){
+        List<LogAiMessage> messages = new ArrayList<>();
+        LogAiMessage userMessage = LogAiMessage.user(baseText);
+        messages.add(userMessage);
+        return messages;
+    }
+
+
+
 }
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogAiAnalysisServiceImpl.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogAiAnalysisServiceImpl.java
index 6e1eab8c..07305ca1 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogAiAnalysisServiceImpl.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogAiAnalysisServiceImpl.java
@@ -18,15 +18,20 @@
  */
 package org.apache.ozhera.log.manager.service.impl;
 
-import cn.hutool.core.thread.ThreadUtil;
+
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
-import com.xiaomi.data.push.common.SafeRun;
+import com.knuddels.jtokkit.Encodings;
+import com.knuddels.jtokkit.api.Encoding;
+import com.knuddels.jtokkit.api.EncodingType;
 import com.xiaomi.youpin.docean.anno.Service;
+import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.ozhera.log.common.Config;
 import org.apache.ozhera.log.common.Result;
 import org.apache.ozhera.log.exception.CommonError;
 import org.apache.ozhera.log.manager.common.context.MoneUserContext;
+import org.apache.ozhera.log.manager.config.redis.RedisClientFactory;
 import org.apache.ozhera.log.manager.mapper.MilogAiConversationMapper;
 import org.apache.ozhera.log.manager.model.bo.BotQAParam;
 import org.apache.ozhera.log.manager.model.dto.AiAnalysisHistoryDTO;
@@ -37,10 +42,14 @@ import 
org.apache.ozhera.log.manager.service.MilogAiAnalysisService;
 import org.apache.ozhera.log.manager.service.bot.ContentSimplifyBot;
 import org.apache.ozhera.log.manager.service.bot.LogAnalysisBot;
 import org.apache.ozhera.log.manager.user.MoneUser;
+import redis.clients.jedis.*;
+import redis.clients.jedis.params.SetParams;
 import run.mone.hive.configs.LLMConfig;
 import run.mone.hive.llm.LLM;
 import run.mone.hive.llm.LLMProvider;
 import run.mone.hive.schema.Message;
+import run.mone.hive.schema.MetaKey;
+import run.mone.hive.schema.MetaValue;
 
 
 import javax.annotation.Resource;
@@ -49,12 +58,9 @@ import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
 
-import static run.mone.hive.llm.ClaudeProxy.*;
 
 @Slf4j
 @Service
@@ -69,33 +75,37 @@ public class MilogAiAnalysisServiceImpl implements 
MilogAiAnalysisService {
     @Resource
     private MilogAiConversationMapper milogAiConversationMapper;
 
-    private static final ConcurrentHashMap<Long, Map<String, 
List<BotQAParam.QAParam>>> QA_CACHE = new ConcurrentHashMap<>();
-
-    private static final ConcurrentHashMap<Long, Object> LOCK = new 
ConcurrentHashMap<>();
-
     private static final String MODEL_KEY = "model";
     private static final String ORIGINAL_KEY = "original";
 
-    private static final ConcurrentHashMap<Long, Long> CONVERSATION_TIME = new 
ConcurrentHashMap<>();
-    private static final long CONVERSATION_TIMEOUT = 10 * 60 * 1000;
+    private static final String MILOG_AI_KEY_PREFIX = "milog.ai.conversation:";
+
+    private static final String LOCK_PREFIX = "milog.ai.lock:";
+
+    private static final String GLOBAL_CHECK_LOCK_KEY = 
"milog.ai.checkTokenLength:global";
+
+    private static final String GLOBAL_SHUTDOWN_LOCK_KEY = 
"milog.ai.shutdown:global";
 
     private static final Gson gson = new Gson();
 
-    private ScheduledExecutorService scheduledExecutor;
+    private static final ExecutorService executor = 
Executors.newVirtualThreadPerTaskExecutor();
+
+    private static final JedisCluster jedisCluster = 
RedisClientFactory.getJedisCluster();
+
+    private static final Encoding TOKENIZER = 
Encodings.newDefaultEncodingRegistry().getEncoding(EncodingType.CL100K_BASE);
 
     public void init() {
+        String llmUrl = Config.ins().get("llm.url", "");
+        String llmToken = Config.ins().get("llm.token", "");
         LLMConfig config = LLMConfig.builder()
-                .llmProvider(LLMProvider.CLAUDE_COMPANY)
-                .url(getClaudeUrl())
-                .version(getClaudeVersion())
-                .maxTokens(getClaudeMaxToekns())
+                .url(llmUrl)
+                .token(llmToken)
+                .llmProvider(LLMProvider.MIFY_GATEWAY)
                 .build();
         LLM llm = new LLM(config);
+        llm.setConfigFunction(llmProvider -> Optional.of(config));
         analysisBot.setLlm(llm);
         contentSimplifyBot.setLlm(llm);
-        scheduledExecutor = Executors.newScheduledThreadPool(2, 
ThreadUtil.newNamedThreadFactory("manager-ai-conversation", false));
-        scheduledExecutor.scheduleAtFixedRate(() -> 
SafeRun.run(this::processTask), 0, 2, TimeUnit.MINUTES);
-        scheduledExecutor.scheduleAtFixedRate(() -> 
SafeRun.run(this::checkTokenLength), 0, 1, TimeUnit.MINUTES);
     }
 
 
@@ -117,9 +127,9 @@ public class MilogAiAnalysisServiceImpl implements 
MilogAiAnalysisService {
             String answer = "";
             try {
                 BotQAParam param = new BotQAParam();
-                
param.setLatestQuestion(gson.toJson(tailLogAiAnalysisDTO.getLogs()));
-                String text = formatString(param);
-                
analysisBot.getRc().news.put(Message.builder().content(text).build());
+                
param.setLatestQuestion(formatLogs(tailLogAiAnalysisDTO.getLogs()));
+                String paramJson = gson.toJson(param);
+                
analysisBot.getRc().news.put(Message.builder().content(paramJson).build());
                 Message result = analysisBot.run().join();
                 answer = result.getContent();
             } catch (Exception e) {
@@ -131,7 +141,7 @@ public class MilogAiAnalysisServiceImpl implements 
MilogAiAnalysisService {
             long timestamp = System.currentTimeMillis();
             String nowTimeStr = 
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
             conversation.setTime(nowTimeStr);
-            conversation.setUser(gson.toJson(tailLogAiAnalysisDTO.getLogs()));
+            conversation.setUser(formatLogs(tailLogAiAnalysisDTO.getLogs()));
             conversation.setBot(answer);
 
             List<BotQAParam.QAParam> ModelHistory = new ArrayList<>();
@@ -153,48 +163,34 @@ public class MilogAiAnalysisServiceImpl implements 
MilogAiAnalysisService {
             Map<String, List<BotQAParam.QAParam>> cache = new HashMap<>();
             cache.put(MODEL_KEY, ModelHistory);
             cache.put(ORIGINAL_KEY, OriginalHistory);
-            QA_CACHE.put(conversationId, cache);
+            putCache(conversationId, cache);
             response.setConversationId(conversationId);
             response.setContent(answer);
-            CONVERSATION_TIME.put(conversationId, timestamp);
             return Result.success(response);
         } else {
-            String answer = "";
             conversationId = tailLogAiAnalysisDTO.getConversationId();
             //This is not first request, need lock
-            Object lock = LOCK.computeIfAbsent(conversationId, k -> new 
Object());
-            synchronized (lock) {
-                Map<String, List<BotQAParam.QAParam>> cache = 
QA_CACHE.get(conversationId);
-                if (cache == null || cache.isEmpty()) {
-                    cache = getHistoryFromDb(conversationId);
-                }
-                List<BotQAParam.QAParam> modelHistory = cache.get(MODEL_KEY);
-                List<BotQAParam.QAParam> originalHistory = 
cache.get(ORIGINAL_KEY);
-                try {
-                    BotQAParam param = new BotQAParam();
-                    param.setHistoryConversation(modelHistory);
-                    
param.setLatestQuestion(gson.toJson(tailLogAiAnalysisDTO.getLogs()));
-                    String text = formatString(param);
-                    
analysisBot.getRc().news.put(Message.builder().content(gson.toJson(text)).build());
-                    Message result = analysisBot.run().join();
-                    answer = result.getContent();
-
-                } catch (InterruptedException e) {
-                    log.error("An error occurred in the request for the large 
model, err: {}", e.getMessage());
-                    return Result.fail(CommonError.SERVER_ERROR.getCode(), "An 
error occurred in the request for the large model");
-                }
-                BotQAParam.QAParam conversation = new BotQAParam.QAParam();
-                
conversation.setTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd
 HH:mm:ss")));
-                
conversation.setUser(gson.toJson(tailLogAiAnalysisDTO.getLogs()));
-                conversation.setBot(answer);
+
+            Map<String, List<BotQAParam.QAParam>> cache = 
getConversation(conversationId);
+            List<BotQAParam.QAParam> modelHistory = cache.get(MODEL_KEY);
+            List<BotQAParam.QAParam> originalHistory = cache.get(ORIGINAL_KEY);
+            AnalysisResult analysisResult = 
processHistoryConversation(conversationId, cache, tailLogAiAnalysisDTO);
+            String answer = analysisResult.getAnswer();
+            BotQAParam.QAParam conversation = new BotQAParam.QAParam();
+            
conversation.setTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd
 HH:mm:ss")));
+            conversation.setUser(formatLogs(tailLogAiAnalysisDTO.getLogs()));
+            conversation.setBot(answer);
+            if (analysisResult.getCompressedModelHistory() != null) {
+                List<BotQAParam.QAParam> compressedModelHistory = 
analysisResult.getCompressedModelHistory();
+                compressedModelHistory.add(conversation);
+                cache.put(MODEL_KEY, compressedModelHistory);
+            } else {
                 modelHistory.add(conversation);
-                originalHistory.add(conversation);
                 cache.put(MODEL_KEY, modelHistory);
-                cache.put(ORIGINAL_KEY, originalHistory);
-                QA_CACHE.put(conversationId, cache);
-                CONVERSATION_TIME.put(conversationId, 
System.currentTimeMillis());
             }
-            LOCK.remove(conversationId);
+            originalHistory.add(conversation);
+            cache.put(ORIGINAL_KEY, originalHistory);
+            putCache(conversationId, cache);
             response.setConversationId(conversationId);
             response.setContent(answer);
             return Result.success(response);
@@ -202,125 +198,237 @@ public class MilogAiAnalysisServiceImpl implements 
MilogAiAnalysisService {
 
     }
 
+    private AnalysisResult processHistoryConversation(Long conversationId, 
Map<String, List<BotQAParam.QAParam>> cache, LogAiAnalysisDTO 
tailLogAiAnalysisDTO) {
+        List<BotQAParam.QAParam> modelHistory = cache.get(MODEL_KEY);
+        List<BotQAParam.QAParam> originalHistory = cache.get(ORIGINAL_KEY);
+        AnalysisResult res = new AnalysisResult();
+        try {
+            BotQAParam param = new BotQAParam();
+            param.setHistoryConversation(modelHistory);
+            
param.setLatestQuestion(formatLogs(tailLogAiAnalysisDTO.getLogs()));
+            String paramJson = gson.toJson(param);
+            if (TOKENIZER.countTokens(paramJson) < 70000) {
+                
analysisBot.getRc().news.put(Message.builder().content(paramJson).build());
+                Message result = analysisBot.run().join();
+                String answer = result.getContent();
+                res.setAnswer(answer);
+                return res;
+            } else {
+                return analysisAndCompression(modelHistory, originalHistory, 
tailLogAiAnalysisDTO.getLogs(), conversationId);
+            }
+        } catch (InterruptedException e) {
+            log.error("An error occurred in the request for the large model, 
err: {}", e.getMessage());
+        }
+        return res;
+    }
 
-    private Map<String, List<BotQAParam.QAParam>> getHistoryFromDb(Long 
conversationId) {
-        MilogAiConversationDO milogAiConversationDO = 
milogAiConversationMapper.selectById(conversationId);
-        String conversationContext = 
milogAiConversationDO.getConversationContext();
+    private AnalysisResult analysisAndCompression(List<BotQAParam.QAParam> 
modelHistory, List<BotQAParam.QAParam> originalHistory, List<String> 
latestConversation, Long conversationId) {
+        AnalysisResult analysisResult = new AnalysisResult();
 
-        if (conversationContext == null || conversationContext.isBlank()) {
-            return Collections.emptyMap();
-        }
+        AtomicReference<String> answer = new AtomicReference<>("");
+        Future<?> analysisFuture = executor.submit(() -> {
+            try {
+                BotQAParam param = new BotQAParam();
+                param.setHistoryConversation(modelHistory);
+                param.setLatestQuestion(gson.toJson(latestConversation));
+                String paramJson = gson.toJson(param);
+                
analysisBot.getRc().news.put(Message.builder().content(paramJson).build());
+                Message result = analysisBot.run().join();
+                answer.set(result.getContent());
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        AtomicReference<List<BotQAParam.QAParam>> newModelHistory = new 
AtomicReference<>(modelHistory);
+        Future<?> compressionFuture = executor.submit(() -> {
+            int index = compressIndex(modelHistory, originalHistory);
+            if (index <= 0) {
+                newModelHistory.set(modelHistory);
+                return;
+            }
+            List<BotQAParam.QAParam> needCompress = new 
ArrayList<>(originalHistory.subList(0, index));
+            List<BotQAParam.QAParam> unchangeList = new 
ArrayList<>(originalHistory.subList(index, originalHistory.size()));
 
-        List<BotQAParam.QAParam> modelConversation = 
gson.fromJson(conversationContext, new TypeToken<List<BotQAParam.QAParam>>() {
-        }.getType());
-        List<BotQAParam.QAParam> originalConversation = 
gson.fromJson(milogAiConversationDO.getOriginalConversation(), new 
TypeToken<List<BotQAParam.QAParam>>() {
-        }.getType());
+            String needCompressJson = gson.toJson(needCompress);
+            // Compress the selected prefix down to roughly the token count of the untouched tail, as much as possible.
 
-        Map<String, List<BotQAParam.QAParam>> res = new HashMap<>();
-        res.put(MODEL_KEY, modelConversation);
-        res.put(ORIGINAL_KEY, originalConversation);
-        return res;
-    }
+            int currentTokenCount = TOKENIZER.countTokens(needCompressJson);
+            int targetTokenCount = 
TOKENIZER.countTokens(gson.toJson(unchangeList));
 
+            Map<MetaKey, MetaValue> meta = new HashMap<>();
+            MetaKey currentKey = 
MetaKey.builder().key("currentCount").desc("currentCount").build();
+            MetaValue currentValue = 
MetaValue.builder().value(currentTokenCount).desc("currentCount").build();
+            meta.put(currentKey, currentValue);
 
-    private void processTask() {
-        for (Map.Entry<Long, Long> entry : CONVERSATION_TIME.entrySet()) {
-            Long conversationId = entry.getKey();
-            Object lock = LOCK.computeIfAbsent(conversationId, k -> new 
Object());
-            synchronized (lock) {
-                saveHistory(conversationId);
-                //It has not been operated for a long time
-                if (System.currentTimeMillis() - entry.getValue() > 
CONVERSATION_TIMEOUT) {
-                    CONVERSATION_TIME.remove(entry.getKey());
-                    log.info("clean timeout conversation: {}", entry.getKey());
-                }
+            MetaKey targetKey = 
MetaKey.builder().key("targetCount").desc("targetCount").build();
+            MetaValue targetValue = 
MetaValue.builder().value(targetTokenCount).desc("targetCount").build();
+            meta.put(targetKey, targetValue);
+            String res;
+            try {
+                contentSimplifyBot.getRc().news.put(
+                        
Message.builder().content(needCompressJson).meta(meta).build());
+                Message result = contentSimplifyBot.run().join();
+                res = result.getContent();
+            } catch (Exception e) {
+                log.error("An error occurred when requesting the large model 
to compress data, error: {}", e.getMessage());
+                return;
             }
-            LOCK.remove(conversationId);
+            if (res == null || res.isBlank()) {
+                return;
+            }
+
+            List<BotQAParam.QAParam> compressedList = gson.fromJson(
+                    res,
+                    new TypeToken<List<BotQAParam.QAParam>>() {
+                    }.getType()
+            );
+            if (compressedList == null || compressedList.isEmpty()) {
+                return;
+            }
+            compressedList.addAll(unchangeList);
+            newModelHistory.set(compressedList);
+        });
+        try {
+            analysisFuture.get();
+            compressionFuture.get();
+            String s = answer.get();
+            List<BotQAParam.QAParam> paramList = newModelHistory.get();
+            analysisResult.setAnswer(s);
+            analysisResult.setCompressedModelHistory(paramList);
+        } catch (Exception e) {
+            log.error("analysis and compression of task execution error: {}", 
e.getMessage());
         }
+        return analysisResult;
     }
 
-    private void saveHistory(Long conversationId) {
-        log.info("开始存入数据库, id : {}", conversationId);
-        MilogAiConversationDO milogAiConversationDO = 
milogAiConversationMapper.selectById(conversationId);
-        Map<String, List<BotQAParam.QAParam>> map = 
QA_CACHE.get(conversationId);
-        List<BotQAParam.QAParam> modelHistory = map.get(MODEL_KEY);
-        List<BotQAParam.QAParam> originalHistory = map.get(ORIGINAL_KEY);
-        milogAiConversationDO.setUpdateTime(System.currentTimeMillis());
-        
milogAiConversationDO.setConversationContext(gson.toJson(modelHistory));
-        
milogAiConversationDO.setOriginalConversation(gson.toJson(originalHistory));
-        milogAiConversationMapper.updateById(milogAiConversationDO);
-    }
+    private void checkTokenLength() {
+        if (!trySimpleLock(GLOBAL_CHECK_LOCK_KEY, 50L)) {
+            return;
+        }
 
+        Set<String> allCacheKey = getAllCacheKey();
+        if (allCacheKey.isEmpty()) {
+            return;
+        }
+        for (String key : allCacheKey) {
+            executor.submit(() -> {
+                String[] split = key.split(":");
+                String uuid = UUID.randomUUID().toString();
+                if (split.length == 0) {
+                    return;
+                }
+                Long conversationId;
+                try {
+                    conversationId = Long.valueOf(split[split.length - 1]);
+                } catch (NumberFormatException e) {
+                    log.warn("invalid conversation key: {}", key);
+                    return;
+                }
+                if (!tryLock(conversationId, uuid, 300L)) {
+                    return;
+                }
 
-    private void checkTokenLength() {
-        for (Map.Entry<Long, Map<String, List<BotQAParam.QAParam>>> entry : 
QA_CACHE.entrySet()) {
-            Long conversationId = entry.getKey();
-            Object lock = LOCK.computeIfAbsent(conversationId, k -> new 
Object());
-            synchronized (lock) {
-                List<BotQAParam.QAParam> originalHistory = 
entry.getValue().get(ORIGINAL_KEY);
-                Integer index = compressIndex(entry.getValue());
-                if (index > 0) {
-                    List<BotQAParam.QAParam> needCompress = 
originalHistory.subList(0, index);
-                    List<BotQAParam.QAParam> unchangeList = 
originalHistory.subList(index, originalHistory.size());
-                    String res = "";
+                try {
+                    String value = jedisCluster.get(key);
+                    if (value == null || value.isEmpty()) {
+                        return;
+                    }
+                    Map<String, List<BotQAParam.QAParam>> map = 
gson.fromJson(value, new TypeToken<Map<String, List<BotQAParam.QAParam>>>() {
+                    }.getType());
+                    if (map == null || map.isEmpty()) {
+                        return;
+                    }
+                    int index = compressIndex(map);
+                    if (index <= 0) {
+                        return;
+                    }
+                    List<BotQAParam.QAParam> originalHistory = 
map.get(ORIGINAL_KEY);
+                    if (originalHistory == null || originalHistory.size() <= 
index) {
+                        return;
+                    }
+                    List<BotQAParam.QAParam> needCompress = new 
ArrayList<>(originalHistory.subList(0, index));
+                    List<BotQAParam.QAParam> unchangeList = new 
ArrayList<>(originalHistory.subList(index, originalHistory.size()));
+
+                    String res;
                     try {
-                        
contentSimplifyBot.getRc().news.put(Message.builder().content(gson.toJson(needCompress)).build());
+                        contentSimplifyBot.getRc().news.put(
+                                
Message.builder().content(gson.toJson(needCompress)).build());
                         Message result = contentSimplifyBot.run().join();
                         res = result.getContent();
                     } catch (Exception e) {
-                        log.error("An error occurred when requesting the large 
model to compress data");
+                        log.error("An error occurred when requesting the large 
model to compress data, key: {}, error: {}", key, e.getMessage());
+                        return;
                     }
-                    if (!res.isBlank()) {
-                        List<BotQAParam.QAParam> compressedList = 
gson.fromJson(res, new TypeToken<List<BotQAParam.QAParam>>() {
-                        }.getType());
-                        compressedList.addAll(unchangeList);
-                        entry.getValue().put(MODEL_KEY, compressedList);
-                        QA_CACHE.put(entry.getKey(), entry.getValue());
+                    if (res == null || res.isBlank()) {
+                        return;
                     }
+
+                    List<BotQAParam.QAParam> compressedList = gson.fromJson(
+                            res,
+                            new TypeToken<List<BotQAParam.QAParam>>() {
+                            }.getType()
+                    );
+                    if (compressedList == null || compressedList.isEmpty()) {
+                        return;
+                    }
+                    compressedList.addAll(unchangeList);
+                    map.put(MODEL_KEY, compressedList);
+                    jedisCluster.setex(key, 60 * 60, gson.toJson(map));
+                } catch (Exception e) {
+                    log.error("checkTokenLength error for key: {}, error: {}", 
key, e.getMessage());
+                } finally {
+                    unLock(conversationId, uuid);
                 }
-            }
-            LOCK.remove(conversationId);
-        }
-    }
 
-    private static String formatString(BotQAParam param) {
-        StringBuilder sb = new StringBuilder();
-        List<BotQAParam.QAParam> historyConversation = 
param.getHistoryConversation();
-        if (historyConversation != null && !historyConversation.isEmpty()) {
-            sb.append("历史对话:\n");
-            historyConversation.forEach(h -> {
-                sb.append(String.format("[%s] ###用户: %s  ###助手: %s\n", 
h.getTime(), h.getUser(), h.getBot()));
             });
         }
-        sb.append("最新问题: \n ###用户: ").append(param.getLatestQuestion());
-        return sb.toString()
-                .replaceAll("[\u0000-\u001F]", "")
-                .replaceAll("[\\\\\"]", "");
 
     }
 
-
     private static Boolean requestExceedLimit(List<String> logs) {
-        String request = gson.toJson(logs);
-        if (request.length() >= 20000) {
-            return true;
-        }
-        return false;
+        String formatLog = formatLogs(logs);
+        int count = TOKENIZER.countTokens(formatLog);
+        return count > 20000;
     }
 
     private static Integer compressIndex(Map<String, List<BotQAParam.QAParam>> 
map) {
         List<BotQAParam.QAParam> paramList = map.get(MODEL_KEY);
-        if (gson.toJson(paramList).length() <= 40000) {
+        String modelJson = gson.toJson(paramList);
+        int count = TOKENIZER.countTokens(modelJson);
+        if (count <= 70000) {
             return 0;
         }
-       int limit = 20000;
+        int limit = 20000;
         List<BotQAParam.QAParam> originalList = map.get(ORIGINAL_KEY);
         int sum = 0;
         int index = originalList.size();
         for (int i = originalList.size() - 1; i >= 0; i--) {
             BotQAParam.QAParam param = originalList.get(i);
             String str = gson.toJson(param);
-            sum += str.length();
+            sum += TOKENIZER.countTokens(str);
+            ;
+            index = i;
+            if (sum >= limit) {
+                break;
+            }
+        }
+        int maxCompress = originalList.size() - 20;
+        return Math.max(index, maxCompress);
+    }
+
+    private static Integer compressIndex(List<BotQAParam.QAParam> paramList, 
List<BotQAParam.QAParam> originalList) {
+        String modelJson = gson.toJson(paramList);
+        int count = TOKENIZER.countTokens(modelJson);
+        if (count <= 50000) {
+            return 0;
+        }
+        int limit = 20000;
+        int sum = 0;
+        int index = originalList.size();
+        for (int i = originalList.size() - 1; i >= 0; i--) {
+            BotQAParam.QAParam param = originalList.get(i);
+            String str = gson.toJson(param);
+            sum += TOKENIZER.countTokens(str);
             index = i;
             if (sum >= limit) {
                 break;
@@ -333,16 +441,36 @@ public class MilogAiAnalysisServiceImpl implements 
MilogAiAnalysisService {
 
     @Override
     public void shutdown() {
-        if (!QA_CACHE.isEmpty()){
-            log.info("The project is closed and the cache is flushed to the 
disk");
-            for (Map.Entry<Long, Map<String, List<BotQAParam.QAParam>>> entry 
: QA_CACHE.entrySet()) {
-                MilogAiConversationDO milogAiConversationDO = 
milogAiConversationMapper.selectById(entry.getKey());
-                List<BotQAParam.QAParam> modelHistory = 
entry.getValue().get(MODEL_KEY);
-                List<BotQAParam.QAParam> originalHistory = 
entry.getValue().get(ORIGINAL_KEY);
-                
milogAiConversationDO.setUpdateTime(System.currentTimeMillis());
-                
milogAiConversationDO.setConversationContext(gson.toJson(modelHistory));
-                
milogAiConversationDO.setOriginalConversation(gson.toJson(originalHistory));
-                milogAiConversationMapper.updateById(milogAiConversationDO);
+        if (!trySimpleLock(GLOBAL_SHUTDOWN_LOCK_KEY, 120L)) {
+            return;
+        }
+        Set<String> allCacheKey = getAllCacheKey();
+        if (!allCacheKey.isEmpty()) {
+            List<Future<?>> futures = new ArrayList<>();
+            for (String key : allCacheKey) {
+                Future<?> future = executor.submit(() -> {
+                    String[] split = key.split(":");
+                    Long conversationId = Long.valueOf(split[split.length - 
1]);
+                    String value = jedisCluster.get(key);
+                    Map<String, List<BotQAParam.QAParam>> map = 
gson.fromJson(value, new TypeToken<Map<String, List<BotQAParam.QAParam>>>() {
+                    }.getType());
+                    List<BotQAParam.QAParam> modelHistory = map.get(MODEL_KEY);
+                    List<BotQAParam.QAParam> originalHistory = 
map.get(ORIGINAL_KEY);
+                    MilogAiConversationDO conversationDO = 
milogAiConversationMapper.selectById(conversationId);
+                    if (conversationDO != null) {
+                        
conversationDO.setOriginalConversation(gson.toJson(originalHistory));
+                        
conversationDO.setConversationContext(gson.toJson(modelHistory));
+                        milogAiConversationMapper.updateById(conversationDO);
+                    }
+                });
+                futures.add(future);
+            }
+            for (Future<?> future : futures) {
+                try {
+                    future.get();
+                } catch (Exception e) {
+                    log.error("A future task execute failed: {}", 
e.getMessage());
+                }
             }
         }
     }
@@ -352,7 +480,7 @@ public class MilogAiAnalysisServiceImpl implements 
MilogAiAnalysisService {
         MoneUser user = MoneUserContext.getCurrentUser();
         List<MilogAiConversationDO> historyList = 
milogAiConversationMapper.getListByUserAndStore(storeId, user.getUser());
         List<AiAnalysisHistoryDTO> result = new ArrayList<>();
-        if(!historyList.isEmpty()){
+        if (!historyList.isEmpty()) {
             result = historyList.stream().map(h -> {
                 AiAnalysisHistoryDTO dto = new AiAnalysisHistoryDTO();
                 dto.setId(h.getId());
@@ -366,21 +494,22 @@ public class MilogAiAnalysisServiceImpl implements 
MilogAiAnalysisService {
 
     @Override
     public Result<List<BotQAParam.QAParam>> getAiConversation(Long id) {
-        Map<String, List<BotQAParam.QAParam>> stringListMap = QA_CACHE.get(id);
+        Map<String, List<BotQAParam.QAParam>> stringListMap = getCache(id);
         if (stringListMap != null && !stringListMap.isEmpty()) {
             List<BotQAParam.QAParam> paramList = 
stringListMap.get(ORIGINAL_KEY);
             return Result.success(paramList);
         }
         MilogAiConversationDO conversationDO = 
milogAiConversationMapper.selectById(id);
         String originalConversationStr = 
conversationDO.getOriginalConversation();
-        List<BotQAParam.QAParam> res =  gson.fromJson(originalConversationStr, 
new TypeToken<List<BotQAParam.QAParam>>() {}.getType());
+        List<BotQAParam.QAParam> res = gson.fromJson(originalConversationStr, 
new TypeToken<List<BotQAParam.QAParam>>() {
+        }.getType());
         return Result.success(res);
     }
 
     @Override
     public Result<Boolean> deleteAiConversation(Long id) {
         milogAiConversationMapper.deleteById(id);
-        QA_CACHE.remove(id);
+        removeCache(id);
         return Result.success(true);
     }
 
@@ -395,14 +524,14 @@ public class MilogAiAnalysisServiceImpl implements 
MilogAiAnalysisService {
 
     @Override
     public Result<Boolean> closeAiAnalysis(Long id) {
-        Map<String, List<BotQAParam.QAParam>> stringListMap = QA_CACHE.get(id);
+        Map<String, List<BotQAParam.QAParam>> stringListMap = getCache(id);
         if (stringListMap != null && !stringListMap.isEmpty()) {
             MilogAiConversationDO conversationDO = 
milogAiConversationMapper.selectById(id);
             conversationDO.setUpdateTime(System.currentTimeMillis());
             
conversationDO.setConversationContext(gson.toJson(stringListMap.get(MODEL_KEY)));
             
conversationDO.setOriginalConversation(gson.toJson(stringListMap.get(ORIGINAL_KEY)));
             milogAiConversationMapper.updateById(conversationDO);
-            QA_CACHE.remove(id);
+            removeCache(id);
         }
         return Result.success(true);
     }
@@ -413,4 +542,129 @@ public class MilogAiAnalysisServiceImpl implements 
MilogAiAnalysisService {
         DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss.SSS");
         return dateTime.format(formatter);
     }
+
+    private Map<String, List<BotQAParam.QAParam>> getConversation(Long 
conversationId) {
+        String redisKey = MILOG_AI_KEY_PREFIX + conversationId;
+        String value = jedisCluster.get(redisKey);
+        if (value != null && !value.isEmpty()) {
+            Map<String, List<BotQAParam.QAParam>> map = gson.fromJson(value, 
new TypeToken<Map<String, List<BotQAParam.QAParam>>>() {
+            }.getType());
+            return map;
+        }
+
+        MilogAiConversationDO conversationDO = 
milogAiConversationMapper.selectById(conversationId);
+        if (conversationDO != null) {
+            Map<String, List<BotQAParam.QAParam>> conversationMap = new 
HashMap<>();
+            String conversationContext = 
conversationDO.getConversationContext();
+            List<BotQAParam.QAParam> modelConversation = 
gson.fromJson(conversationContext, new TypeToken<List<BotQAParam.QAParam>>() {
+            }.getType());
+            String originalConversationStr = 
conversationDO.getOriginalConversation();
+            List<BotQAParam.QAParam> originalConversation = 
gson.fromJson(originalConversationStr, new 
TypeToken<List<BotQAParam.QAParam>>() {
+            }.getType());
+
+            conversationMap.put(MODEL_KEY, modelConversation);
+            conversationMap.put(ORIGINAL_KEY, originalConversation);
+
+            putCache(conversationId, conversationMap);
+            return conversationMap;
+        }
+
+        return new HashMap<>();
+    }
+
+    private static boolean putCache(Long conversationId, Map<String, 
List<BotQAParam.QAParam>> map) {
+        if (map == null || map.isEmpty()) {
+            return false;
+        }
+        String redisKey = MILOG_AI_KEY_PREFIX + conversationId;
+        String res = jedisCluster.setex(redisKey, 60 * 60, gson.toJson(map));
+        return true;
+    }
+
+    private Map<String, List<BotQAParam.QAParam>> getCache(Long 
conversationId) {
+        String redisKey = MILOG_AI_KEY_PREFIX + conversationId;
+        String value = jedisCluster.get(redisKey);
+        if (value != null && !value.isEmpty()) {
+            Map<String, List<BotQAParam.QAParam>> map = gson.fromJson(value, 
new TypeToken<Map<String, List<BotQAParam.QAParam>>>() {
+            }.getType());
+            return map;
+        }
+        return new HashMap<>();
+    }
+
+
+    private static void removeCache(Long conversationId) {
+        String redisKey = MILOG_AI_KEY_PREFIX + conversationId;
+        jedisCluster.del(redisKey);
+    }
+
+    private static String formatLogs(List<String> logs) {
+        return String.join("\n", logs);
+    }
+
+    private Set<String> getAllCacheKey() {
+        Set<String> keys = new HashSet<>();
+        Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
+        for (Map.Entry<String, JedisPool> entry : clusterNodes.entrySet()) {
+            try (Jedis jedis = entry.getValue().getResource()) {
+                String cursor = "0";
+                do {
+                    ScanResult<String> scanResult = jedis.scan(cursor, new 
ScanParams().match(MILOG_AI_KEY_PREFIX + "*").count(1000));
+                    keys.addAll(scanResult.getResult());
+                    cursor = scanResult.getCursor();
+                } while (!cursor.equals("0"));
+            } catch (Exception e) {
+                log.error("Failed to retrieve all conversation keys from 
Redis!");
+            }
+        }
+        return keys;
+    }
+
+    private boolean tryLock(Long conversationId, String value, Long 
expireSeconds) {
+        String lockKey = LOCK_PREFIX + conversationId;
+        SetParams params = new SetParams();
+        params.nx();
+        params.px(expireSeconds * 1000);
+        String res = jedisCluster.set(lockKey, value, params);
+        return "OK".equals(res);
+    }
+
+    private boolean trySimpleLock(String key, Long expireSeconds) {
+        SetParams params = new SetParams();
+        params.nx();
+        params.px(expireSeconds * 1000);
+        String res = jedisCluster.set(key, "1", params);
+        return "OK".equals(res);
+    }
+
+    private static final String UNLOCK_LUA =
+            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
+                    "   return redis.call('del', KEYS[1]) " +
+                    "else " +
+                    "   return 0 " +
+                    "end";
+
+    private void unLock(Long conversationId, String value) {
+        String lockKey = LOCK_PREFIX + conversationId;
+        try {
+            jedisCluster.eval(UNLOCK_LUA, Collections.singletonList(lockKey), 
Collections.singletonList(value));
+
+        } catch (Exception e) {
+            log.error("failed to unlock key: {}, error:{}", lockKey, 
e.getMessage());
+        }
+    }
+
    // DTO bundling a model answer with the (possibly compressed) history that
    // produced it.
    @Data
    static class AnalysisResult {
        private String answer;
        private List<BotQAParam.QAParam> compressedModelHistory;
    }

    // Bookkeeping for a compression pass: where to cut the history and the
    // current/target token budgets involved.
    @Data
    static class CompressionIndex {
        private Integer index;
        private Integer currentTokenCount;
        private Integer targetTokenCount;
    }
+
 }
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/ManagerLevelFilterConfigListener.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/ManagerLevelFilterConfigListener.java
index 5c869640..04a0209a 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/ManagerLevelFilterConfigListener.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/ManagerLevelFilterConfigListener.java
@@ -22,9 +22,11 @@ import cn.hutool.core.thread.ThreadUtil;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.xiaomi.data.push.common.SafeRun;
 import com.xiaomi.youpin.docean.anno.Component;
+import com.xiaomi.youpin.docean.plugin.dubbo.anno.Reference;
 import com.xiaomi.youpin.docean.plugin.nacos.NacosConfig;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.api.service.PublishConfigService;
 import org.apache.ozhera.log.manager.dao.MilogLogTailDao;
 import org.apache.ozhera.log.manager.dao.MilogLogstoreDao;
 import org.apache.ozhera.log.manager.mapper.MilogLogTemplateMapper;
@@ -68,6 +70,9 @@ public class ManagerLevelFilterConfigListener {
     @Resource
     private MilogLogstoreDao logStoreDao;
 
+    @Reference(interfaceClass = PublishConfigService.class, group = 
"$dubbo.env.group", check = false, timeout = 14000)
+    private PublishConfigService publishConfigService;
+
     private TailExtensionService tailExtensionService;
 
     private final String logLevelFilterKey = "log.level.filter.config.manager";
@@ -136,41 +141,13 @@ public class ManagerLevelFilterConfigListener {
             if (config != null && config.getEnableGlobalFilter() && 
areElementsSameIgnoreCase(newConfig.getLogLevelList(), 
config.getLogLevelList())) {
                 return;
             }
-//            globalUpdateSendMsg();
+            globalUpdateSendMsg();
         }
         config = newConfig;
     }
 
     public void globalUpdateSendMsg() {
-        AtomicLong lastId = new AtomicLong(0L);
-        ConcurrentLinkedQueue<MilogLogTailDo> failedTailList = new 
ConcurrentLinkedQueue<>();
-        List<CompletableFuture<Void>> futureList = new ArrayList<>();
-        try {
-            while (true) {
-                List<MilogLogTailDo> logTailByLastIdList = 
logtailDao.getLogTailByLastId(lastId.get(), BATCH_SIZE);
-                if (logTailByLastIdList.isEmpty()) break;
-                CompletableFuture<Void> future = CompletableFuture.runAsync(() 
-> {
-                    logTailByLastIdList.forEach(tail -> {
-                        try {
-                            updateSingleTail(tail);
-                        } catch (Exception e) {
-                            failedTailList.offer(tail);
-                            log.error("Failed to update tail: {}", 
tail.getId(), e);
-                        }
-                    });
-                }, logUpdateExecutor);
-                futureList.add(future);
-                lastId.set(logTailByLastIdList.get(logTailByLastIdList.size() 
- 1).getId());
-            }
-
-            CompletableFuture.allOf(futureList.toArray(new 
CompletableFuture[0])).join();
-
-            if (!failedTailList.isEmpty()) {
-                handleFailedTails(failedTailList);
-            }
-        } catch (Exception e) {
-            log.error("Global log config update failed", e);
-        }
+        log.info("global update,skip!");
     }
 
     private void handleFailedTails(Queue<MilogLogTailDo> failedTailList) {
diff --git a/ozhera-log/log-manager/src/main/resources/config.properties 
b/ozhera-log/log-manager/src/main/resources/config.properties
index a1dafe8a..0eab2587 100644
--- a/ozhera-log/log-manager/src/main/resources/config.properties
+++ b/ozhera-log/log-manager/src/main/resources/config.properties
@@ -76,3 +76,12 @@ tpc.devMode=${tpc.devMode}
 kafka.use.ssl=true
 kafka.sll.location=/opt/app/mix.4096.client.truststore.jks
 
+## redis
+redis.pool.max.total=${redis.pool.max.total}
+redis.pool.max.idle=${redis.pool.max.idle}
+redis.pool.min.idle=${redis.pool.min.idle}
+redis.pool.max.wait=${redis.pool.max.wait}
+redis.pool.min.evictable.idle.time=${redis.pool.min.evictable.idle.time}
+redis.connection.timeout=${redis.connection.timeout}
+redis.socket.timeout=${redis.socket.timeout}
+redis.max.attempts=${redis.max.attempts}
\ No newline at end of file
diff --git a/ozhera-log/log-manager/src/main/resources/config/open.properties 
b/ozhera-log/log-manager/src/main/resources/config/open.properties
index c1827fef..48a3eafe 100644
--- a/ozhera-log/log-manager/src/main/resources/config/open.properties
+++ b/ozhera-log/log-manager/src/main/resources/config/open.properties
@@ -70,3 +70,13 @@ tpc_node_code=logger
 filter_urls=queryAgentK8sIp,queryAgentConfig,matrixLogQuery
 agent.heart.senders=zhangsan
 tpc.devMode=false
+
+## redis
+redis.pool.max.total=50
+redis.pool.max.idle=30
+redis.pool.min.idle=10
+redis.pool.max.wait=1000
+redis.pool.min.evictable.idle.time=600000
+redis.connection.timeout=3000
+redis.socket.timeout=1000
+redis.max.attempts=2
\ No newline at end of file
diff --git a/ozhera-log/pom.xml b/ozhera-log/pom.xml
index 9600e9a5..1f804402 100644
--- a/ozhera-log/pom.xml
+++ b/ozhera-log/pom.xml
@@ -103,6 +103,17 @@ http://www.apache.org/licenses/LICENSE-2.0
                 <artifactId>hutool-all</artifactId>
                 <version>5.8.21</version>
             </dependency>
+
+            <dependency>
+                <groupId>com.knuddels</groupId>
+                <artifactId>jtokkit</artifactId>
+                <version>1.1.0</version>
+            </dependency>
+            <dependency>
+                <groupId>redis.clients</groupId>
+                <artifactId>jedis</artifactId>
+                <version>3.3.0</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to