This is an automated email from the ASF dual-hosted git repository.
gaoxihui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git
The following commit(s) were added to refs/heads/master by this push:
new 5feced5e fix: fix the issue with the AI analysis logs (#618)
5feced5e is described below
commit 5feced5e07c17d3a360f00fd50f1b359e346b2ed
Author: Xue <[email protected]>
AuthorDate: Thu Nov 27 17:01:20 2025 +0800
fix: fix the issue with the AI analysis logs (#618)
* fix:remove the global update from the downgrade strategy
* fix:modified the AI call for compressing the conversation history
* fix:modified the configuration for log compression and some descriptions
of the issues
* fix:modify the location of the Redis configuration
* fix:fix version
* fix:fix dependency
* fix:add a description of the source file for opening
---------
Co-authored-by: xueshan <[email protected]>
Co-authored-by: wtt <[email protected]>
---
ozhera-log/log-manager/pom.xml | 11 +-
.../ozhera/log/manager/config/redis/CachePool.java | 28 +
.../ozhera/log/manager/config/redis/Node.java | 26 +
.../ozhera/log/manager/config/redis/NodeImpl.java | 66 +++
.../log/manager/config/redis/RedisCachePool.java | 71 +++
.../manager/config/redis/RedisCachePoolImpl.java | 270 ++++++++++
.../manager/config/redis/RedisClientFactory.java | 158 ++++++
.../ozhera/log/manager/model/bo/LogAiMessage.java | 60 +++
.../manager/service/bot/ContentSimplifyBot.java | 86 +++-
.../log/manager/service/bot/LogAnalysisBot.java | 123 ++++-
.../service/impl/MilogAiAnalysisServiceImpl.java | 564 +++++++++++++++------
.../nacos/ManagerLevelFilterConfigListener.java | 37 +-
.../src/main/resources/config.properties | 9 +
.../src/main/resources/config/open.properties | 10 +
ozhera-log/pom.xml | 11 +
15 files changed, 1316 insertions(+), 214 deletions(-)
diff --git a/ozhera-log/log-manager/pom.xml b/ozhera-log/log-manager/pom.xml
index b02a601e..395744dd 100644
--- a/ozhera-log/log-manager/pom.xml
+++ b/ozhera-log/log-manager/pom.xml
@@ -28,7 +28,7 @@ http://www.apache.org/licenses/LICENSE-2.0
<modelVersion>4.0.0</modelVersion>
<artifactId>log-manager</artifactId>
- <version>2.2.7-SNAPSHOT</version>
+ <version>2.2.8-SNAPSHOT</version>
<properties>
<maven.compiler.source>21</maven.compiler.source>
@@ -302,6 +302,15 @@ http://www.apache.org/licenses/LICENSE-2.0
<artifactId>kafka-clients</artifactId>
</dependency>
+ <dependency>
+ <groupId>redis.clients</groupId>
+ <artifactId>jedis</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.knuddels</groupId>
+ <artifactId>jtokkit</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/CachePool.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/CachePool.java
new file mode 100644
index 00000000..f240c2c0
--- /dev/null
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/CachePool.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ozhera.log.manager.config.redis;
+
+import java.util.List;
+
+/**
+ * A named pool of cache {@link Node}s.
+ */
+public interface CachePool {
+
+    /**
+     * @return the name identifying this pool
+     */
+    String getName();
+
+    /**
+     * @return the nodes registered in this pool
+     */
+    List<Node> getNodes();
+}
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/Node.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/Node.java
new file mode 100644
index 00000000..1a28f0a6
--- /dev/null
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/Node.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ozhera.log.manager.config.redis;
+
+/**
+ * A single cache endpoint described by a host name and a port.
+ */
+public interface Node {
+
+    /**
+     * @return the host name of this node
+     */
+    String getHostname();
+
+    /**
+     * @return the port of this node
+     */
+    int getPort();
+}
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/NodeImpl.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/NodeImpl.java
new file mode 100644
index 00000000..efe3335d
--- /dev/null
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/NodeImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.ozhera.log.manager.config.redis;
+
+/**
+ * Mutable {@link Node} implementation with fluent setters.
+ *
+ * <p>An instance is equal to any {@link Node} that reports the same host name and port.
+ * {@link #hashCode()} is intentionally unsupported, so instances must not be placed in
+ * hash-based collections or used as map keys.
+ */
+public class NodeImpl implements Node {
+
+    private String hostname;
+    private int port;
+
+    /** Creates a node whose host name is {@code null} and whose port is {@code 0}. */
+    public NodeImpl() {
+    }
+
+    @Override
+    public String getHostname() {
+        return this.hostname;
+    }
+
+    /**
+     * @param hostname the host name to store
+     * @return this instance, to allow call chaining
+     */
+    public NodeImpl setHostname(String hostname) {
+        this.hostname = hostname;
+        return this;
+    }
+
+    @Override
+    public int getPort() {
+        return this.port;
+    }
+
+    /**
+     * @param port the port to store
+     * @return this instance, to allow call chaining
+     */
+    public NodeImpl setPort(int port) {
+        this.port = port;
+        return this;
+    }
+
+    /**
+     * Compares host name and port with another {@link Node}.
+     *
+     * <p>The host name comparison is null-safe: a node created through the no-arg
+     * constructor has no host name yet and must not make this method throw.
+     *
+     * @param obj the object to compare with
+     * @return {@code true} if {@code obj} is a {@link Node} with the same host name and port
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        // instanceof already yields false for null, so no separate null check is needed.
+        if (!(obj instanceof Node)) {
+            return false;
+        }
+        Node other = (Node) obj;
+        return this.port == other.getPort()
+                && java.util.Objects.equals(this.hostname, other.getHostname());
+    }
+
+    /**
+     * @return the node rendered as {@code hostname:port}
+     */
+    @Override
+    public String toString() {
+        return this.hostname + ":" + this.port;
+    }
+
+    /**
+     * Always fails: hashing is deliberately not supported for this type.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public int hashCode() {
+        throw new UnsupportedOperationException("hashCode not designed");
+    }
+}
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/RedisCachePool.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/RedisCachePool.java
new file mode 100644
index 00000000..693089d0
--- /dev/null
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/RedisCachePool.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ozhera.log.manager.config.redis;
+
+import org.apache.ozhera.log.cache.util.CachePool;
+
+
+/**
+ * A {@link CachePool} that additionally exposes the connection-pool and client settings
+ * used for Redis.
+ */
+public interface RedisCachePool extends CachePool {
+
+    /**
+     * @return the maxTotal setting of this {@link CachePool}
+     */
+    int getMaxTotal();
+
+    /**
+     * @return the maxIdle setting of this {@link CachePool}
+     */
+    int getMaxIdle();
+
+    /**
+     * @return the minIdle setting of this {@link CachePool}
+     */
+    int getMinIdle();
+
+    /**
+     * @return the maxWaitMillis setting of this {@link CachePool}
+     */
+    long getMaxWaitMillis();
+
+    /**
+     * @return the connectionTimeout setting of this {@link CachePool}
+     */
+    int getConnTimeout();
+
+    /**
+     * @return the socketTimeout setting of this {@link CachePool}
+     */
+    int getSocketTimeout();
+
+    /**
+     * @return the minEvictableIdleTimeMillis setting of this {@link CachePool}
+     */
+    long getMinEvictableIdleTimeMillis();
+
+    /**
+     * @return the maxAttempts setting of this {@link CachePool}
+     */
+    int getMaxAttempts();
+
+    /**
+     * @return the password of this {@link CachePool}
+     */
+    String getPassword();
+}
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/RedisCachePoolImpl.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/RedisCachePoolImpl.java
new file mode 100644
index 00000000..7c6ad4d3
--- /dev/null
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/RedisCachePoolImpl.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ozhera.log.manager.config.redis;
+
+import org.apache.ozhera.log.cache.util.CachePool;
+import org.apache.ozhera.log.cache.util.Node;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Mutable, bean-style {@link RedisCachePool}: a named {@link CachePool} holding pool and
+ * client settings plus the list of nodes added through {@link #addNode(Node)}.
+ *
+ * <p>{@link #hashCode()} is intentionally unsupported, so instances must not be placed in
+ * hash-based collections or used as map keys.
+ */
+public class RedisCachePoolImpl implements RedisCachePool {
+
+    /** Orders nodes by host name, then by port; used to compare node lists regardless of order. */
+    private static final Comparator<Node> NODE_COMPARATOR =
+            Comparator.comparing(Node::getHostname).thenComparingInt(Node::getPort);
+
+    private String name;
+    private int maxTotal;
+    private int maxIdle;
+    private int minIdle;
+    private int connTimeout;
+    private int socketTimeout;
+    private long maxWaitMillis;
+    private int maxAttempts;
+    private String password;
+    private long minEvictableIdleTimeMillis;
+    private List<Node> nodes = new ArrayList<>();
+
+    /** Creates an empty pool; all settings keep their Java default values until set. */
+    public RedisCachePoolImpl() {
+    }
+
+    /**
+     * Creates a pool with every setting supplied; nodes are added afterwards through
+     * {@link #addNode(Node)}.
+     */
+    public RedisCachePoolImpl(
+            String name,
+            int maxTotal,
+            int maxIdle,
+            int minIdle,
+            long maxWaitMillis,
+            int connTimeout,
+            int socketTimeout,
+            long minEvictableIdleTimeMillis,
+            int maxAttempts,
+            String password) {
+        this.name = name;
+        this.maxTotal = maxTotal;
+        this.maxIdle = maxIdle;
+        this.minIdle = minIdle;
+        this.maxWaitMillis = maxWaitMillis;
+        this.connTimeout = connTimeout;
+        this.socketTimeout = socketTimeout;
+        this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
+        this.maxAttempts = maxAttempts;
+        this.password = password;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @return the live internal node list (not a copy)
+     */
+    @Override
+    public List<Node> getNodes() {
+        return nodes;
+    }
+
+    @Override
+    public int getMaxTotal() {
+        return maxTotal;
+    }
+
+    public void setMaxTotal(int maxTotal) {
+        this.maxTotal = maxTotal;
+    }
+
+    @Override
+    public int getMaxIdle() {
+        return maxIdle;
+    }
+
+    public void setMaxIdle(int maxIdle) {
+        this.maxIdle = maxIdle;
+    }
+
+    @Override
+    public int getMinIdle() {
+        return minIdle;
+    }
+
+    public void setMinIdle(int minIdle) {
+        this.minIdle = minIdle;
+    }
+
+    @Override
+    public long getMaxWaitMillis() {
+        return maxWaitMillis;
+    }
+
+    public void setMaxWaitMillis(long maxWaitMillis) {
+        this.maxWaitMillis = maxWaitMillis;
+    }
+
+    @Override
+    public int getConnTimeout() {
+        return connTimeout;
+    }
+
+    public void setConnTimeout(int connTimeout) {
+        this.connTimeout = connTimeout;
+    }
+
+    @Override
+    public int getSocketTimeout() {
+        return socketTimeout;
+    }
+
+    public void setSocketTimeout(int socketTimeout) {
+        this.socketTimeout = socketTimeout;
+    }
+
+    @Override
+    public long getMinEvictableIdleTimeMillis() {
+        return minEvictableIdleTimeMillis;
+    }
+
+    public void setMinEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) {
+        this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
+    }
+
+    @Override
+    public int getMaxAttempts() {
+        return maxAttempts;
+    }
+
+    public void setMaxAttempts(int maxAttempts) {
+        this.maxAttempts = maxAttempts;
+    }
+
+    @Override
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    /**
+     * Adds a node to the pool.
+     *
+     * @param node the node to add
+     * @throws NullPointerException if {@code node} is {@code null}
+     * @throws IllegalArgumentException if a node with the same host name and port is already present
+     */
+    public void addNode(Node node) {
+        java.util.Objects.requireNonNull(node, "node");
+        for (Node existing : nodes) {
+            // A node with the same host name and port must not be added twice.
+            // Weight is not taken into account, so equals() cannot be used for this check.
+            if (java.util.Objects.equals(existing.getHostname(), node.getHostname())
+                    && existing.getPort() == node.getPort()) {
+                throw new IllegalArgumentException(
+                        "Duplicated Node: " + node.toString() + ", currently in List is: " + existing.toString());
+            }
+        }
+        nodes.add(node);
+    }
+
+    /**
+     * Compares two groups of nodes for equality, ignoring order.
+     *
+     * <p>Both lists are sorted by host name and port and then compared element by element
+     * with {@code equals}.
+     *
+     * @param nodes1 the first group
+     * @param nodes2 the second group
+     * @return {@code true} if both groups contain equal nodes
+     */
+    private boolean compareNodes(List<Node> nodes1, List<Node> nodes2) {
+        if (nodes1 == null || nodes2 == null) {
+            return nodes1 == nodes2;
+        }
+        if (nodes1.size() != nodes2.size()) {
+            return false;
+        }
+        // Sort copies so the callers' lists keep their original order.
+        Node[] sorted1 = nodes1.toArray(new Node[0]);
+        Node[] sorted2 = nodes2.toArray(new Node[0]);
+        Arrays.sort(sorted1, NODE_COMPARATOR);
+        Arrays.sort(sorted2, NODE_COMPARATOR);
+        return Arrays.equals(sorted1, sorted2);
+    }
+
+    /**
+     * Always fails: hashing is deliberately not supported for this type.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public int hashCode() {
+        throw new UnsupportedOperationException("hashCode not designed");
+    }
+
+    /**
+     * Compares name, all settings, password and nodes with another {@link RedisCachePool}.
+     *
+     * <p>Objects that are not a {@link RedisCachePool} are never equal; this includes other
+     * kinds of {@link CachePool}, which previously caused a {@link ClassCastException}.
+     * Name and password are compared null-safely because the no-arg constructor leaves
+     * them unset.
+     *
+     * @param anObject the object to compare with
+     * @return {@code true} if {@code anObject} describes the same pool
+     */
+    @Override
+    public boolean equals(Object anObject) {
+        if (this == anObject) {
+            return true;
+        }
+        if (!(anObject instanceof RedisCachePool)) {
+            return false;
+        }
+        RedisCachePool other = (RedisCachePool) anObject;
+        return java.util.Objects.equals(this.name, other.getName())
+                && this.getMaxTotal() == other.getMaxTotal()
+                && this.getMaxIdle() == other.getMaxIdle()
+                && this.getMinIdle() == other.getMinIdle()
+                && this.getMaxWaitMillis() == other.getMaxWaitMillis()
+                && this.getConnTimeout() == other.getConnTimeout()
+                && this.getSocketTimeout() == other.getSocketTimeout()
+                && this.getMinEvictableIdleTimeMillis() == other.getMinEvictableIdleTimeMillis()
+                && this.getMaxAttempts() == other.getMaxAttempts()
+                && java.util.Objects.equals(this.getPassword(), other.getPassword())
+                && compareNodes(this.nodes, other.getNodes());
+    }
+
+    /**
+     * @return a description of the pool; the password is left out
+     */
+    @Override
+    public String toString() {
+        return "RedisCachePoolImpl{"
+                + "name='" + name + '\''
+                + ", maxTotal=" + maxTotal
+                + ", maxIdle=" + maxIdle
+                + ", minIdle=" + minIdle
+                + ", maxWaitMillis=" + maxWaitMillis
+                + ", connTimeout=" + connTimeout
+                + ", socketTimeout=" + socketTimeout
+                + ", minEvictableIdleTimeMillis=" + minEvictableIdleTimeMillis
+                + ", maxAttempts=" + maxAttempts
+                + ", nodes=" + nodes
+                + '}';
+    }
+}
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/RedisClientFactory.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/RedisClientFactory.java
new file mode 100644
index 00000000..2ec9be5d
--- /dev/null
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/config/redis/RedisClientFactory.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ozhera.log.manager.config.redis;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
+import org.apache.ozhera.log.cache.util.Node;
+import org.apache.ozhera.log.cache.util.NodeImpl;
+import org.apache.ozhera.log.common.Config;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPoolConfig;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Creates and caches {@link JedisCluster} clients, keyed by Redis pool name.
+ *
+ * <p>All settings are read from {@link Config} ({@code redis.*} keys).
+ */
+@Slf4j
+public class RedisClientFactory {
+    private static final ConcurrentHashMap<String, JedisCluster> jedisClusterMap =
+            new ConcurrentHashMap<>();
+
+    /**
+     * init when application starts
+     *
+     * <p>if we have more redis pool config, use zk storing these configs and change this init method
+     *
+     * <p>Calling this method again for an already registered pool name is a no-op. The
+     * client is created inside {@code computeIfAbsent}, so concurrent callers cannot
+     * create (and leak) a second client for the same pool.
+     */
+    public static void init() {
+        RedisCachePool redisCachePool = getRedisCachePool();
+        jedisClusterMap.computeIfAbsent(
+                redisCachePool.getName(), poolName -> createJedisCluster(redisCachePool));
+    }
+
+    /**
+     * @return the client for the pool named by {@code redis.pool.name}, or {@code null}
+     *     if that pool has not been initialised
+     */
+    public static JedisCluster getJedisCluster() {
+        return getJedisCluster(Config.ins().get("redis.pool.name", ""));
+    }
+
+    /**
+     * Looks up the client registered for a pool name.
+     *
+     * @param redisPoolName the pool name
+     * @return the cached client, or {@code null} if no client is registered under that
+     *     name; callers must handle the {@code null} case
+     */
+    public static JedisCluster getJedisCluster(String redisPoolName) {
+        return jedisClusterMap.get(redisPoolName);
+    }
+
+    /**
+     * create jedis cluster
+     *
+     * <p>The configured connection timeout and socket timeout are applied whether or not
+     * a password is set.
+     *
+     * @param redisCachePool the pool settings and nodes to connect to
+     * @return the new cluster client
+     */
+    private static JedisCluster createJedisCluster(RedisCachePool redisCachePool) {
+        JedisPoolConfig poolConfig = new JedisPoolConfig();
+        poolConfig.setMaxTotal(redisCachePool.getMaxTotal());
+        poolConfig.setMaxIdle(redisCachePool.getMaxIdle());
+        poolConfig.setMinIdle(redisCachePool.getMinIdle());
+        poolConfig.setMaxWaitMillis(redisCachePool.getMaxWaitMillis());
+        poolConfig.setMinEvictableIdleTimeMillis(redisCachePool.getMinEvictableIdleTimeMillis());
+        poolConfig.setBlockWhenExhausted(true);
+        poolConfig.setTestWhileIdle(false);
+        poolConfig.setTestOnReturn(false);
+
+        Set<HostAndPort> hostAndPortSet = new HashSet<>();
+        for (Node node : redisCachePool.getNodes()) {
+            hostAndPortSet.add(new HostAndPort(node.getHostname(), node.getPort()));
+        }
+
+        JedisCluster jedisCluster;
+        if (StringUtils.isNotEmpty(redisCachePool.getPassword())) {
+            jedisCluster =
+                    new JedisCluster(
+                            hostAndPortSet,
+                            redisCachePool.getConnTimeout(),
+                            redisCachePool.getSocketTimeout(),
+                            redisCachePool.getMaxAttempts(),
+                            redisCachePool.getPassword(),
+                            poolConfig);
+        } else {
+            // Pass the connection timeout explicitly: the shorter constructor would reuse
+            // the socket timeout for connecting and silently ignore redis.connection.timeout.
+            jedisCluster =
+                    new JedisCluster(
+                            hostAndPortSet,
+                            redisCachePool.getConnTimeout(),
+                            redisCachePool.getSocketTimeout(),
+                            redisCachePool.getMaxAttempts(),
+                            poolConfig);
+        }
+        log.info("init redis cluster success, poolName:{}", redisCachePool.getName());
+        return jedisCluster;
+    }
+
+    /**
+     * Builds the pool description from the {@code redis.*} configuration keys.
+     *
+     * @return the configured pool including all parsed nodes
+     * @throws IllegalArgumentException if {@code redis.addresses} is blank, an address is
+     *     malformed, or a numeric setting cannot be parsed
+     */
+    private static RedisCachePoolImpl getRedisCachePool() {
+        String poolName = Config.ins().get("redis.pool.name", "");
+        String maxTotalStr = Config.ins().get("redis.pool.max.total", "50");
+        String maxIdleStr = Config.ins().get("redis.pool.max.idle", "30");
+        String minIdleStr = Config.ins().get("redis.pool.min.idle", "10");
+        String maxWaitMillisStr = Config.ins().get("redis.pool.max.wait", "1000");
+        String minEvictableIdleTimeMillisStr =
+                Config.ins().get("redis.pool.min.evictable.idle.time", "600000");
+        String connTimeoutStr = Config.ins().get("redis.connection.timeout", "3000");
+        String socketTimeoutStr = Config.ins().get("redis.socket.timeout", "1000");
+        String maxAttemptsStr = Config.ins().get("redis.max.attempts", "2");
+        String addressesStr = Config.ins().get("redis.addresses", "");
+        // Let Validate do the formatting so a '%' inside the pool name cannot break it.
+        Validate.notBlank(
+                addressesStr, "redis.addresses cannot be blank,redis.pool.name=%s", poolName);
+        String[] addresses = addressesStr.split(",");
+        String password = Config.ins().get("redis.password", "");
+
+        if (StringUtils.isAnyEmpty(poolName, addressesStr, password)) {
+            log.error("Failed to init redis cluster");
+        }
+
+        RedisCachePoolImpl pool =
+                new RedisCachePoolImpl(
+                        poolName,
+                        Integer.parseInt(maxTotalStr),
+                        Integer.parseInt(maxIdleStr),
+                        Integer.parseInt(minIdleStr),
+                        Long.parseLong(maxWaitMillisStr),
+                        Integer.parseInt(connTimeoutStr),
+                        Integer.parseInt(socketTimeoutStr),
+                        Long.parseLong(minEvictableIdleTimeMillisStr),
+                        Integer.parseInt(maxAttemptsStr),
+                        password);
+        for (String address : addresses) {
+            pool.addNode(parseNode(address));
+        }
+
+        return pool;
+    }
+
+    /**
+     * Parses one {@code host:port} entry of {@code redis.addresses}.
+     *
+     * <p>Surrounding whitespace is ignored, so {@code "a:1, b:2"} style lists work.
+     *
+     * @param address the address text
+     * @return the parsed node
+     * @throws IllegalArgumentException if the entry is not in {@code host:port} form or
+     *     the port is not a number
+     */
+    private static Node parseNode(String address) {
+        String[] parts = address.trim().split(":");
+        if (parts.length < 2 || parts[0].isEmpty()) {
+            throw new IllegalArgumentException(
+                    "Invalid redis address '" + address + "', expected host:port");
+        }
+        String hostname = parts[0].trim();
+        int port = Integer.parseInt(parts[1].trim());
+        return new NodeImpl().setHostname(hostname).setPort(port);
+    }
+}
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/model/bo/LogAiMessage.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/model/bo/LogAiMessage.java
new file mode 100644
index 00000000..95b2fe0d
--- /dev/null
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/model/bo/LogAiMessage.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ozhera.log.manager.model.bo;
+
+import lombok.Data;
+
+/**
+ * One chat message of an AI log-analysis conversation: who said it and what was said.
+ */
+@Data
+public class LogAiMessage {
+
+    /** The speaker of a message. */
+    public enum Role {
+        system,
+        user,
+        assistant,
+    }
+
+    private Role role;
+    private String content;
+
+    /**
+     * @param content the message text
+     * @return a new message with role {@link Role#user}
+     */
+    public static LogAiMessage user(String content) {
+        return create(Role.user, content);
+    }
+
+    /**
+     * @param content the message text
+     * @return a new message with role {@link Role#system}
+     */
+    public static LogAiMessage system(String content) {
+        return create(Role.system, content);
+    }
+
+    /**
+     * @param content the message text
+     * @return a new message with role {@link Role#assistant}
+     */
+    public static LogAiMessage assistant(String content) {
+        return create(Role.assistant, content);
+    }
+
+    /**
+     * @return the message text
+     */
+    public String getContent() {
+        return this.content;
+    }
+
+    /** Builds a message with the given role and text. */
+    private static LogAiMessage create(Role role, String content) {
+        LogAiMessage message = new LogAiMessage();
+        message.setRole(role);
+        message.setContent(content);
+        return message;
+    }
+}
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/bot/ContentSimplifyBot.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/bot/ContentSimplifyBot.java
index fb802e0c..b8d12777 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/bot/ContentSimplifyBot.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/bot/ContentSimplifyBot.java
@@ -18,21 +18,27 @@
*/
package org.apache.ozhera.log.manager.service.bot;
-import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.xiaomi.youpin.docean.anno.Service;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import run.mone.hive.Environment;
+import run.mone.hive.llm.CustomConfig;
import run.mone.hive.llm.LLM;
import run.mone.hive.llm.LLMProvider;
import run.mone.hive.roles.Role;
import run.mone.hive.schema.AiMessage;
import run.mone.hive.schema.Message;
+import run.mone.hive.schema.MetaKey;
+import run.mone.hive.schema.MetaValue;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
@Service
+@Slf4j
public class ContentSimplifyBot extends Role {
private String baseText = """
你是一个对话摘要压缩助手,接下来我会提供一段对话历史,格式是一个 JSON 列表,列表中的每一项是一个对象,包含三个字段:
@@ -45,10 +51,14 @@ public class ContentSimplifyBot extends Role {
1. **保持输出数据结构与输入一致**:输出仍然是一个 JSON 列表,每一项仍然包含 "time" 、"user" 和
"bot" 三个字段。
2. **尽可能减少轮数**:若多轮对话围绕同一问题展开,可以合并为一轮,但必须保留语义完整,并且时间选择为多轮中最后一轮的时间。
- 3.
**对于一些无关的信息,或者没有什么用的信息直接去除,一定在保留核心关键信息的情况下尽可能的压缩,至少保证压缩后的字符为压缩前的30%往下
+ 3. **对于一些无关的信息,或者没有什么用的信息直接去除,一定在保留核心关键信息的情况下尽可能的压缩
4. **如果每轮的数据中存在原始的日志信息,那么对于日志信息不要进行压缩,需要保持原样
5. **不得添加任何非对话内容**,例如“压缩后的内容如下”、“总结如下”等。
6. **输出必须是一个合法的 JSON 列表,结构和字段不变**。
+ 7. **尽可能的根据当前内容的token数压缩到指定的目标token数**。
+
+ 【当前内容token数】: {{currentTokenCount}}
+ 【内容压缩目标token数】: {{targetTokenCount}}
下面是原始对话历史,请进行压缩(注意格式):
{{original_text}}
@@ -59,29 +69,85 @@ public class ContentSimplifyBot extends Role {
setEnvironment(new Environment());
}
+ private static final int MAX_RETRY_COUNT = 3;
+ private static final int INITIAL_RETRY_DELAY_MS = 1000;
+ private static final int MAX_RETRY_DELAY_MS = 5000;
+
+    /**
+     * Compresses the conversation history carried by the next queued message.
+     *
+     * <p>The prompt is filled with the message content and the {@code currentCount} /
+     * {@code targetCount} meta values, then sent to the LLM. The call is repeated up to
+     * {@code MAX_RETRY_COUNT} more times when the answer is blank or the failure message
+     * contains {@code 429}, waiting with a capped exponential delay between attempts.
+     * Any other failure, or an interrupt, stops retrying immediately.
+     *
+     * @return a completed future holding the LLM answer, or an empty string if no answer
+     *     could be obtained
+     */
@Override
public CompletableFuture<Message> run() {
Message msg = this.rc.getNews().poll();
String content = msg.getContent();
String text = baseText.replace("{{original_text}}", content);
+
+        Map<MetaKey, MetaValue> meta = msg.getMeta();
+        MetaKey currentKey = MetaKey.builder().key("currentCount").desc("currentCount").build();
+        String current = meta.get(currentKey).getValue().toString();
+
+        MetaKey targetKey = MetaKey.builder().key("targetCount").desc("targetCount").build();
+        String target = meta.get(targetKey).getValue().toString();
+
+        text = text.replace("{{currentTokenCount}}", current);
+        text = text.replace("{{targetTokenCount}}", target);
+
JsonObject req = getReq(llm, text);
List<AiMessage> messages = new ArrayList<>();
messages.add(AiMessage.builder().jsonContent(req).build());
- String result = llm.syncChat(this, messages);
+        // The request configuration is the same for every attempt, so build it once.
+        CustomConfig customConfig = new CustomConfig();
+        customConfig.setModel("gpt-5");
+        customConfig.addCustomHeader(CustomConfig.X_MODEL_PROVIDER_ID, "azure_openai");
+
+        String result = null;
+        Exception lastException = null;
+        int attempts = 0;
+
+        for (int retryCount = 0; retryCount <= MAX_RETRY_COUNT; retryCount++) {
+            try {
+                if (retryCount > 0) {
+                    log.info("Attempt the {} th call to LLM", retryCount);
+                    int delayMs = Math.min(INITIAL_RETRY_DELAY_MS * (1 << (retryCount - 1)), MAX_RETRY_DELAY_MS);
+                    // The delay is in milliseconds, so report it as such.
+                    log.info("Wait {} ms and try again", delayMs);
+                    Thread.sleep(delayMs);
+                }
+
+                attempts++;
+                StringBuilder responseBuilder = new StringBuilder();
+                llm.call(messages, "你是一个ai日志分析的对话压缩助手", customConfig).doOnNext(x -> {
+                    responseBuilder.append(x);
+                }).blockLast();
+                result = responseBuilder.toString().trim();
+
+                if (StringUtils.isNotBlank(result)) {
+                    break;
+                }
+                // A blank answer falls through and is retried like a 429 failure.
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so callers can see the interruption, and stop retrying.
+                Thread.currentThread().interrupt();
+                lastException = e;
+                log.warn("Interrupted while waiting to retry the LLM call");
+                break;
+            } catch (Exception e) {
+                lastException = e;
+                // Passing the exception as the last argument keeps the stack trace in the log.
+                log.warn("LLM call failed (retry {}/{}): {}", retryCount, MAX_RETRY_COUNT, e.getMessage(), e);
+
+                boolean rateLimited = e.getMessage() != null && e.getMessage().contains("429");
+                if (!rateLimited) {
+                    log.error("LLM call failed due to a non-429 error. No further retries will be made: {}", e.getMessage());
+                    break;
+                }
+            }
+        }
+
+        if (result == null) {
+            String errorMsg = lastException != null ? lastException.getMessage() : "unknown error";
+            // Report the number of calls actually made, which may be fewer than the maximum.
+            log.error("After {} attempts, the LLM still failed to be called: {}", attempts, errorMsg);
+            result = "";
+        }
return
CompletableFuture.completedFuture(Message.builder().content(result).build());
}
private JsonObject getReq(LLM llm, String text) {
JsonObject req = new JsonObject();
- if (llm.getConfig().getLlmProvider() == LLMProvider.CLAUDE_COMPANY) {
+ if (llm.getConfig().getLlmProvider() == LLMProvider.MIFY_GATEWAY) {
req.addProperty("role", "user");
- JsonArray contentJsons = new JsonArray();
- JsonObject obj1 = new JsonObject();
- obj1.addProperty("type", "text");
- obj1.addProperty("text", text);
- contentJsons.add(obj1);
- req.add("content", contentJsons);
+ req.addProperty("content", text);
}
return req;
}
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/bot/LogAnalysisBot.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/bot/LogAnalysisBot.java
index b578ade3..c92c9913 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/bot/LogAnalysisBot.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/bot/LogAnalysisBot.java
@@ -18,10 +18,16 @@
*/
package org.apache.ozhera.log.manager.service.bot;
-import com.google.gson.JsonArray;
+import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.xiaomi.youpin.docean.anno.Service;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.manager.model.bo.BotQAParam;
+import org.apache.ozhera.log.manager.model.bo.LogAiMessage;
+import org.nutz.log.Log;
import run.mone.hive.Environment;
+import run.mone.hive.llm.CustomConfig;
import run.mone.hive.llm.LLM;
import run.mone.hive.llm.LLMProvider;
import run.mone.hive.roles.Role;
@@ -33,6 +39,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
@Service
+@Slf4j
public class LogAnalysisBot extends Role {
private static String baseText = """
## 角色
@@ -56,7 +63,6 @@ public class LogAnalysisBot extends Role {
- 可扩展性强:随着日志数据量和复杂性的增长,能够持续适应并提供有效分析。
下面是你需要分析的日志内容或用户问题:
- {{log_text}}
""";
public LogAnalysisBot() {
@@ -64,31 +70,112 @@ public class LogAnalysisBot extends Role {
setEnvironment(new Environment());
}
+ private static final int MAX_RETRY_COUNT = 3;
+ private static final int INITIAL_RETRY_DELAY_MS = 1000;
+ private static final int MAX_RETRY_DELAY_MS = 5000;
+
+ private static final Gson GSON = new Gson();
+
@Override
public CompletableFuture<Message> run() {
Message msg = this.rc.getNews().poll();
String content = msg.getContent();
- String text = baseText.replace("{{log_text}}", content);
- JsonObject req = getReq(llm, text);
- List<AiMessage> messages = new ArrayList<>();
- messages.add(AiMessage.builder().jsonContent(req).build());
- String result = llm.syncChat(this, messages);
+ List<LogAiMessage> reqList = getReq(llm, content);
+
+ if (reqList == null || reqList.isEmpty()) {
+ return null;
+ }
+ List<AiMessage> messages = reqList.stream().map(m -> {
+ AiMessage aiMessage = new AiMessage();
+ LogAiMessage.Role role = m.getRole();
+ aiMessage.setRole(role.name());
+ aiMessage.setContent(m.getContent());
+ return aiMessage;
+ }).toList();
+
+ int retryCount = 0;
+ String result = null;
+ Exception lastException = null;
+
+ while (retryCount <= MAX_RETRY_COUNT) {
+ try {
+ if (retryCount > 0) {
+ log.info("Attempt the {} th call to LLM", retryCount);
+ int delayMs = Math.min(INITIAL_RETRY_DELAY_MS * (1 <<
(retryCount - 1)), MAX_RETRY_DELAY_MS);
+ log.info("Wait {} seconds and try again", delayMs);
+ Thread.sleep(delayMs);
+ }
+
+ CustomConfig customConfig = new CustomConfig();
+ customConfig.setModel("gpt-5");
+ customConfig.addCustomHeader(CustomConfig.X_MODEL_PROVIDER_ID,
"azure_openai");
+
+ StringBuilder responseBuilder = new StringBuilder();
+ llm.call(messages, "你是一个ai日志分析助手", customConfig).doOnNext(x ->
{
+ responseBuilder.append(x);
+ }).blockLast();
+ result = responseBuilder.toString().trim();
+
+ if (StringUtils.isNotBlank(result)) {
+ break;
+ }
+ } catch (Exception e) {
+ lastException = e;
+ log.warn("LLM call failed (retry {}/{}): {}", retryCount,
MAX_RETRY_COUNT, e.getMessage());
+
+ if (e.getMessage() != null && e.getMessage().contains("429")) {
+ retryCount++;
+ continue;
+ } else {
+ log.error("LLM call failed due to a non-429 error. No
further retries will be made: {}", e.getMessage());
+ break;
+ }
+ }
+
+ retryCount++;
+ }
+
+ if (result == null) {
+ String errorMsg = lastException != null ?
lastException.getMessage() : "unknown error";
+ log.error("After {} attempts, the LLM still failed to be called:
{}", MAX_RETRY_COUNT, errorMsg);
+ result = "";
+ }
return
CompletableFuture.completedFuture(Message.builder().content(result).build());
}
- private JsonObject getReq(LLM llm, String text) {
- JsonObject req = new JsonObject();
- if (llm.getConfig().getLlmProvider() == LLMProvider.CLAUDE_COMPANY) {
- req.addProperty("role", "user");
- JsonArray contentJsons = new JsonArray();
- JsonObject obj1 = new JsonObject();
- obj1.addProperty("type", "text");
- obj1.addProperty("text", text);
- contentJsons.add(obj1);
- req.add("content", contentJsons);
+ private List<LogAiMessage> getReq(LLM llm, String content) {
+ if(llm.getConfig().getLlmProvider() == LLMProvider.MIFY_GATEWAY){
+ List<LogAiMessage> logAiMessages = initMessageList();
+ BotQAParam botQAParam = GSON.fromJson(content, BotQAParam.class);
+ if (botQAParam.getHistoryConversation() != null &&
!botQAParam.getHistoryConversation().isEmpty()){
+ botQAParam.getHistoryConversation().forEach(history -> {
+ if (history.getUser() != null &&
!history.getUser().isBlank()){
+ LogAiMessage userMessage =
LogAiMessage.user(history.getUser());
+ logAiMessages.add(userMessage);
+ }
+ if (history.getBot() != null &&
!history.getBot().isBlank()){
+ LogAiMessage assistantMessage =
LogAiMessage.assistant(history.getBot());
+ logAiMessages.add(assistantMessage);
+ }
+
+ });
+ }
+ String latestQuestion = botQAParam.getLatestQuestion();
+ LogAiMessage userMessage = LogAiMessage.user(latestQuestion);
+ logAiMessages.add(userMessage);
+ return logAiMessages;
}
- return req;
+ return null;
}
+ private List<LogAiMessage> initMessageList(){
+ List<LogAiMessage> messages = new ArrayList<>();
+ LogAiMessage userMessage = LogAiMessage.user(baseText);
+ messages.add(userMessage);
+ return messages;
+ }
+
+
+
}
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogAiAnalysisServiceImpl.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogAiAnalysisServiceImpl.java
index 6e1eab8c..07305ca1 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogAiAnalysisServiceImpl.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogAiAnalysisServiceImpl.java
@@ -18,15 +18,20 @@
*/
package org.apache.ozhera.log.manager.service.impl;
-import cn.hutool.core.thread.ThreadUtil;
+
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
-import com.xiaomi.data.push.common.SafeRun;
+import com.knuddels.jtokkit.Encodings;
+import com.knuddels.jtokkit.api.Encoding;
+import com.knuddels.jtokkit.api.EncodingType;
import com.xiaomi.youpin.docean.anno.Service;
+import lombok.Data;
import lombok.extern.slf4j.Slf4j;
+import org.apache.ozhera.log.common.Config;
import org.apache.ozhera.log.common.Result;
import org.apache.ozhera.log.exception.CommonError;
import org.apache.ozhera.log.manager.common.context.MoneUserContext;
+import org.apache.ozhera.log.manager.config.redis.RedisClientFactory;
import org.apache.ozhera.log.manager.mapper.MilogAiConversationMapper;
import org.apache.ozhera.log.manager.model.bo.BotQAParam;
import org.apache.ozhera.log.manager.model.dto.AiAnalysisHistoryDTO;
@@ -37,10 +42,14 @@ import
org.apache.ozhera.log.manager.service.MilogAiAnalysisService;
import org.apache.ozhera.log.manager.service.bot.ContentSimplifyBot;
import org.apache.ozhera.log.manager.service.bot.LogAnalysisBot;
import org.apache.ozhera.log.manager.user.MoneUser;
+import redis.clients.jedis.*;
+import redis.clients.jedis.params.SetParams;
import run.mone.hive.configs.LLMConfig;
import run.mone.hive.llm.LLM;
import run.mone.hive.llm.LLMProvider;
import run.mone.hive.schema.Message;
+import run.mone.hive.schema.MetaKey;
+import run.mone.hive.schema.MetaValue;
import javax.annotation.Resource;
@@ -49,12 +58,9 @@ import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
-import static run.mone.hive.llm.ClaudeProxy.*;
@Slf4j
@Service
@@ -69,33 +75,37 @@ public class MilogAiAnalysisServiceImpl implements
MilogAiAnalysisService {
@Resource
private MilogAiConversationMapper milogAiConversationMapper;
- private static final ConcurrentHashMap<Long, Map<String,
List<BotQAParam.QAParam>>> QA_CACHE = new ConcurrentHashMap<>();
-
- private static final ConcurrentHashMap<Long, Object> LOCK = new
ConcurrentHashMap<>();
-
private static final String MODEL_KEY = "model";
private static final String ORIGINAL_KEY = "original";
- private static final ConcurrentHashMap<Long, Long> CONVERSATION_TIME = new
ConcurrentHashMap<>();
- private static final long CONVERSATION_TIMEOUT = 10 * 60 * 1000;
+ private static final String MILOG_AI_KEY_PREFIX = "milog.ai.conversation:";
+
+ private static final String LOCK_PREFIX = "milog.ai.lock:";
+
+ private static final String GLOBAL_CHECK_LOCK_KEY =
"milog.ai.checkTokenLength:global";
+
+ private static final String GLOBAL_SHUTDOWN_LOCK_KEY =
"milog.ai.shutdown:global";
private static final Gson gson = new Gson();
- private ScheduledExecutorService scheduledExecutor;
+ private static final ExecutorService executor =
Executors.newVirtualThreadPerTaskExecutor();
+
+ private static final JedisCluster jedisCluster =
RedisClientFactory.getJedisCluster();
+
+ private static final Encoding TOKENIZER =
Encodings.newDefaultEncodingRegistry().getEncoding(EncodingType.CL100K_BASE);
public void init() {
+ String llmUrl = Config.ins().get("llm.url", "");
+ String llmToken = Config.ins().get("llm.token", "");
LLMConfig config = LLMConfig.builder()
- .llmProvider(LLMProvider.CLAUDE_COMPANY)
- .url(getClaudeUrl())
- .version(getClaudeVersion())
- .maxTokens(getClaudeMaxToekns())
+ .url(llmUrl)
+ .token(llmToken)
+ .llmProvider(LLMProvider.MIFY_GATEWAY)
.build();
LLM llm = new LLM(config);
+ llm.setConfigFunction(llmProvider -> Optional.of(config));
analysisBot.setLlm(llm);
contentSimplifyBot.setLlm(llm);
- scheduledExecutor = Executors.newScheduledThreadPool(2,
ThreadUtil.newNamedThreadFactory("manager-ai-conversation", false));
- scheduledExecutor.scheduleAtFixedRate(() ->
SafeRun.run(this::processTask), 0, 2, TimeUnit.MINUTES);
- scheduledExecutor.scheduleAtFixedRate(() ->
SafeRun.run(this::checkTokenLength), 0, 1, TimeUnit.MINUTES);
}
@@ -117,9 +127,9 @@ public class MilogAiAnalysisServiceImpl implements
MilogAiAnalysisService {
String answer = "";
try {
BotQAParam param = new BotQAParam();
-
param.setLatestQuestion(gson.toJson(tailLogAiAnalysisDTO.getLogs()));
- String text = formatString(param);
-
analysisBot.getRc().news.put(Message.builder().content(text).build());
+
param.setLatestQuestion(formatLogs(tailLogAiAnalysisDTO.getLogs()));
+ String paramJson = gson.toJson(param);
+
analysisBot.getRc().news.put(Message.builder().content(paramJson).build());
Message result = analysisBot.run().join();
answer = result.getContent();
} catch (Exception e) {
@@ -131,7 +141,7 @@ public class MilogAiAnalysisServiceImpl implements
MilogAiAnalysisService {
long timestamp = System.currentTimeMillis();
String nowTimeStr =
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
conversation.setTime(nowTimeStr);
- conversation.setUser(gson.toJson(tailLogAiAnalysisDTO.getLogs()));
+ conversation.setUser(formatLogs(tailLogAiAnalysisDTO.getLogs()));
conversation.setBot(answer);
List<BotQAParam.QAParam> ModelHistory = new ArrayList<>();
@@ -153,48 +163,34 @@ public class MilogAiAnalysisServiceImpl implements
MilogAiAnalysisService {
Map<String, List<BotQAParam.QAParam>> cache = new HashMap<>();
cache.put(MODEL_KEY, ModelHistory);
cache.put(ORIGINAL_KEY, OriginalHistory);
- QA_CACHE.put(conversationId, cache);
+ putCache(conversationId, cache);
response.setConversationId(conversationId);
response.setContent(answer);
- CONVERSATION_TIME.put(conversationId, timestamp);
return Result.success(response);
} else {
- String answer = "";
conversationId = tailLogAiAnalysisDTO.getConversationId();
//This is not first request, need lock
- Object lock = LOCK.computeIfAbsent(conversationId, k -> new
Object());
- synchronized (lock) {
- Map<String, List<BotQAParam.QAParam>> cache =
QA_CACHE.get(conversationId);
- if (cache == null || cache.isEmpty()) {
- cache = getHistoryFromDb(conversationId);
- }
- List<BotQAParam.QAParam> modelHistory = cache.get(MODEL_KEY);
- List<BotQAParam.QAParam> originalHistory =
cache.get(ORIGINAL_KEY);
- try {
- BotQAParam param = new BotQAParam();
- param.setHistoryConversation(modelHistory);
-
param.setLatestQuestion(gson.toJson(tailLogAiAnalysisDTO.getLogs()));
- String text = formatString(param);
-
analysisBot.getRc().news.put(Message.builder().content(gson.toJson(text)).build());
- Message result = analysisBot.run().join();
- answer = result.getContent();
-
- } catch (InterruptedException e) {
- log.error("An error occurred in the request for the large
model, err: {}", e.getMessage());
- return Result.fail(CommonError.SERVER_ERROR.getCode(), "An
error occurred in the request for the large model");
- }
- BotQAParam.QAParam conversation = new BotQAParam.QAParam();
-
conversation.setTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd
HH:mm:ss")));
-
conversation.setUser(gson.toJson(tailLogAiAnalysisDTO.getLogs()));
- conversation.setBot(answer);
+
+ Map<String, List<BotQAParam.QAParam>> cache =
getConversation(conversationId);
+ List<BotQAParam.QAParam> modelHistory = cache.get(MODEL_KEY);
+ List<BotQAParam.QAParam> originalHistory = cache.get(ORIGINAL_KEY);
+ AnalysisResult analysisResult =
processHistoryConversation(conversationId, cache, tailLogAiAnalysisDTO);
+ String answer = analysisResult.getAnswer();
+ BotQAParam.QAParam conversation = new BotQAParam.QAParam();
+
conversation.setTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd
HH:mm:ss")));
+ conversation.setUser(formatLogs(tailLogAiAnalysisDTO.getLogs()));
+ conversation.setBot(answer);
+ if (analysisResult.getCompressedModelHistory() != null) {
+ List<BotQAParam.QAParam> compressedModelHistory =
analysisResult.getCompressedModelHistory();
+ compressedModelHistory.add(conversation);
+ cache.put(MODEL_KEY, compressedModelHistory);
+ } else {
modelHistory.add(conversation);
- originalHistory.add(conversation);
cache.put(MODEL_KEY, modelHistory);
- cache.put(ORIGINAL_KEY, originalHistory);
- QA_CACHE.put(conversationId, cache);
- CONVERSATION_TIME.put(conversationId,
System.currentTimeMillis());
}
- LOCK.remove(conversationId);
+ originalHistory.add(conversation);
+ cache.put(ORIGINAL_KEY, originalHistory);
+ putCache(conversationId, cache);
response.setConversationId(conversationId);
response.setContent(answer);
return Result.success(response);
@@ -202,125 +198,237 @@ public class MilogAiAnalysisServiceImpl implements
MilogAiAnalysisService {
}
+ private AnalysisResult processHistoryConversation(Long conversationId,
Map<String, List<BotQAParam.QAParam>> cache, LogAiAnalysisDTO
tailLogAiAnalysisDTO) {
+ List<BotQAParam.QAParam> modelHistory = cache.get(MODEL_KEY);
+ List<BotQAParam.QAParam> originalHistory = cache.get(ORIGINAL_KEY);
+ AnalysisResult res = new AnalysisResult();
+ try {
+ BotQAParam param = new BotQAParam();
+ param.setHistoryConversation(modelHistory);
+
param.setLatestQuestion(formatLogs(tailLogAiAnalysisDTO.getLogs()));
+ String paramJson = gson.toJson(param);
+ if (TOKENIZER.countTokens(paramJson) < 70000) {
+
analysisBot.getRc().news.put(Message.builder().content(paramJson).build());
+ Message result = analysisBot.run().join();
+ String answer = result.getContent();
+ res.setAnswer(answer);
+ return res;
+ } else {
+ return analysisAndCompression(modelHistory, originalHistory,
tailLogAiAnalysisDTO.getLogs(), conversationId);
+ }
+ } catch (InterruptedException e) {
+ log.error("An error occurred in the request for the large model,
err: {}", e.getMessage());
+ }
+ return res;
+ }
- private Map<String, List<BotQAParam.QAParam>> getHistoryFromDb(Long
conversationId) {
- MilogAiConversationDO milogAiConversationDO =
milogAiConversationMapper.selectById(conversationId);
- String conversationContext =
milogAiConversationDO.getConversationContext();
+ private AnalysisResult analysisAndCompression(List<BotQAParam.QAParam>
modelHistory, List<BotQAParam.QAParam> originalHistory, List<String>
latestConversation, Long conversationId) {
+ AnalysisResult analysisResult = new AnalysisResult();
- if (conversationContext == null || conversationContext.isBlank()) {
- return Collections.emptyMap();
- }
+ AtomicReference<String> answer = new AtomicReference<>("");
+ Future<?> analysisFuture = executor.submit(() -> {
+ try {
+ BotQAParam param = new BotQAParam();
+ param.setHistoryConversation(modelHistory);
+ param.setLatestQuestion(gson.toJson(latestConversation));
+ String paramJson = gson.toJson(param);
+
analysisBot.getRc().news.put(Message.builder().content(paramJson).build());
+ Message result = analysisBot.run().join();
+ answer.set(result.getContent());
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ AtomicReference<List<BotQAParam.QAParam>> newModelHistory = new
AtomicReference<>(modelHistory);
+ Future<?> compressionFuture = executor.submit(() -> {
+ int index = compressIndex(modelHistory, originalHistory);
+ if (index <= 0) {
+ newModelHistory.set(modelHistory);
+ return;
+ }
+ List<BotQAParam.QAParam> needCompress = new
ArrayList<>(originalHistory.subList(0, index));
+ List<BotQAParam.QAParam> unchangeList = new
ArrayList<>(originalHistory.subList(index, originalHistory.size()));
- List<BotQAParam.QAParam> modelConversation =
gson.fromJson(conversationContext, new TypeToken<List<BotQAParam.QAParam>>() {
- }.getType());
- List<BotQAParam.QAParam> originalConversation =
gson.fromJson(milogAiConversationDO.getOriginalConversation(), new
TypeToken<List<BotQAParam.QAParam>>() {
- }.getType());
+ String needCompressJson = gson.toJson(needCompress);
+ //Compress the content that needs to be compressed to have the
same number of tokens as the content that does not need to be compressed, as
much as possible.
- Map<String, List<BotQAParam.QAParam>> res = new HashMap<>();
- res.put(MODEL_KEY, modelConversation);
- res.put(ORIGINAL_KEY, originalConversation);
- return res;
- }
+ int currentTokenCount = TOKENIZER.countTokens(needCompressJson);
+ int targetTokenCount =
TOKENIZER.countTokens(gson.toJson(unchangeList));
+ Map<MetaKey, MetaValue> meta = new HashMap<>();
+ MetaKey currentKey =
MetaKey.builder().key("currentCount").desc("currentCount").build();
+ MetaValue currentValue =
MetaValue.builder().value(currentTokenCount).desc("currentCount").build();
+ meta.put(currentKey, currentValue);
- private void processTask() {
- for (Map.Entry<Long, Long> entry : CONVERSATION_TIME.entrySet()) {
- Long conversationId = entry.getKey();
- Object lock = LOCK.computeIfAbsent(conversationId, k -> new
Object());
- synchronized (lock) {
- saveHistory(conversationId);
- //It has not been operated for a long time
- if (System.currentTimeMillis() - entry.getValue() >
CONVERSATION_TIMEOUT) {
- CONVERSATION_TIME.remove(entry.getKey());
- log.info("clean timeout conversation: {}", entry.getKey());
- }
+ MetaKey targetKey =
MetaKey.builder().key("targetCount").desc("targetCount").build();
+ MetaValue targetValue =
MetaValue.builder().value(targetTokenCount).desc("targetCount").build();
+ meta.put(targetKey, targetValue);
+ String res;
+ try {
+ contentSimplifyBot.getRc().news.put(
+
Message.builder().content(needCompressJson).meta(meta).build());
+ Message result = contentSimplifyBot.run().join();
+ res = result.getContent();
+ } catch (Exception e) {
+ log.error("An error occurred when requesting the large model
to compress data, error: {}", e.getMessage());
+ return;
}
- LOCK.remove(conversationId);
+ if (res == null || res.isBlank()) {
+ return;
+ }
+
+ List<BotQAParam.QAParam> compressedList = gson.fromJson(
+ res,
+ new TypeToken<List<BotQAParam.QAParam>>() {
+ }.getType()
+ );
+ if (compressedList == null || compressedList.isEmpty()) {
+ return;
+ }
+ compressedList.addAll(unchangeList);
+ newModelHistory.set(compressedList);
+ });
+ try {
+ analysisFuture.get();
+ compressionFuture.get();
+ String s = answer.get();
+ List<BotQAParam.QAParam> paramList = newModelHistory.get();
+ analysisResult.setAnswer(s);
+ analysisResult.setCompressedModelHistory(paramList);
+ } catch (Exception e) {
+ log.error("analysis and compression of task execution error: {}",
e.getMessage());
}
+ return analysisResult;
}
- private void saveHistory(Long conversationId) {
- log.info("开始存入数据库, id : {}", conversationId);
- MilogAiConversationDO milogAiConversationDO =
milogAiConversationMapper.selectById(conversationId);
- Map<String, List<BotQAParam.QAParam>> map =
QA_CACHE.get(conversationId);
- List<BotQAParam.QAParam> modelHistory = map.get(MODEL_KEY);
- List<BotQAParam.QAParam> originalHistory = map.get(ORIGINAL_KEY);
- milogAiConversationDO.setUpdateTime(System.currentTimeMillis());
-
milogAiConversationDO.setConversationContext(gson.toJson(modelHistory));
-
milogAiConversationDO.setOriginalConversation(gson.toJson(originalHistory));
- milogAiConversationMapper.updateById(milogAiConversationDO);
- }
+ private void checkTokenLength() {
+ if (!trySimpleLock(GLOBAL_CHECK_LOCK_KEY, 50L)) {
+ return;
+ }
+ Set<String> allCacheKey = getAllCacheKey();
+ if (allCacheKey.isEmpty()) {
+ return;
+ }
+ for (String key : allCacheKey) {
+ executor.submit(() -> {
+ String[] split = key.split(":");
+ String uuid = UUID.randomUUID().toString();
+ if (split.length == 0) {
+ return;
+ }
+ Long conversationId;
+ try {
+ conversationId = Long.valueOf(split[split.length - 1]);
+ } catch (NumberFormatException e) {
+ log.warn("invalid conversation key: {}", key);
+ return;
+ }
+ if (!tryLock(conversationId, uuid, 300L)) {
+ return;
+ }
- private void checkTokenLength() {
- for (Map.Entry<Long, Map<String, List<BotQAParam.QAParam>>> entry :
QA_CACHE.entrySet()) {
- Long conversationId = entry.getKey();
- Object lock = LOCK.computeIfAbsent(conversationId, k -> new
Object());
- synchronized (lock) {
- List<BotQAParam.QAParam> originalHistory =
entry.getValue().get(ORIGINAL_KEY);
- Integer index = compressIndex(entry.getValue());
- if (index > 0) {
- List<BotQAParam.QAParam> needCompress =
originalHistory.subList(0, index);
- List<BotQAParam.QAParam> unchangeList =
originalHistory.subList(index, originalHistory.size());
- String res = "";
+ try {
+ String value = jedisCluster.get(key);
+ if (value == null || value.isEmpty()) {
+ return;
+ }
+ Map<String, List<BotQAParam.QAParam>> map =
gson.fromJson(value, new TypeToken<Map<String, List<BotQAParam.QAParam>>>() {
+ }.getType());
+ if (map == null || map.isEmpty()) {
+ return;
+ }
+ int index = compressIndex(map);
+ if (index <= 0) {
+ return;
+ }
+ List<BotQAParam.QAParam> originalHistory =
map.get(ORIGINAL_KEY);
+ if (originalHistory == null || originalHistory.size() <=
index) {
+ return;
+ }
+ List<BotQAParam.QAParam> needCompress = new
ArrayList<>(originalHistory.subList(0, index));
+ List<BotQAParam.QAParam> unchangeList = new
ArrayList<>(originalHistory.subList(index, originalHistory.size()));
+
+ String res;
try {
-
contentSimplifyBot.getRc().news.put(Message.builder().content(gson.toJson(needCompress)).build());
+ contentSimplifyBot.getRc().news.put(
+
Message.builder().content(gson.toJson(needCompress)).build());
Message result = contentSimplifyBot.run().join();
res = result.getContent();
} catch (Exception e) {
- log.error("An error occurred when requesting the large
model to compress data");
+ log.error("An error occurred when requesting the large
model to compress data, key: {}, error: {}", key, e.getMessage());
+ return;
}
- if (!res.isBlank()) {
- List<BotQAParam.QAParam> compressedList =
gson.fromJson(res, new TypeToken<List<BotQAParam.QAParam>>() {
- }.getType());
- compressedList.addAll(unchangeList);
- entry.getValue().put(MODEL_KEY, compressedList);
- QA_CACHE.put(entry.getKey(), entry.getValue());
+ if (res == null || res.isBlank()) {
+ return;
}
+
+ List<BotQAParam.QAParam> compressedList = gson.fromJson(
+ res,
+ new TypeToken<List<BotQAParam.QAParam>>() {
+ }.getType()
+ );
+ if (compressedList == null || compressedList.isEmpty()) {
+ return;
+ }
+ compressedList.addAll(unchangeList);
+ map.put(MODEL_KEY, compressedList);
+ jedisCluster.setex(key, 60 * 60, gson.toJson(map));
+ } catch (Exception e) {
+ log.error("checkTokenLength error for key: {}, error: {}",
key, e.getMessage());
+ } finally {
+ unLock(conversationId, uuid);
}
- }
- LOCK.remove(conversationId);
- }
- }
- private static String formatString(BotQAParam param) {
- StringBuilder sb = new StringBuilder();
- List<BotQAParam.QAParam> historyConversation =
param.getHistoryConversation();
- if (historyConversation != null && !historyConversation.isEmpty()) {
- sb.append("历史对话:\n");
- historyConversation.forEach(h -> {
- sb.append(String.format("[%s] ###用户: %s ###助手: %s\n",
h.getTime(), h.getUser(), h.getBot()));
});
}
- sb.append("最新问题: \n ###用户: ").append(param.getLatestQuestion());
- return sb.toString()
- .replaceAll("[\u0000-\u001F]", "")
- .replaceAll("[\\\\\"]", "");
}
-
private static Boolean requestExceedLimit(List<String> logs) {
- String request = gson.toJson(logs);
- if (request.length() >= 20000) {
- return true;
- }
- return false;
+ String formatLog = formatLogs(logs);
+ int count = TOKENIZER.countTokens(formatLog);
+ return count > 20000;
}
private static Integer compressIndex(Map<String, List<BotQAParam.QAParam>>
map) {
List<BotQAParam.QAParam> paramList = map.get(MODEL_KEY);
- if (gson.toJson(paramList).length() <= 40000) {
+ String modelJson = gson.toJson(paramList);
+ int count = TOKENIZER.countTokens(modelJson);
+ if (count <= 70000) {
return 0;
}
- int limit = 20000;
+ int limit = 20000;
List<BotQAParam.QAParam> originalList = map.get(ORIGINAL_KEY);
int sum = 0;
int index = originalList.size();
for (int i = originalList.size() - 1; i >= 0; i--) {
BotQAParam.QAParam param = originalList.get(i);
String str = gson.toJson(param);
- sum += str.length();
+ sum += TOKENIZER.countTokens(str);
+ ;
+ index = i;
+ if (sum >= limit) {
+ break;
+ }
+ }
+ int maxCompress = originalList.size() - 20;
+ return Math.max(index, maxCompress);
+ }
+
+ private static Integer compressIndex(List<BotQAParam.QAParam> paramList,
List<BotQAParam.QAParam> originalList) {
+ String modelJson = gson.toJson(paramList);
+ int count = TOKENIZER.countTokens(modelJson);
+ if (count <= 50000) {
+ return 0;
+ }
+ int limit = 20000;
+ int sum = 0;
+ int index = originalList.size();
+ for (int i = originalList.size() - 1; i >= 0; i--) {
+ BotQAParam.QAParam param = originalList.get(i);
+ String str = gson.toJson(param);
+ sum += TOKENIZER.countTokens(str);
index = i;
if (sum >= limit) {
break;
@@ -333,16 +441,36 @@ public class MilogAiAnalysisServiceImpl implements
MilogAiAnalysisService {
@Override
public void shutdown() {
- if (!QA_CACHE.isEmpty()){
- log.info("The project is closed and the cache is flushed to the
disk");
- for (Map.Entry<Long, Map<String, List<BotQAParam.QAParam>>> entry
: QA_CACHE.entrySet()) {
- MilogAiConversationDO milogAiConversationDO =
milogAiConversationMapper.selectById(entry.getKey());
- List<BotQAParam.QAParam> modelHistory =
entry.getValue().get(MODEL_KEY);
- List<BotQAParam.QAParam> originalHistory =
entry.getValue().get(ORIGINAL_KEY);
-
milogAiConversationDO.setUpdateTime(System.currentTimeMillis());
-
milogAiConversationDO.setConversationContext(gson.toJson(modelHistory));
-
milogAiConversationDO.setOriginalConversation(gson.toJson(originalHistory));
- milogAiConversationMapper.updateById(milogAiConversationDO);
+ if (!trySimpleLock(GLOBAL_SHUTDOWN_LOCK_KEY, 120L)) {
+ return;
+ }
+ Set<String> allCacheKey = getAllCacheKey();
+ if (!allCacheKey.isEmpty()) {
+ List<Future<?>> futures = new ArrayList<>();
+ for (String key : allCacheKey) {
+ Future<?> future = executor.submit(() -> {
+ String[] split = key.split(":");
+ Long conversationId = Long.valueOf(split[split.length -
1]);
+ String value = jedisCluster.get(key);
+ Map<String, List<BotQAParam.QAParam>> map =
gson.fromJson(value, new TypeToken<Map<String, List<BotQAParam.QAParam>>>() {
+ }.getType());
+ List<BotQAParam.QAParam> modelHistory = map.get(MODEL_KEY);
+ List<BotQAParam.QAParam> originalHistory =
map.get(ORIGINAL_KEY);
+ MilogAiConversationDO conversationDO =
milogAiConversationMapper.selectById(conversationId);
+ if (conversationDO != null) {
+
conversationDO.setOriginalConversation(gson.toJson(originalHistory));
+
conversationDO.setConversationContext(gson.toJson(modelHistory));
+ milogAiConversationMapper.updateById(conversationDO);
+ }
+ });
+ futures.add(future);
+ }
+ for (Future<?> future : futures) {
+ try {
+ future.get();
+ } catch (Exception e) {
+ log.error("A future task execute failed: {}",
e.getMessage());
+ }
}
}
}
@@ -352,7 +480,7 @@ public class MilogAiAnalysisServiceImpl implements
MilogAiAnalysisService {
MoneUser user = MoneUserContext.getCurrentUser();
List<MilogAiConversationDO> historyList =
milogAiConversationMapper.getListByUserAndStore(storeId, user.getUser());
List<AiAnalysisHistoryDTO> result = new ArrayList<>();
- if(!historyList.isEmpty()){
+ if (!historyList.isEmpty()) {
result = historyList.stream().map(h -> {
AiAnalysisHistoryDTO dto = new AiAnalysisHistoryDTO();
dto.setId(h.getId());
@@ -366,21 +494,22 @@ public class MilogAiAnalysisServiceImpl implements
MilogAiAnalysisService {
@Override
public Result<List<BotQAParam.QAParam>> getAiConversation(Long id) {
- Map<String, List<BotQAParam.QAParam>> stringListMap = QA_CACHE.get(id);
+ Map<String, List<BotQAParam.QAParam>> stringListMap = getCache(id);
if (stringListMap != null && !stringListMap.isEmpty()) {
List<BotQAParam.QAParam> paramList =
stringListMap.get(ORIGINAL_KEY);
return Result.success(paramList);
}
MilogAiConversationDO conversationDO =
milogAiConversationMapper.selectById(id);
String originalConversationStr =
conversationDO.getOriginalConversation();
- List<BotQAParam.QAParam> res = gson.fromJson(originalConversationStr,
new TypeToken<List<BotQAParam.QAParam>>() {}.getType());
+ List<BotQAParam.QAParam> res = gson.fromJson(originalConversationStr,
new TypeToken<List<BotQAParam.QAParam>>() {
+ }.getType());
return Result.success(res);
}
@Override
public Result<Boolean> deleteAiConversation(Long id) {
milogAiConversationMapper.deleteById(id);
- QA_CACHE.remove(id);
+ removeCache(id);
return Result.success(true);
}
@@ -395,14 +524,14 @@ public class MilogAiAnalysisServiceImpl implements
MilogAiAnalysisService {
@Override
public Result<Boolean> closeAiAnalysis(Long id) {
- Map<String, List<BotQAParam.QAParam>> stringListMap = QA_CACHE.get(id);
+ Map<String, List<BotQAParam.QAParam>> stringListMap = getCache(id);
if (stringListMap != null && !stringListMap.isEmpty()) {
MilogAiConversationDO conversationDO =
milogAiConversationMapper.selectById(id);
conversationDO.setUpdateTime(System.currentTimeMillis());
conversationDO.setConversationContext(gson.toJson(stringListMap.get(MODEL_KEY)));
conversationDO.setOriginalConversation(gson.toJson(stringListMap.get(ORIGINAL_KEY)));
milogAiConversationMapper.updateById(conversationDO);
- QA_CACHE.remove(id);
+ removeCache(id);
}
return Result.success(true);
}
@@ -413,4 +542,129 @@ public class MilogAiAnalysisServiceImpl implements MilogAiAnalysisService {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
return dateTime.format(formatter);
}
+
+ private Map<String, List<BotQAParam.QAParam>> getConversation(Long conversationId) {
+ String redisKey = MILOG_AI_KEY_PREFIX + conversationId;
+ String value = jedisCluster.get(redisKey);
+ if (value != null && !value.isEmpty()) {
+ Map<String, List<BotQAParam.QAParam>> map = gson.fromJson(value, new TypeToken<Map<String, List<BotQAParam.QAParam>>>() {
+ }.getType());
+ return map;
+ }
+
+ MilogAiConversationDO conversationDO = milogAiConversationMapper.selectById(conversationId);
+ if (conversationDO != null) {
+ Map<String, List<BotQAParam.QAParam>> conversationMap = new HashMap<>();
+ String conversationContext = conversationDO.getConversationContext();
+ List<BotQAParam.QAParam> modelConversation = gson.fromJson(conversationContext, new TypeToken<List<BotQAParam.QAParam>>() {
+ }.getType());
+ String originalConversationStr = conversationDO.getOriginalConversation();
+ List<BotQAParam.QAParam> originalConversation = gson.fromJson(originalConversationStr, new TypeToken<List<BotQAParam.QAParam>>() {
+ }.getType());
+
+ conversationMap.put(MODEL_KEY, modelConversation);
+ conversationMap.put(ORIGINAL_KEY, originalConversation);
+
+ putCache(conversationId, conversationMap);
+ return conversationMap;
+ }
+
+ return new HashMap<>();
+ }
+
+ private static boolean putCache(Long conversationId, Map<String, List<BotQAParam.QAParam>> map) {
+ if (map == null || map.isEmpty()) {
+ return false;
+ }
+ String redisKey = MILOG_AI_KEY_PREFIX + conversationId;
+ String res = jedisCluster.setex(redisKey, 60 * 60, gson.toJson(map));
+ return true;
+ }
+
+ private Map<String, List<BotQAParam.QAParam>> getCache(Long conversationId) {
+ String redisKey = MILOG_AI_KEY_PREFIX + conversationId;
+ String value = jedisCluster.get(redisKey);
+ if (value != null && !value.isEmpty()) {
+ Map<String, List<BotQAParam.QAParam>> map = gson.fromJson(value, new TypeToken<Map<String, List<BotQAParam.QAParam>>>() {
+ }.getType());
+ return map;
+ }
+ return new HashMap<>();
+ }
+
+
+ private static void removeCache(Long conversationId) {
+ String redisKey = MILOG_AI_KEY_PREFIX + conversationId;
+ jedisCluster.del(redisKey);
+ }
+
+ private static String formatLogs(List<String> logs) {
+ return String.join("\n", logs);
+ }
+
+ private Set<String> getAllCacheKey() {
+ Set<String> keys = new HashSet<>();
+ Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
+ for (Map.Entry<String, JedisPool> entry : clusterNodes.entrySet()) {
+ try (Jedis jedis = entry.getValue().getResource()) {
+ String cursor = "0";
+ do {
+ ScanResult<String> scanResult = jedis.scan(cursor, new ScanParams().match(MILOG_AI_KEY_PREFIX + "*").count(1000));
+ keys.addAll(scanResult.getResult());
+ cursor = scanResult.getCursor();
+ } while (!cursor.equals("0"));
+ } catch (Exception e) {
+ log.error("Failed to retrieve all conversation keys from Redis!");
+ }
+ }
+ return keys;
+ }
+
+ private boolean tryLock(Long conversationId, String value, Long expireSeconds) {
+ String lockKey = LOCK_PREFIX + conversationId;
+ SetParams params = new SetParams();
+ params.nx();
+ params.px(expireSeconds * 1000);
+ String res = jedisCluster.set(lockKey, value, params);
+ return "OK".equals(res);
+ }
+
+ private boolean trySimpleLock(String key, Long expireSeconds) {
+ SetParams params = new SetParams();
+ params.nx();
+ params.px(expireSeconds * 1000);
+ String res = jedisCluster.set(key, "1", params);
+ return "OK".equals(res);
+ }
+
+ private static final String UNLOCK_LUA =
+ "if redis.call('get', KEYS[1]) == ARGV[1] then " +
+ " return redis.call('del', KEYS[1]) " +
+ "else " +
+ " return 0 " +
+ "end";
+
+ private void unLock(Long conversationId, String value) {
+ String lockKey = LOCK_PREFIX + conversationId;
+ try {
+ jedisCluster.eval(UNLOCK_LUA, Collections.singletonList(lockKey), Collections.singletonList(value));
+
+ } catch (Exception e) {
+ log.error("failed to unlock key: {}, error:{}", lockKey, e.getMessage());
+ }
+ }
+
+ @Data
+ static class AnalysisResult {
+ private String answer;
+ private List<BotQAParam.QAParam> compressedModelHistory;
+ }
+
+ @Data
+ static class CompressionIndex {
+ private Integer index;
+ private Integer currentTokenCount;
+ private Integer targetTokenCount;
+ }
+
}
diff --git a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/ManagerLevelFilterConfigListener.java b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/ManagerLevelFilterConfigListener.java
index 5c869640..04a0209a 100644
--- a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/ManagerLevelFilterConfigListener.java
+++ b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/ManagerLevelFilterConfigListener.java
@@ -22,9 +22,11 @@ import cn.hutool.core.thread.ThreadUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.xiaomi.data.push.common.SafeRun;
import com.xiaomi.youpin.docean.anno.Component;
+import com.xiaomi.youpin.docean.plugin.dubbo.anno.Reference;
import com.xiaomi.youpin.docean.plugin.nacos.NacosConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.api.service.PublishConfigService;
import org.apache.ozhera.log.manager.dao.MilogLogTailDao;
import org.apache.ozhera.log.manager.dao.MilogLogstoreDao;
import org.apache.ozhera.log.manager.mapper.MilogLogTemplateMapper;
@@ -68,6 +70,9 @@ public class ManagerLevelFilterConfigListener {
@Resource
private MilogLogstoreDao logStoreDao;
+ @Reference(interfaceClass = PublishConfigService.class, group = "$dubbo.env.group", check = false, timeout = 14000)
+ private PublishConfigService publishConfigService;
+
private TailExtensionService tailExtensionService;
private final String logLevelFilterKey = "log.level.filter.config.manager";
@@ -136,41 +141,13 @@ public class ManagerLevelFilterConfigListener {
if (config != null && config.getEnableGlobalFilter() &&
areElementsSameIgnoreCase(newConfig.getLogLevelList(),
config.getLogLevelList())) {
return;
}
-// globalUpdateSendMsg();
+ globalUpdateSendMsg();
}
config = newConfig;
}
public void globalUpdateSendMsg() {
- AtomicLong lastId = new AtomicLong(0L);
- ConcurrentLinkedQueue<MilogLogTailDo> failedTailList = new ConcurrentLinkedQueue<>();
- List<CompletableFuture<Void>> futureList = new ArrayList<>();
- try {
- while (true) {
- List<MilogLogTailDo> logTailByLastIdList = logtailDao.getLogTailByLastId(lastId.get(), BATCH_SIZE);
- if (logTailByLastIdList.isEmpty()) break;
- CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
- logTailByLastIdList.forEach(tail -> {
- try {
- updateSingleTail(tail);
- } catch (Exception e) {
- failedTailList.offer(tail);
- log.error("Failed to update tail: {}", tail.getId(), e);
- }
- });
- }, logUpdateExecutor);
- futureList.add(future);
- lastId.set(logTailByLastIdList.get(logTailByLastIdList.size() - 1).getId());
- }
-
- CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
-
- if (!failedTailList.isEmpty()) {
- handleFailedTails(failedTailList);
- }
- } catch (Exception e) {
- log.error("Global log config update failed", e);
- }
+ log.info("global update,skip!");
}
private void handleFailedTails(Queue<MilogLogTailDo> failedTailList) {
diff --git a/ozhera-log/log-manager/src/main/resources/config.properties b/ozhera-log/log-manager/src/main/resources/config.properties
index a1dafe8a..0eab2587 100644
--- a/ozhera-log/log-manager/src/main/resources/config.properties
+++ b/ozhera-log/log-manager/src/main/resources/config.properties
@@ -76,3 +76,12 @@ tpc.devMode=${tpc.devMode}
kafka.use.ssl=true
kafka.sll.location=/opt/app/mix.4096.client.truststore.jks
+## redis
+redis.pool.max.total=${redis.pool.max.total}
+redis.pool.max.idle=${redis.pool.max.idle}
+redis.pool.min.idle=${redis.pool.min.idle}
+redis.pool.max.wait=${redis.pool.max.wait}
+redis.pool.min.evictable.idle.time=${redis.pool.min.evictable.idle.time}
+redis.connection.timeout=${redis.connection.timeout}
+redis.socket.timeout=${redis.socket.timeout}
+redis.max.attempts=${redis.max.attempts}
\ No newline at end of file
diff --git a/ozhera-log/log-manager/src/main/resources/config/open.properties b/ozhera-log/log-manager/src/main/resources/config/open.properties
index c1827fef..48a3eafe 100644
--- a/ozhera-log/log-manager/src/main/resources/config/open.properties
+++ b/ozhera-log/log-manager/src/main/resources/config/open.properties
@@ -70,3 +70,13 @@ tpc_node_code=logger
filter_urls=queryAgentK8sIp,queryAgentConfig,matrixLogQuery
agent.heart.senders=zhangsan
tpc.devMode=false
+
+## redis
+redis.pool.max.total=50
+redis.pool.max.idle=30
+redis.pool.min.idle=10
+redis.pool.max.wait=1000
+redis.pool.min.evictable.idle.time=600000
+redis.connection.timeout=3000
+redis.socket.timeout=1000
+redis.max.attempts=2
\ No newline at end of file
diff --git a/ozhera-log/pom.xml b/ozhera-log/pom.xml
index 9600e9a5..1f804402 100644
--- a/ozhera-log/pom.xml
+++ b/ozhera-log/pom.xml
@@ -103,6 +103,17 @@ http://www.apache.org/licenses/LICENSE-2.0
<artifactId>hutool-all</artifactId>
<version>5.8.21</version>
</dependency>
+
+ <dependency>
+ <groupId>com.knuddels</groupId>
+ <artifactId>jtokkit</artifactId>
+ <version>1.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>redis.clients</groupId>
+ <artifactId>jedis</artifactId>
+ <version>3.3.0</version>
+ </dependency>
</dependencies>
</dependencyManagement>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]