This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch ds in repository https://gitbox.apache.org/repos/asf/streampark.git
commit a001dd9842026ae8a29f33dd8265053c3a1e1d8d Author: benjobs <[email protected]> AuthorDate: Mon Feb 17 16:48:47 2025 +0800 [Feat] flinksql improvement --- pom.xml | 61 ---------------------- .../streampark-console-service/pom.xml | 11 ++++ .../src/main/assembly/conf/config.yaml | 6 +++ .../console/base/config/DeepSeekConfig.java | 42 +++++++++++++++ .../streampark/console/core/bean/ChatRequest.java | 41 +++++++++++++++ .../streampark/console/core/bean/ChatResponse.java | 39 ++++++++++++++ .../console/core/controller/ChatController.java | 41 +++++++++++++++ .../console/core/service/DeepSeekService.java | 45 ++++++++++++++++ 8 files changed, 225 insertions(+), 61 deletions(-) diff --git a/pom.xml b/pom.xml index ca3fbaa2b..c8d4a22b3 100644 --- a/pom.xml +++ b/pom.xml @@ -742,62 +742,6 @@ </executions> </plugin> - <!--mvn apache-rat:check--> - <plugin> - <groupId>org.apache.rat</groupId> - <artifactId>apache-rat-plugin</artifactId> - <version>${maven-apache-rat-plugin.version}</version> - <configuration> - <excludes> - <exclude>.asf.yaml</exclude> - <exclude>.git-blame-ignore-revs</exclude> - <exclude>.editorconfig</exclude> - <exclude>.git/</exclude> - <exclude>.github/**</exclude> - <exclude>.gitignore</exclude> - <exclude>.licenserc.yaml</exclude> - <exclude>.scalafmt.conf</exclude> - - <exclude>**/.idea/</exclude> - <exclude>**/*.iml</exclude> - <exclude>**/*.txt</exclude> - <exclude>**/*.json</exclude> - <exclude>**/*.md</exclude> - <exclude>**/*.log</exclude> - <exclude>**/.gitkeep</exclude> - <exclude>**/.settings/*</exclude> - <exclude>**/.classpath</exclude> - <exclude>**/.project</exclude> - <exclude>**/target/**</exclude> - <exclude>**/out/**</exclude> - <exclude>**/META-INF/**</exclude> - - <exclude>.mvn/**</exclude> - <exclude>compiler/**</exclude> - <exclude>dist-material/**</exclude> - <exclude>docker/**</exclude> - <exclude>helm/**</exclude> - <exclude>mvnw</exclude> - <exclude>mvnw.cmd</exclude> - <exclude>README.md</exclude> - - <exclude>src/main/assembly/**</exclude> - <exclude>src/main/resources/alert-template/**</exclude> - <exclude>src/main/resources/*.dict</exclude> - - <exclude>streampark-console-webapp/**</exclude> - </excludes> - </configuration> - <executions> - <execution> - <id>rat-validate</id> - <goals> - <goal>check</goal> - </goals> - <phase>validate</phase> - </execution> - </executions> - </plugin> </plugins> </pluginManagement> @@ -831,11 +775,6 @@ <groupId>org.owasp</groupId> <artifactId>dependency-check-maven</artifactId> </plugin> - - <plugin> - <groupId>org.apache.rat</groupId> - <artifactId>apache-rat-plugin</artifactId> - </plugin> </plugins> </build> diff --git a/streampark-console/streampark-console-service/pom.xml b/streampark-console/streampark-console-service/pom.xml index d41d96552..05161fc10 100644 --- a/streampark-console/streampark-console-service/pom.xml +++ b/streampark-console/streampark-console-service/pom.xml @@ -153,6 +153,17 @@ </exclusions> </dependency> + <!-- WebClient --> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-webflux</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-to-slf4j</artifactId> + </exclusion> + </exclusions> + </dependency> <!-- docker client--> <dependency> diff --git a/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml b/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml index e51bb812f..028a1687f 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml +++ b/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml @@ -32,6 +32,12 @@ server: io: 16 worker: 256 +ai: + deepseek: + api: + key: + base-url: + # system database, default h2, mysql|pgsql|h2 datasource: dialect: h2 #h2, mysql, pgsql diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/DeepSeekConfig.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/DeepSeekConfig.java new file mode 100644 index 000000000..232f41c9b --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/DeepSeekConfig.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.base.config; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.reactive.function.client.WebClient; + +@Configuration +public class DeepSeekConfig { + + @Value("${ai.deepseek.api.key}") + private String apiKey; + + @Value("${ai.deepseek.api.base-url}") + private String baseUrl; + + @Bean + public WebClient deepSeekWebClient() { + return WebClient.builder() + .baseUrl(baseUrl) + .defaultHeader("Content-Type", "application/json") + .defaultHeader("Authorization", "Bearer " + apiKey) + .build(); + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/ChatRequest.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/ChatRequest.java new file mode 100644 index 000000000..c0c33bc06 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/ChatRequest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.bean; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class ChatRequest { + private String model; + private List<Message> messages; + private double temperature; + + @Data + @AllArgsConstructor + @NoArgsConstructor + public static class Message { + private String role; + private String content; + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/ChatResponse.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/ChatResponse.java new file mode 100644 index 000000000..0b50d2b33 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/ChatResponse.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.bean; + +import lombok.Data; + +import java.util.List; + +@Data +public class ChatResponse { + private List<ChatChoice> choices; + + @Data + public static class ChatChoice { + private ChatMessage message; + private String finishReason; + } + + @Data + public static class ChatMessage { + private String role; + private String content; + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ChatController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ChatController.java new file mode 100644 index 000000000..c7fc160e0 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ChatController.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.controller; + +import org.apache.streampark.console.core.service.DeepSeekService; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/api/chat") +public class ChatController { + + @Autowired private DeepSeekService deepSeekService; + + @PostMapping + public ResponseEntity<String> handleChat(@RequestBody String message) { + String response = deepSeekService.chat(message); + return ResponseEntity.ok(response); + } + +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DeepSeekService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DeepSeekService.java new file mode 100644 index 000000000..ab968d9b4 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DeepSeekService.java @@ -0,0 +1,45 @@ +package org.apache.streampark.console.core.service; + +import org.apache.streampark.console.core.bean.ChatRequest; +import org.apache.streampark.console.core.bean.ChatResponse; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; + +import java.util.Collections; + +@Service +public class DeepSeekService { + + @Autowired private WebClient webClient; + + public String chat(String userMessage) { + ChatRequest request = + new ChatRequest( + "deepseek-reasoner", + Collections.singletonList(new ChatRequest.Message("user", userMessage)), + 0.7); + + ChatResponse response = + webClient + .post() + .uri("/v1/chat/completions") + .bodyValue(request) + .retrieve() + .onStatus( + status -> !status.is2xxSuccessful(), + clientResponse -> + clientResponse + .bodyToMono(String.class) + .flatMap(error -> Mono.error(new RuntimeException("API Error: " + error)))) + .bodyToMono(ChatResponse.class) + .block(); + + if (response != null) { + return response.getChoices().get(0).getMessage().getContent(); + } + return null; + } +}
