This is an automated email from the ASF dual-hosted git repository.
wuzhiguo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bigtop-manager.git
The following commit(s) were added to refs/heads/main by this push:
new c7348033 BIGTOP-4267: Refactor AI module with langchain4j (#107)
c7348033 is described below
commit c73480333c7c0fc702ccd4043398e683c0d6deb4
Author: haopeng <[email protected]>
AuthorDate: Thu Nov 21 14:34:08 2024 +0800
BIGTOP-4267: Refactor AI module with langchain4j (#107)
---
.../manager/ai/core/AbstractAIAssistant.java | 61 ++++-
.../manager/ai/core/factory/AIAssistant.java | 20 +-
.../bigtop-manager-ai-dashscope/pom.xml | 10 +-
.../manager/ai/dashscope/DashScopeAssistant.java | 300 ++-------------------
.../manager/ai/dashscope/DashScopeThreadParam.java | 36 ---
.../bigtop/manager/ai/openai/OpenAIAssistant.java | 73 ++---
.../manager/ai/qianfan/QianFanAssistant.java | 72 ++---
bigtop-manager-bom/pom.xml | 11 +-
.../apache/bigtop/manager/dao/po/ChatThreadPO.java | 3 -
.../model/converter/ChatThreadConverter.java | 2 -
.../manager/server/model/dto/ChatThreadDTO.java | 2 -
.../server/service/impl/ChatbotServiceImpl.java | 19 +-
.../src/main/resources/ddl/MySQL-DDL-CREATE.sql | 1 -
.../main/resources/ddl/PostgreSQL-DDL-CREATE.sql | 1 -
14 files changed, 142 insertions(+), 469 deletions(-)
diff --git
a/bigtop-manager-ai/bigtop-manager-ai-core/src/main/java/org/apache/bigtop/manager/ai/core/AbstractAIAssistant.java
b/bigtop-manager-ai/bigtop-manager-ai-core/src/main/java/org/apache/bigtop/manager/ai/core/AbstractAIAssistant.java
index 50750514..5dcf6d61 100644
---
a/bigtop-manager-ai/bigtop-manager-ai-core/src/main/java/org/apache/bigtop/manager/ai/core/AbstractAIAssistant.java
+++
b/bigtop-manager-ai/bigtop-manager-ai-core/src/main/java/org/apache/bigtop/manager/ai/core/AbstractAIAssistant.java
@@ -21,15 +21,31 @@ package org.apache.bigtop.manager.ai.core;
import org.apache.bigtop.manager.ai.core.factory.AIAssistant;
import org.apache.bigtop.manager.ai.core.provider.AIAssistantConfigProvider;
+import dev.langchain4j.data.message.AiMessage;
+import dev.langchain4j.data.message.UserMessage;
import dev.langchain4j.memory.ChatMemory;
+import dev.langchain4j.memory.chat.MessageWindowChatMemory;
+import dev.langchain4j.model.StreamingResponseHandler;
+import dev.langchain4j.model.chat.ChatLanguageModel;
+import dev.langchain4j.model.chat.StreamingChatLanguageModel;
+import dev.langchain4j.model.output.Response;
import dev.langchain4j.store.memory.chat.ChatMemoryStore;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
public abstract class AbstractAIAssistant implements AIAssistant {
+ protected final ChatLanguageModel chatLanguageModel;
+ protected final StreamingChatLanguageModel streamingChatLanguageModel;
protected static final Integer MEMORY_LEN = 10;
protected final ChatMemory chatMemory;
- protected AbstractAIAssistant(ChatMemory chatMemory) {
+ protected AbstractAIAssistant(
+ ChatLanguageModel chatLanguageModel,
+ StreamingChatLanguageModel streamingChatLanguageModel,
+ ChatMemory chatMemory) {
+ this.chatLanguageModel = chatLanguageModel;
+ this.streamingChatLanguageModel = streamingChatLanguageModel;
this.chatMemory = chatMemory;
}
@@ -43,6 +59,39 @@ public abstract class AbstractAIAssistant implements
AIAssistant {
return chatMemory.id();
}
+ @Override
+ public Flux<String> streamAsk(String chatMessage) {
+ chatMemory.add(UserMessage.from(chatMessage));
+ return Flux.create(
+ emitter ->
streamingChatLanguageModel.generate(chatMemory.messages(), new
StreamingResponseHandler<>() {
+ @Override
+ public void onNext(String token) {
+ emitter.next(token);
+ }
+
+ @Override
+ public void onError(Throwable error) {
+ emitter.error(error);
+ }
+
+ @Override
+ public void onComplete(Response<AiMessage> response) {
+ StreamingResponseHandler.super.onComplete(response);
+ chatMemory.add(response.content());
+ }
+ }),
+ FluxSink.OverflowStrategy.BUFFER);
+ }
+
+ @Override
+ public String ask(String chatMessage) {
+ chatMemory.add(UserMessage.from(chatMessage));
+ Response<AiMessage> generate =
chatLanguageModel.generate(chatMemory.messages());
+ String aiMessage = generate.content().text();
+ chatMemory.add(AiMessage.from(aiMessage));
+ return aiMessage;
+ }
+
public abstract static class Builder implements AIAssistant.Builder {
protected Object id;
@@ -65,5 +114,15 @@ public abstract class AbstractAIAssistant implements
AIAssistant {
this.chatMemoryStore = chatMemoryStore;
return this;
}
+
+ public MessageWindowChatMemory getChatMemory() {
+ MessageWindowChatMemory.Builder builder =
MessageWindowChatMemory.builder()
+ .chatMemoryStore(chatMemoryStore)
+ .maxMessages(MEMORY_LEN);
+ if (id != null) {
+ builder.id(id);
+ }
+ return builder.build();
+ }
}
}
diff --git
a/bigtop-manager-ai/bigtop-manager-ai-core/src/main/java/org/apache/bigtop/manager/ai/core/factory/AIAssistant.java
b/bigtop-manager-ai/bigtop-manager-ai-core/src/main/java/org/apache/bigtop/manager/ai/core/factory/AIAssistant.java
index 81bc3af3..788d1515 100644
---
a/bigtop-manager-ai/bigtop-manager-ai-core/src/main/java/org/apache/bigtop/manager/ai/core/factory/AIAssistant.java
+++
b/bigtop-manager-ai/bigtop-manager-ai-core/src/main/java/org/apache/bigtop/manager/ai/core/factory/AIAssistant.java
@@ -21,12 +21,12 @@ package org.apache.bigtop.manager.ai.core.factory;
import org.apache.bigtop.manager.ai.core.enums.PlatformType;
import org.apache.bigtop.manager.ai.core.provider.AIAssistantConfigProvider;
+import dev.langchain4j.memory.ChatMemory;
+import dev.langchain4j.model.chat.ChatLanguageModel;
+import dev.langchain4j.model.chat.StreamingChatLanguageModel;
import dev.langchain4j.store.memory.chat.ChatMemoryStore;
import reactor.core.publisher.Flux;
-import java.util.HashMap;
-import java.util.Map;
-
public interface AIAssistant {
/**
@@ -56,14 +56,6 @@ public interface AIAssistant {
*/
PlatformType getPlatform();
- /**
- * This is used to create a thread
- * @return
- */
- default Map<String, String> createThread() {
- return new HashMap<>();
- }
-
/**
* This is used to set system prompt
* @return
@@ -84,5 +76,11 @@ public interface AIAssistant {
Builder withConfigProvider(AIAssistantConfigProvider configProvider);
AIAssistant build();
+
+ ChatLanguageModel getChatLanguageModel();
+
+ StreamingChatLanguageModel getStreamingChatLanguageModel();
+
+ ChatMemory getChatMemory();
}
}
diff --git a/bigtop-manager-ai/bigtop-manager-ai-dashscope/pom.xml
b/bigtop-manager-ai/bigtop-manager-ai-dashscope/pom.xml
index 22d195d9..d062fa1f 100644
--- a/bigtop-manager-ai/bigtop-manager-ai-dashscope/pom.xml
+++ b/bigtop-manager-ai/bigtop-manager-ai-dashscope/pom.xml
@@ -37,14 +37,8 @@
</dependency>
<dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>dashscope-sdk-java</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
- </exclusion>
- </exclusions>
+ <groupId>dev.langchain4j</groupId>
+ <artifactId>langchain4j-dashscope</artifactId>
</dependency>
</dependencies>
</project>
diff --git
a/bigtop-manager-ai/bigtop-manager-ai-dashscope/src/main/java/org/apache/bigtop/manager/ai/dashscope/DashScopeAssistant.java
b/bigtop-manager-ai/bigtop-manager-ai-dashscope/src/main/java/org/apache/bigtop/manager/ai/dashscope/DashScopeAssistant.java
index f178f8f0..1588a345 100644
---
a/bigtop-manager-ai/bigtop-manager-ai-dashscope/src/main/java/org/apache/bigtop/manager/ai/dashscope/DashScopeAssistant.java
+++
b/bigtop-manager-ai/bigtop-manager-ai-dashscope/src/main/java/org/apache/bigtop/manager/ai/dashscope/DashScopeAssistant.java
@@ -19,87 +19,29 @@
package org.apache.bigtop.manager.ai.dashscope;
import org.apache.bigtop.manager.ai.core.AbstractAIAssistant;
-import org.apache.bigtop.manager.ai.core.enums.MessageType;
import org.apache.bigtop.manager.ai.core.enums.PlatformType;
import org.apache.bigtop.manager.ai.core.factory.AIAssistant;
-import com.alibaba.dashscope.aigc.generation.Generation;
-import com.alibaba.dashscope.aigc.generation.GenerationParam;
-import com.alibaba.dashscope.assistants.Assistant;
-import com.alibaba.dashscope.assistants.AssistantParam;
-import com.alibaba.dashscope.assistants.Assistants;
-import com.alibaba.dashscope.common.GeneralListParam;
-import com.alibaba.dashscope.common.ListResult;
-import com.alibaba.dashscope.common.Message;
-import com.alibaba.dashscope.common.Role;
-import com.alibaba.dashscope.exception.InputRequiredException;
-import com.alibaba.dashscope.exception.InvalidateParameter;
-import com.alibaba.dashscope.exception.NoApiKeyException;
-import com.alibaba.dashscope.threads.AssistantStreamEvents;
-import com.alibaba.dashscope.threads.AssistantThread;
-import com.alibaba.dashscope.threads.ContentBase;
-import com.alibaba.dashscope.threads.ContentText;
-import com.alibaba.dashscope.threads.ThreadParam;
-import com.alibaba.dashscope.threads.Threads;
-import com.alibaba.dashscope.threads.messages.Messages;
-import com.alibaba.dashscope.threads.messages.TextMessageParam;
-import com.alibaba.dashscope.threads.messages.ThreadMessage;
-import com.alibaba.dashscope.threads.messages.ThreadMessageDelta;
-import com.alibaba.dashscope.threads.runs.AssistantStreamMessage;
-import com.alibaba.dashscope.threads.runs.Run;
-import com.alibaba.dashscope.threads.runs.RunParam;
-import com.alibaba.dashscope.threads.runs.Runs;
-import dev.langchain4j.data.message.AiMessage;
-import dev.langchain4j.data.message.ChatMessage;
import dev.langchain4j.data.message.SystemMessage;
-import dev.langchain4j.data.message.UserMessage;
import dev.langchain4j.internal.ValidationUtils;
import dev.langchain4j.memory.ChatMemory;
-import dev.langchain4j.memory.chat.MessageWindowChatMemory;
-import io.reactivex.Flowable;
-import reactor.core.publisher.Flux;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import dev.langchain4j.model.chat.ChatLanguageModel;
+import dev.langchain4j.model.chat.StreamingChatLanguageModel;
+import dev.langchain4j.model.dashscope.QwenChatModel;
+import dev.langchain4j.model.dashscope.QwenStreamingChatModel;
public class DashScopeAssistant extends AbstractAIAssistant {
- private final Assistants assistants = new Assistants();
- private final Messages messages = new Messages();
- private final Threads threads = new Threads();
- private final Runs runs = new Runs();
- private final DashScopeThreadParam dashScopeThreadParam;
- public DashScopeAssistant(ChatMemory chatMemory, DashScopeThreadParam
dashScopeThreadParam) {
- super(chatMemory);
- this.dashScopeThreadParam = dashScopeThreadParam;
+ public DashScopeAssistant(
+ ChatLanguageModel chatLanguageModel,
+ StreamingChatLanguageModel streamingChatLanguageModel,
+ ChatMemory chatMemory) {
+ super(chatLanguageModel, streamingChatLanguageModel, chatMemory);
}
- private String getValueFromAssistantStreamMessage(AssistantStreamMessage
assistantStreamMessage) {
- ThreadMessageDelta threadMessageDelta = (ThreadMessageDelta)
assistantStreamMessage.getData();
- StringBuilder streamMessage = new StringBuilder();
-
- List<ContentBase> contents =
threadMessageDelta.getDelta().getContent();
- for (ContentBase content : contents) {
- ContentText contentText = (ContentText) content;
- streamMessage.append(contentText.getText().getValue());
- }
- return streamMessage.toString();
- }
-
- private void addMessage(String message, MessageType sender) {
- ChatMessage chatMessage;
- if (sender.equals(MessageType.AI)) {
- chatMessage = new AiMessage(message);
- } else if (sender.equals(MessageType.USER)) {
- chatMessage = new UserMessage(message);
- } else if (sender.equals(MessageType.SYSTEM)) {
- chatMessage = new SystemMessage(message);
- } else {
- return;
- }
- chatMemory.add(chatMessage);
+ @Override
+ public void setSystemPrompt(String systemPrompt) {
+ chatMemory.add(SystemMessage.systemMessage(systemPrompt));
}
@Override
@@ -107,217 +49,33 @@ public class DashScopeAssistant extends
AbstractAIAssistant {
return PlatformType.DASH_SCOPE;
}
- @Override
- public void setSystemPrompt(String systemPrompt) {
- if (dashScopeThreadParam.getAssistantId() == null) {
- return;
- }
- TextMessageParam textMessageParam = TextMessageParam.builder()
- .apiKey(dashScopeThreadParam.getApiKey())
- .role(Role.ASSISTANT.getValue())
- .content(systemPrompt)
- .build();
- try {
- messages.create(dashScopeThreadParam.getAssistantThreadId(),
textMessageParam);
- } catch (NoApiKeyException | InputRequiredException e) {
- throw new RuntimeException(e);
- }
- RunParam runParam = RunParam.builder()
- .apiKey(dashScopeThreadParam.getApiKey())
- .assistantId(dashScopeThreadParam.getAssistantId())
- .build();
- try {
- runs.create(dashScopeThreadParam.getAssistantThreadId(), runParam);
- } catch (NoApiKeyException | InputRequiredException |
InvalidateParameter e) {
- throw new RuntimeException(e);
- }
- addMessage(systemPrompt, MessageType.SYSTEM);
- }
-
public static Builder builder() {
return new Builder();
}
- @Override
- public Flux<String> streamAsk(String userMessage) {
- addMessage(userMessage, MessageType.USER);
- TextMessageParam textMessageParam = TextMessageParam.builder()
- .apiKey(dashScopeThreadParam.getApiKey())
- .role(Role.USER.getValue())
- .content(userMessage)
- .build();
- try {
- ThreadMessage message =
messages.create(dashScopeThreadParam.getAssistantThreadId(), textMessageParam);
- } catch (NoApiKeyException | InputRequiredException e) {
- throw new RuntimeException(e);
- }
-
- RunParam runParam = RunParam.builder()
- .apiKey(dashScopeThreadParam.getApiKey())
- .assistantId(dashScopeThreadParam.getAssistantId())
- .stream(true)
- .build();
- Flowable<AssistantStreamMessage> runFlowable = null;
- try {
- runFlowable =
runs.createStream(dashScopeThreadParam.getAssistantThreadId(), runParam);
- } catch (NoApiKeyException | InputRequiredException |
InvalidateParameter e) {
- throw new RuntimeException(e);
- }
- StringBuilder finalMessage = new StringBuilder();
- return Flux.from(runFlowable)
- .map(assistantStreamMessage -> {
- String message =
-
assistantStreamMessage.getEvent().equals(AssistantStreamEvents.THREAD_MESSAGE_DELTA)
- ?
getValueFromAssistantStreamMessage(assistantStreamMessage)
- : "";
- finalMessage.append(message);
- return message;
- })
- .doOnComplete(() -> {
- addMessage(finalMessage.toString(), MessageType.AI);
- });
- }
-
- @Override
- public String ask(String userMessage) {
- addMessage(userMessage, MessageType.USER);
- TextMessageParam textMessageParam = TextMessageParam.builder()
- .apiKey(dashScopeThreadParam.getApiKey())
- .role(Role.USER.getValue())
- .content(userMessage)
- .build();
- try {
- ThreadMessage message =
messages.create(dashScopeThreadParam.getAssistantThreadId(), textMessageParam);
- } catch (NoApiKeyException | InputRequiredException e) {
- throw new RuntimeException(e);
- }
-
- RunParam runParam = RunParam.builder()
- .apiKey(dashScopeThreadParam.getApiKey())
- .assistantId(dashScopeThreadParam.getAssistantId())
- .build();
- Run run;
- try {
- run = runs.create(dashScopeThreadParam.getAssistantThreadId(),
runParam);
- } catch (NoApiKeyException | InputRequiredException |
InvalidateParameter e) {
- throw new RuntimeException(e);
- }
- while (true) {
- if (run.getStatus().equals(Run.Status.CANCELLED)
- || run.getStatus().equals(Run.Status.COMPLETED)
- || run.getStatus().equals(Run.Status.FAILED)
- || run.getStatus().equals(Run.Status.REQUIRES_ACTION)
- || run.getStatus().equals(Run.Status.EXPIRED)) {
- break;
- } else {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- try {
- run = runs.retrieve(
- dashScopeThreadParam.getAssistantThreadId(),
run.getId(), dashScopeThreadParam.getApiKey());
- } catch (NoApiKeyException e) {
- throw new RuntimeException(e);
- }
- }
-
- ListResult<ThreadMessage> threadMessages = null;
- try {
- threadMessages = messages.list(
- dashScopeThreadParam.getAssistantThreadId(),
- GeneralListParam.builder()
- .apiKey(dashScopeThreadParam.getApiKey())
- .build());
- } catch (NoApiKeyException | InputRequiredException e) {
- throw new RuntimeException(e);
- }
- List<ThreadMessage> threadMessage = threadMessages.getData();
- if (threadMessage.isEmpty()) {
- return null;
- }
- List<ContentBase> contents = threadMessage.get(0).getContent();
- StringBuilder finalMessage = new StringBuilder();
- for (ContentBase content : contents) {
- ContentText contentText = (ContentText) content;
- finalMessage.append(contentText.getText().getValue());
- }
- addMessage(finalMessage.toString(), MessageType.AI);
- return finalMessage.toString();
- }
-
- @Override
- public boolean test() {
- Generation generation = new Generation();
- GenerationParam param = GenerationParam.builder()
- .apiKey(dashScopeThreadParam.getApiKey())
- .model(dashScopeThreadParam.getModel())
- .build();
-
- Message userMsg =
-
Message.builder().role(Role.USER.getValue()).content("1+1=").build();
- param.setMessages(Collections.singletonList(userMsg));
- try {
- generation.call(param);
- } catch (NoApiKeyException | InputRequiredException e) {
- throw new RuntimeException(e);
- }
- return true;
- }
+ public static class Builder extends AbstractAIAssistant.Builder {
- @Override
- public Map<String, String> createThread() {
- AssistantParam param = AssistantParam.builder()
- .model(dashScopeThreadParam.getModel())
- .apiKey(dashScopeThreadParam.getApiKey())
- .name("DashScope Assistant")
- .build();
- Map<String, String> threadInfo = new HashMap<>();
- try {
- Assistant assistant = assistants.create(param);
- threadInfo.put("assistantId", assistant.getId());
- } catch (NoApiKeyException e) {
- throw new RuntimeException(e);
- }
- ThreadParam threadParam =
-
ThreadParam.builder().apiKey(dashScopeThreadParam.getApiKey()).build();
- try {
- AssistantThread assistantThread = threads.create(threadParam);
- threadInfo.put("assistantThreadId", assistantThread.getId());
- } catch (NoApiKeyException e) {
- throw new RuntimeException(e);
+ @Override
+ public ChatLanguageModel getChatLanguageModel() {
+ String model =
ValidationUtils.ensureNotNull(configProvider.getModel(), "model");
+ String apiKey = ValidationUtils.ensureNotNull(
+ configProvider.getCredentials().get("apiKey"), "apiKey");
+ return
QwenChatModel.builder().apiKey(apiKey).modelName(model).build();
}
- return threadInfo;
- }
- public static class Builder extends AbstractAIAssistant.Builder {
-
- public AIAssistant build() {
+ @Override
+ public StreamingChatLanguageModel getStreamingChatLanguageModel() {
String model =
ValidationUtils.ensureNotNull(configProvider.getModel(), "model");
String apiKey = ValidationUtils.ensureNotNull(
configProvider.getCredentials().get("apiKey"), "apiKey");
- DashScopeThreadParam param = new DashScopeThreadParam();
- param.setApiKey(apiKey);
- param.setModel(model);
- String assistantThreadId =
configProvider.getConfigs().get("assistantThreadId");
- if (assistantThreadId != null) {
- param.setAssistantThreadId(assistantThreadId);
- }
- String assistantId =
configProvider.getConfigs().get("assistantId");
- if (assistantId != null) {
- param.setAssistantId(assistantId);
- }
- MessageWindowChatMemory.Builder builder =
MessageWindowChatMemory.builder()
- .chatMemoryStore(chatMemoryStore)
- .maxMessages(MEMORY_LEN);
- if (id != null) {
- builder.id(id);
- param.setThreadId(id);
- }
- MessageWindowChatMemory chatMemory = builder.build();
- return new DashScopeAssistant(chatMemory, param);
+ return QwenStreamingChatModel.builder()
+ .apiKey(apiKey)
+ .modelName(model)
+ .build();
+ }
+
+ public AIAssistant build() {
+ return new DashScopeAssistant(getChatLanguageModel(),
getStreamingChatLanguageModel(), getChatMemory());
}
}
}
diff --git
a/bigtop-manager-ai/bigtop-manager-ai-dashscope/src/main/java/org/apache/bigtop/manager/ai/dashscope/DashScopeThreadParam.java
b/bigtop-manager-ai/bigtop-manager-ai-dashscope/src/main/java/org/apache/bigtop/manager/ai/dashscope/DashScopeThreadParam.java
deleted file mode 100644
index 06e6f088..00000000
---
a/bigtop-manager-ai/bigtop-manager-ai-dashscope/src/main/java/org/apache/bigtop/manager/ai/dashscope/DashScopeThreadParam.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.bigtop.manager.ai.dashscope;
-
-import lombok.Getter;
-import lombok.Setter;
-
-@Getter
-@Setter
-public class DashScopeThreadParam {
- private Object threadId;
-
- private String assistantId;
-
- private String assistantThreadId;
-
- private String model;
-
- private String apiKey;
-}
diff --git
a/bigtop-manager-ai/bigtop-manager-ai-openai/src/main/java/org/apache/bigtop/manager/ai/openai/OpenAIAssistant.java
b/bigtop-manager-ai/bigtop-manager-ai-openai/src/main/java/org/apache/bigtop/manager/ai/openai/OpenAIAssistant.java
index 845e00ba..e56adad6 100644
---
a/bigtop-manager-ai/bigtop-manager-ai-openai/src/main/java/org/apache/bigtop/manager/ai/openai/OpenAIAssistant.java
+++
b/bigtop-manager-ai/bigtop-manager-ai-openai/src/main/java/org/apache/bigtop/manager/ai/openai/OpenAIAssistant.java
@@ -22,68 +22,23 @@ import
org.apache.bigtop.manager.ai.core.AbstractAIAssistant;
import org.apache.bigtop.manager.ai.core.enums.PlatformType;
import org.apache.bigtop.manager.ai.core.factory.AIAssistant;
-import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.data.message.SystemMessage;
-import dev.langchain4j.data.message.UserMessage;
import dev.langchain4j.internal.ValidationUtils;
import dev.langchain4j.memory.ChatMemory;
-import dev.langchain4j.memory.chat.MessageWindowChatMemory;
-import dev.langchain4j.model.StreamingResponseHandler;
import dev.langchain4j.model.chat.ChatLanguageModel;
import dev.langchain4j.model.chat.StreamingChatLanguageModel;
import dev.langchain4j.model.openai.OpenAiChatModel;
import dev.langchain4j.model.openai.OpenAiStreamingChatModel;
-import dev.langchain4j.model.output.Response;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
public class OpenAIAssistant extends AbstractAIAssistant {
- private final ChatLanguageModel chatLanguageModel;
- private final StreamingChatLanguageModel streamingChatLanguageModel;
-
private static final String BASE_URL = "https://api.openai.com/v1";
public OpenAIAssistant(
ChatLanguageModel chatLanguageModel,
StreamingChatLanguageModel streamingChatLanguageModel,
ChatMemory chatMemory) {
- super(chatMemory);
- this.chatLanguageModel = chatLanguageModel;
- this.streamingChatLanguageModel = streamingChatLanguageModel;
- }
-
- @Override
- public Flux<String> streamAsk(String chatMessage) {
- chatMemory.add(UserMessage.from(chatMessage));
- return Flux.create(
- emitter ->
streamingChatLanguageModel.generate(chatMemory.messages(), new
StreamingResponseHandler<>() {
- @Override
- public void onNext(String token) {
- emitter.next(token);
- }
-
- @Override
- public void onError(Throwable error) {
- emitter.error(error);
- }
-
- @Override
- public void onComplete(Response<AiMessage> response) {
- StreamingResponseHandler.super.onComplete(response);
- chatMemory.add(response.content());
- }
- }),
- FluxSink.OverflowStrategy.BUFFER);
- }
-
- @Override
- public String ask(String chatMessage) {
- chatMemory.add(UserMessage.from(chatMessage));
- Response<AiMessage> generate =
chatLanguageModel.generate(chatMemory.messages());
- String aiMessage = generate.content().text();
- chatMemory.add(AiMessage.from(aiMessage));
- return aiMessage;
+ super(chatLanguageModel, streamingChatLanguageModel, chatMemory);
}
@Override
@@ -102,28 +57,32 @@ public class OpenAIAssistant extends AbstractAIAssistant {
public static class Builder extends AbstractAIAssistant.Builder {
- public AIAssistant build() {
+ @Override
+ public ChatLanguageModel getChatLanguageModel() {
String model =
ValidationUtils.ensureNotNull(configProvider.getModel(), "model");
String apiKey = ValidationUtils.ensureNotNull(
configProvider.getCredentials().get("apiKey"), "apiKey");
- ChatLanguageModel openAiChatModel = OpenAiChatModel.builder()
+ return OpenAiChatModel.builder()
.apiKey(apiKey)
.baseUrl(BASE_URL)
.modelName(model)
.build();
- StreamingChatLanguageModel openaiStreamChatModel =
OpenAiStreamingChatModel.builder()
+ }
+
+ @Override
+ public StreamingChatLanguageModel getStreamingChatLanguageModel() {
+ String model =
ValidationUtils.ensureNotNull(configProvider.getModel(), "model");
+ String apiKey = ValidationUtils.ensureNotNull(
+ configProvider.getCredentials().get("apiKey"), "apiKey");
+ return OpenAiStreamingChatModel.builder()
.apiKey(apiKey)
.baseUrl(BASE_URL)
.modelName(model)
.build();
- MessageWindowChatMemory.Builder builder =
MessageWindowChatMemory.builder()
- .chatMemoryStore(chatMemoryStore)
- .maxMessages(MEMORY_LEN);
- if (id != null) {
- builder.id(id);
- }
- MessageWindowChatMemory chatMemory = builder.build();
- return new OpenAIAssistant(openAiChatModel, openaiStreamChatModel,
chatMemory);
+ }
+
+ public AIAssistant build() {
+ return new OpenAIAssistant(getChatLanguageModel(),
getStreamingChatLanguageModel(), getChatMemory());
}
}
}
diff --git
a/bigtop-manager-ai/bigtop-manager-ai-qianfan/src/main/java/org/apache/bigtop/manager/ai/qianfan/QianFanAssistant.java
b/bigtop-manager-ai/bigtop-manager-ai-qianfan/src/main/java/org/apache/bigtop/manager/ai/qianfan/QianFanAssistant.java
index cfd802b6..6b385c25 100644
---
a/bigtop-manager-ai/bigtop-manager-ai-qianfan/src/main/java/org/apache/bigtop/manager/ai/qianfan/QianFanAssistant.java
+++
b/bigtop-manager-ai/bigtop-manager-ai-qianfan/src/main/java/org/apache/bigtop/manager/ai/qianfan/QianFanAssistant.java
@@ -22,35 +22,24 @@ import
org.apache.bigtop.manager.ai.core.AbstractAIAssistant;
import org.apache.bigtop.manager.ai.core.enums.PlatformType;
import org.apache.bigtop.manager.ai.core.factory.AIAssistant;
-import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.data.message.ChatMessage;
import dev.langchain4j.data.message.SystemMessage;
-import dev.langchain4j.data.message.UserMessage;
import dev.langchain4j.internal.ValidationUtils;
import dev.langchain4j.memory.ChatMemory;
-import dev.langchain4j.memory.chat.MessageWindowChatMemory;
-import dev.langchain4j.model.StreamingResponseHandler;
import dev.langchain4j.model.chat.ChatLanguageModel;
import dev.langchain4j.model.chat.StreamingChatLanguageModel;
-import dev.langchain4j.model.output.Response;
import dev.langchain4j.model.qianfan.QianfanChatModel;
import dev.langchain4j.model.qianfan.QianfanStreamingChatModel;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
public class QianFanAssistant extends AbstractAIAssistant {
- private final ChatLanguageModel chatLanguageModel;
- private final StreamingChatLanguageModel streamingChatLanguageModel;
private SystemMessage systemMessage;
public QianFanAssistant(
ChatLanguageModel chatLanguageModel,
StreamingChatLanguageModel streamingChatLanguageModel,
ChatMemory chatMemory) {
- super(chatMemory);
- this.chatLanguageModel = chatLanguageModel;
- this.streamingChatLanguageModel = streamingChatLanguageModel;
+ super(chatLanguageModel, streamingChatLanguageModel, chatMemory);
for (ChatMessage chatMessage : chatMemory.messages()) {
if (chatMessage instanceof SystemMessage) {
this.systemMessage = (SystemMessage) chatMessage;
@@ -58,39 +47,6 @@ public class QianFanAssistant extends AbstractAIAssistant {
}
}
- @Override
- public Flux<String> streamAsk(String chatMessage) {
- chatMemory.add(UserMessage.from(chatMessage));
- return Flux.create(
- emitter ->
streamingChatLanguageModel.generate(chatMemory.messages(), new
StreamingResponseHandler<>() {
- @Override
- public void onNext(String token) {
- emitter.next(token);
- }
-
- @Override
- public void onError(Throwable error) {
- emitter.error(error);
- }
-
- @Override
- public void onComplete(Response<AiMessage> response) {
- StreamingResponseHandler.super.onComplete(response);
- chatMemory.add(response.content());
- }
- }),
- FluxSink.OverflowStrategy.BUFFER);
- }
-
- @Override
- public String ask(String chatMessage) {
- chatMemory.add(UserMessage.from(chatMessage));
- Response<AiMessage> generate =
chatLanguageModel.generate(chatMemory.messages());
- String aiMessage = generate.content().text();
- chatMemory.add(AiMessage.from(aiMessage));
- return aiMessage;
- }
-
@Override
public void setSystemPrompt(String systemPrompt) {
// Multiple system messages are not supported
@@ -112,29 +68,35 @@ public class QianFanAssistant extends AbstractAIAssistant {
public static class Builder extends AbstractAIAssistant.Builder {
public AIAssistant build() {
+ return new QianFanAssistant(getChatLanguageModel(),
getStreamingChatLanguageModel(), getChatMemory());
+ }
+
+ @Override
+ public ChatLanguageModel getChatLanguageModel() {
String model =
ValidationUtils.ensureNotNull(configProvider.getModel(), "model");
String apiKey = ValidationUtils.ensureNotNull(
configProvider.getCredentials().get("apiKey"), "apiKey");
String secretKey = ValidationUtils.ensureNotNull(
configProvider.getCredentials().get("secretKey"),
"secretKey");
- ChatLanguageModel qianfanChatModel = QianfanChatModel.builder()
+ return QianfanChatModel.builder()
.apiKey(apiKey)
.secretKey(secretKey)
.modelName(model)
.build();
- StreamingChatLanguageModel qianfanStreamChatModel =
QianfanStreamingChatModel.builder()
+ }
+
+ @Override
+ public StreamingChatLanguageModel getStreamingChatLanguageModel() {
+ String model =
ValidationUtils.ensureNotNull(configProvider.getModel(), "model");
+ String apiKey = ValidationUtils.ensureNotNull(
+ configProvider.getCredentials().get("apiKey"), "apiKey");
+ String secretKey = ValidationUtils.ensureNotNull(
+ configProvider.getCredentials().get("secretKey"),
"secretKey");
+ return QianfanStreamingChatModel.builder()
.apiKey(apiKey)
.secretKey(secretKey)
.modelName(model)
.build();
- MessageWindowChatMemory.Builder builder =
MessageWindowChatMemory.builder()
- .chatMemoryStore(chatMemoryStore)
- .maxMessages(MEMORY_LEN);
- if (id != null) {
- builder.id(id);
- }
- MessageWindowChatMemory chatMemory = builder.build();
- return new QianFanAssistant(qianfanChatModel,
qianfanStreamChatModel, chatMemory);
}
}
}
diff --git a/bigtop-manager-bom/pom.xml b/bigtop-manager-bom/pom.xml
index 004e40e2..226f28c0 100644
--- a/bigtop-manager-bom/pom.xml
+++ b/bigtop-manager-bom/pom.xml
@@ -49,8 +49,7 @@
<micrometer.version>1.12.4</micrometer.version>
<jdbc.dm.version>8.1.2.192</jdbc.dm.version>
<sshd.version>2.14.0</sshd.version>
- <langchain4j.version>0.33.0</langchain4j.version>
- <dashscope.version>2.16.3</dashscope.version>
+ <langchain4j.version>0.35.0</langchain4j.version>
<mybatis-spring-boot-starter.version>3.0.3</mybatis-spring-boot-starter.version>
<pagehelper-spring-boot-starter.version>2.1.0</pagehelper-spring-boot-starter.version>
</properties>
@@ -258,13 +257,13 @@
<version>${langchain4j.version}</version>
</dependency>
<dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>dashscope-sdk-java</artifactId>
- <version>${dashscope.version}</version>
+ <groupId>dev.langchain4j</groupId>
+ <artifactId>langchain4j-qianfan</artifactId>
+ <version>${langchain4j.version}</version>
</dependency>
<dependency>
<groupId>dev.langchain4j</groupId>
- <artifactId>langchain4j-qianfan</artifactId>
+ <artifactId>langchain4j-dashscope</artifactId>
<version>${langchain4j.version}</version>
</dependency>
</dependencies>
diff --git
a/bigtop-manager-dao/src/main/java/org/apache/bigtop/manager/dao/po/ChatThreadPO.java
b/bigtop-manager-dao/src/main/java/org/apache/bigtop/manager/dao/po/ChatThreadPO.java
index d5f9eb10..619e5fd5 100644
---
a/bigtop-manager-dao/src/main/java/org/apache/bigtop/manager/dao/po/ChatThreadPO.java
+++
b/bigtop-manager-dao/src/main/java/org/apache/bigtop/manager/dao/po/ChatThreadPO.java
@@ -34,9 +34,6 @@ public class ChatThreadPO extends BasePO implements
Serializable {
@Column(name = "id")
private Long id;
- @Column(name = "thread_info")
- private String threadInfo;
-
@Column(name = "user_id", nullable = false)
private Long userId;
diff --git
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/model/converter/ChatThreadConverter.java
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/model/converter/ChatThreadConverter.java
index d22b346d..59aa3fae 100644
---
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/model/converter/ChatThreadConverter.java
+++
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/model/converter/ChatThreadConverter.java
@@ -37,10 +37,8 @@ public interface ChatThreadConverter {
@Mapping(source = "id", target = "threadId")
ChatThreadVO fromPO2VO(ChatThreadPO platformAuthorizedPO);
- @Mapping(source = "threadInfo", target = "threadInfo", qualifiedByName =
"map2String")
ChatThreadPO fromDTO2PO(ChatThreadDTO chatThreadDTO);
- @Mapping(source = "threadInfo", target = "threadInfo", qualifiedByName =
"jsonString2Map")
ChatThreadDTO fromPO2DTO(ChatThreadPO chatThreadPO);
ChatThreadDTO fromReq2DTO(ChatbotThreadReq chatbotThreadReq);
diff --git
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/model/dto/ChatThreadDTO.java
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/model/dto/ChatThreadDTO.java
index bd847b0e..60ab612a 100644
---
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/model/dto/ChatThreadDTO.java
+++
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/model/dto/ChatThreadDTO.java
@@ -33,6 +33,4 @@ public class ChatThreadDTO {
private String name;
private Map<String, String> authCredentials;
-
- private Map<String, String> threadInfo;
}
diff --git
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/ChatbotServiceImpl.java
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/ChatbotServiceImpl.java
index cfbb9d5f..9bd1e318 100644
---
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/ChatbotServiceImpl.java
+++
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/ChatbotServiceImpl.java
@@ -104,13 +104,11 @@ public class ChatbotServiceImpl implements ChatbotService
{
return null;
}
- private AIAssistantConfig getAIAssistantConfig(
- String model, Map<String, String> credentials, Map<String, String>
configs) {
+ private AIAssistantConfig getAIAssistantConfig(String model, Map<String,
String> credentials) {
return AIAssistantConfig.builder()
.setModel(model)
.setLanguage(LocaleContextHolder.getLocale().toString())
.addCredentials(credentials)
- .addConfigs(configs)
.build();
}
@@ -119,13 +117,9 @@ public class ChatbotServiceImpl implements ChatbotService {
}
private AIAssistant buildAIAssistant(
- String platformName,
- String model,
- Map<String, String> credentials,
- Long threadId,
- Map<String, String> configs) {
+ String platformName, String model, Map<String, String>
credentials, Long threadId) {
return getAIAssistantFactory()
- .create(getPlatformType(platformName),
getAIAssistantConfig(model, credentials, configs), threadId);
+ .create(getPlatformType(platformName),
getAIAssistantConfig(model, credentials), threadId);
}
@Override
@@ -141,10 +135,6 @@ public class ChatbotServiceImpl implements ChatbotService {
chatThreadDTO.setPlatformId(platformPO.getId());
chatThreadDTO.setAuthId(authPlatformPO.getId());
- AIAssistant aiAssistant = buildAIAssistant(
- platformPO.getName(), authPlatformDTO.getModel(),
authPlatformDTO.getAuthCredentials(), null, null);
- Map<String, String> threadInfo = aiAssistant.createThread();
- chatThreadDTO.setThreadInfo(threadInfo);
ChatThreadPO chatThreadPO =
ChatThreadConverter.INSTANCE.fromDTO2PO(chatThreadDTO);
chatThreadPO.setUserId(userId);
chatThreadDao.save(chatThreadPO);
@@ -213,8 +203,7 @@ public class ChatbotServiceImpl implements ChatbotService {
platformPO.getName(),
authPlatformDTO.getModel(),
authPlatformDTO.getAuthCredentials(),
- chatThreadPO.getId(),
- chatThreadDTO.getThreadInfo());
+ chatThreadPO.getId());
Flux<String> stringFlux = aiAssistant.streamAsk(message);
SseEmitter emitter = new SseEmitter();
diff --git a/bigtop-manager-server/src/main/resources/ddl/MySQL-DDL-CREATE.sql
b/bigtop-manager-server/src/main/resources/ddl/MySQL-DDL-CREATE.sql
index d89400ab..41918fdc 100644
--- a/bigtop-manager-server/src/main/resources/ddl/MySQL-DDL-CREATE.sql
+++ b/bigtop-manager-server/src/main/resources/ddl/MySQL-DDL-CREATE.sql
@@ -332,7 +332,6 @@ CREATE TABLE `llm_chat_thread`
`auth_id` BIGINT(20) UNSIGNED NOT NULL,
`user_id` BIGINT(20) UNSIGNED NOT NULL,
`is_deleted` TINYINT(1) DEFAULT 0 NULL,
- `thread_info` TEXT DEFAULT NULL,
`name` VARCHAR(255) DEFAULT NULL,
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
`update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP,
diff --git
a/bigtop-manager-server/src/main/resources/ddl/PostgreSQL-DDL-CREATE.sql
b/bigtop-manager-server/src/main/resources/ddl/PostgreSQL-DDL-CREATE.sql
index e59ca7c4..c977921f 100644
--- a/bigtop-manager-server/src/main/resources/ddl/PostgreSQL-DDL-CREATE.sql
+++ b/bigtop-manager-server/src/main/resources/ddl/PostgreSQL-DDL-CREATE.sql
@@ -344,7 +344,6 @@ CREATE TABLE llm_chat_thread
id BIGINT CHECK (id > 0) NOT NULL GENERATED ALWAYS AS
IDENTITY,
auth_id BIGINT CHECK (auth_id > 0) NOT NULL,
user_id BIGINT CHECK (user_id > 0) NOT NULL,
- thread_info TEXT DEFAULT NULL,
name VARCHAR(255) DEFAULT NULL,
is_deleted BOOLEAN DEFAULT FALSE,
create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,