This is an automated email from the ASF dual-hosted git repository.
pandaapo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new fdb96d83e [ISSUE #4047] Support chatGPT source connector (#4817)
fdb96d83e is described below
commit fdb96d83e6960d594ae08c9459cb4c06bef5135b
Author: Jevin Jiang <[email protected]>
AuthorDate: Sun Apr 21 08:27:14 2024 +0800
[ISSUE #4047] Support chatGPT source connector (#4817)
* [ISSUE #4047] Support chatGPT source connector
* [ISSUE #4047] Add OpenAI configuration and adjust DTO
* [ISSUE #4047] Join parse request support
* [ISSUE #4047] impl Parse request
* [ISSUE #4047] fix code style
* [ISSUE #4047] fix code style
* [ISSUE #4047] fix dependencies check failed
* [ISSUE #4047] fix dependencies check
* [ISSUE #4047] fix license check
* [ISSUE #4047] fix review question
* [ISSUE #4047] fix review question
* [ISSUE #4047] add default value
* [ISSUE #4047] fix test
* [ISSUE #4047] default timeout value is zero , not timeout .
* [ISSUE #4047] fix review
* [ISSUE #4047] fix license check
---------
Co-authored-by: JiangShuJu <[email protected]>
---
.../eventmesh-connector-chatgpt/build.gradle | 28 +++
.../eventmesh-connector-chatgpt/gradle.properties | 18 ++
.../chatgpt/config/ChatGPTServerConfig.java | 32 +++
.../chatgpt/server/ChatGPTConnectServer.java | 40 ++++
.../chatgpt/source/config/ChatGPTSourceConfig.java | 35 +++
.../config/ChatGPTSourceConnectorConfig.java | 37 +++
.../chatgpt/source/config/OpenaiConfig.java | 42 ++++
.../chatgpt/source/config/OpenaiProxyConfig.java | 29 +++
.../source/connector/ChatGPTSourceConnector.java | 252 +++++++++++++++++++++
.../chatgpt/source/dto/ChatGPTRequestDTO.java | 61 +++++
.../chatgpt/source/enums/ChatGPTRequestType.java | 25 ++
.../chatgpt/source/handlers/ChatHandler.java | 69 ++++++
.../chatgpt/source/handlers/ParseHandler.java | 134 +++++++++++
.../chatgpt/source/managers/OpenaiManager.java | 131 +++++++++++
.../src/main/resources/prompt | 44 ++++
.../src/main/resources/server-config.yml | 19 ++
.../src/main/resources/source-config.yml | 51 +++++
.../connector/ChatGPTSourceConnectorTest.java | 166 ++++++++++++++
.../src/test/resources/server-config.yml | 19 ++
.../src/test/resources/source-config.yml | 52 +++++
settings.gradle | 1 +
tools/dependency-check/known-dependencies.txt | 12 +
22 files changed, 1297 insertions(+)
diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/build.gradle
b/eventmesh-connectors/eventmesh-connector-chatgpt/build.gradle
new file mode 100644
index 000000000..b2c45b620
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-chatgpt/build.gradle
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+dependencies {
+ // OpenConnect API is exposed with `api` so it is visible to consumers of this module
+ api project(":eventmesh-openconnect:eventmesh-openconnect-java")
+ implementation project(":eventmesh-common")
+ // OpenAI client library (com.theokanning.openai.*) used by the handlers and manager
+ implementation 'com.theokanning.openai-gpt3-java:service:0.18.2'
+ implementation 'io.cloudevents:cloudevents-http-vertx:2.3.0'
+ // Vert.x Web supplies the Router/BodyHandler used by the source connector's HTTP endpoint
+ implementation 'io.vertx:vertx-web:4.4.6'
+
+ testImplementation "org.apache.httpcomponents:httpclient"
+ compileOnly 'org.projectlombok:lombok'
+ annotationProcessor 'org.projectlombok:lombok'
+}
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/gradle.properties
b/eventmesh-connectors/eventmesh-connector-chatgpt/gradle.properties
new file mode 100644
index 000000000..715bad3de
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-chatgpt/gradle.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+pluginType=connector
+pluginName=chatgpt
\ No newline at end of file
diff --git
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/config/ChatGPTServerConfig.java
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/config/ChatGPTServerConfig.java
new file mode 100644
index 000000000..7d162920d
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/config/ChatGPTServerConfig.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.config;
+
+import org.apache.eventmesh.openconnect.api.config.Config;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+/**
+ * Process-level switches for the ChatGPT connector, parsed from {@code server-config.yml}
+ * by {@code ChatGPTConnectServer}.
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class ChatGPTServerConfig extends Config {
+
+ /** When {@code true}, the source connector application is launched. */
+ private boolean sourceEnable;
+
+ /** Reserved for a sink connector. */
+ private boolean sinkEnable;
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/server/ChatGPTConnectServer.java
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/server/ChatGPTConnectServer.java
new file mode 100644
index 000000000..ca104fe56
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/server/ChatGPTConnectServer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.server;
+
+import org.apache.eventmesh.connector.chatgpt.config.ChatGPTServerConfig;
+import
org.apache.eventmesh.connector.chatgpt.source.connector.ChatGPTSourceConnector;
+import org.apache.eventmesh.openconnect.Application;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+/**
+ * Command-line entry point of the ChatGPT connector.
+ */
+public class ChatGPTConnectServer {
+
+    /**
+     * Loads {@code server-config.yml} and launches the connectors it enables.
+     *
+     * @param args command-line arguments (not read by this method)
+     * @throws Exception if the configuration cannot be parsed or the application fails to run
+     */
+    public static void main(String[] args) throws Exception {
+        final ChatGPTServerConfig config = ConfigUtil.parse(ChatGPTServerConfig.class, "server-config.yml");
+
+        if (config.isSourceEnable()) {
+            new Application().run(ChatGPTSourceConnector.class);
+        }
+
+        if (config.isSinkEnable()) {
+            // TODO support sink connector
+        }
+    }
+
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConfig.java
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConfig.java
new file mode 100644
index 000000000..959686691
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConfig.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.source.config;
+
+import org.apache.eventmesh.openconnect.api.config.SourceConfig;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+/**
+ * Root configuration object of the ChatGPT source connector.
+ *
+ * <p>The fields are public: {@code ChatGPTSourceConnector} reads {@code connectorConfig}
+ * through direct field access as well as through the Lombok-generated getter.
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class ChatGPTSourceConfig extends SourceConfig {
+
+ /** HTTP endpoint and prompt-file settings of the connector. */
+ public ChatGPTSourceConnectorConfig connectorConfig;
+
+ /** Proxy host/port settings. */
+ public OpenaiProxyConfig openaiProxyConfig;
+
+ /** OpenAI request settings (token, model, sampling options). */
+ public OpenaiConfig openaiConfig;
+
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConnectorConfig.java
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConnectorConfig.java
new file mode 100644
index 000000000..316fb5f24
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConnectorConfig.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.source.config;
+
+import lombok.Data;
+
+/**
+ * Settings of the HTTP endpoint exposed by the ChatGPT source connector.
+ */
+@Data
+public class ChatGPTSourceConnectorConfig {
+
+ /** Name reported by the connector's {@code name()} method. */
+ private String connectorName = "chatgptSource";
+
+ /** Route path on which POST requests are accepted. */
+ private String path = "/chatgpt";
+
+ /** Port the HTTP server is configured with. */
+ private int port = 3756;
+
+ /**
+  * Value passed to {@code HttpServerOptions#setIdleTimeout}; the connector resets a negative
+  * value to 0 before use.
+  */
+ private int idleTimeout;
+
+ // NOTE(review): not read anywhere in the visible code; presumably toggles use of
+ // OpenaiProxyConfig -- confirm against OpenaiManager.
+ private boolean proxyEnable = false;
+
+ /** Name of the classpath resource holding the prompt template used for PARSE requests. */
+ private String parsePromptFileName = "prompt";
+
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiConfig.java
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiConfig.java
new file mode 100644
index 000000000..51858a709
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiConfig.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.source.config;
+
+
+import java.util.List;
+import java.util.Map;
+
+import lombok.Data;
+
+/**
+ * OpenAI-related settings of the source connector.
+ *
+ * <p>NOTE(review): the consumer of these values is not visible here; the field names suggest
+ * they map onto chat-completion request parameters -- confirm against OpenaiManager.
+ */
+@Data
+public class OpenaiConfig {
+
+ private String token;
+ // Defaults to "gpt-3.5-turbo" when not configured.
+ private String model = "gpt-3.5-turbo";
+ // NOTE(review): time unit is not established by this class -- confirm where it is read.
+ private long timeout;
+ private Double temperature;
+ private Integer maxTokens;
+ private Boolean logprob;
+ private Double topLogprobs;
+ private Map<String, Integer> logitBias;
+ private Double frequencyPenalty;
+ private Double presencePenalty;
+ // Defaults to "eventMesh" when not configured.
+ private String user = "eventMesh";
+ private List<String> stop;
+
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiProxyConfig.java
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiProxyConfig.java
new file mode 100644
index 000000000..14dd69f35
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiProxyConfig.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.source.config;
+
+import lombok.Data;
+
+/**
+ * Host and port of a proxy.
+ *
+ * <p>NOTE(review): the consumer is not visible here; presumably used for outbound OpenAI calls
+ * when {@code proxyEnable} is set -- confirm against OpenaiManager.
+ */
+@Data
+public class OpenaiProxyConfig {
+
+ private String host;
+
+ private int port;
+
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java
new file mode 100644
index 000000000..a947bc135
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.source.connector;
+
+import org.apache.eventmesh.common.ThreadPoolFactory;
+import org.apache.eventmesh.common.exception.EventMeshException;
+import
org.apache.eventmesh.connector.chatgpt.source.config.ChatGPTSourceConfig;
+import org.apache.eventmesh.connector.chatgpt.source.dto.ChatGPTRequestDTO;
+import org.apache.eventmesh.connector.chatgpt.source.enums.ChatGPTRequestType;
+import org.apache.eventmesh.connector.chatgpt.source.handlers.ChatHandler;
+import org.apache.eventmesh.connector.chatgpt.source.handlers.ParseHandler;
+import org.apache.eventmesh.connector.chatgpt.source.managers.OpenaiManager;
+import org.apache.eventmesh.openconnect.api.config.Config;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
+import org.apache.eventmesh.openconnect.api.source.Source;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.util.CloudEventUtil;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import io.cloudevents.CloudEvent;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.HttpServerOptions;
+import io.vertx.ext.web.RequestBody;
+import io.vertx.ext.web.Router;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.BodyHandler;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class ChatGPTSourceConnector implements Source {
+
+ private static final int DEFAULT_BATCH_SIZE = 10;
+
+ private ChatGPTSourceConfig sourceConfig;
+ private BlockingQueue<CloudEvent> queue;
+ private HttpServer server;
+ private final ExecutorService chatgptSourceExecutorService =
+
ThreadPoolFactory.createThreadPoolExecutor(Runtime.getRuntime().availableProcessors()
* 2, Runtime.getRuntime().availableProcessors() * 2,
+ "ChatGPTSourceThread");
+
+ private OpenaiManager openaiManager;
+ private String parsePromptTemplateStr;
+ private ChatHandler chatHandler;
+ private ParseHandler parseHandler;
+ private static final int DEFAULT_TIMEOUT = 0;
+
+ private static final String APPLICATION_JSON = "application/json";
+ private static final String TEXT_PLAIN = "text/plain";
+
+
+ @Override
+ public Class<? extends Config> configClass() {
+ return ChatGPTSourceConfig.class;
+ }
+
+ @Override
+ public void init(Config config) {
+ this.sourceConfig = (ChatGPTSourceConfig) config;
+ doInit();
+ }
+
+ @Override
+ public void init(ConnectorContext connectorContext) {
+ SourceConnectorContext sourceConnectorContext =
(SourceConnectorContext) connectorContext;
+ this.sourceConfig = (ChatGPTSourceConfig)
sourceConnectorContext.getSourceConfig();
+ doInit();
+ }
+
+ public void initParsePrompt() {
+ String parsePromptFileName =
sourceConfig.getConnectorConfig().getParsePromptFileName();
+ URL resource =
Thread.currentThread().getContextClassLoader().getResource(parsePromptFileName);
+ if (resource == null) {
+ log.warn("cannot find prompt file {} in resources",
parsePromptFileName);
+ return;
+ }
+ String filePath = resource.getPath();
+ try (BufferedReader br = new BufferedReader(new FileReader(filePath)))
{
+ StringBuilder builder = new StringBuilder();
+ String line;
+ while ((line = br.readLine()) != null) {
+ if (!line.startsWith("#") && StringUtils.isNotBlank(line)) {
+ builder.append(line).append("\n");
+ }
+ }
+ this.parsePromptTemplateStr = builder.toString();
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to read file", e);
+ }
+ }
+
+
+ @SuppressWarnings("checkstyle:WhitespaceAround")
+ private void doInit() {
+ initParsePrompt();
+ this.openaiManager = new OpenaiManager(sourceConfig);
+ this.chatHandler = new ChatHandler(this.openaiManager);
+ if (StringUtils.isNotEmpty(parsePromptTemplateStr)) {
+ this.parseHandler = new ParseHandler(openaiManager,
parsePromptTemplateStr);
+ }
+ this.queue = new LinkedBlockingQueue<>(1024);
+ final Vertx vertx = Vertx.vertx();
+ final Router router = Router.router(vertx);
+
router.route().path(this.sourceConfig.connectorConfig.getPath()).method(HttpMethod.POST).handler(BodyHandler.create()).handler(ctx
-> {
+ try {
+ RequestBody body = ctx.body();
+ ChatGPTRequestDTO bodyObject =
body.asPojo(ChatGPTRequestDTO.class);
+ validateRequestDTO(bodyObject);
+ handleRequest(bodyObject, ctx);
+ } catch (Exception e) {
+ handleError(e, ctx);
+ }
+ });
+ if (sourceConfig.connectorConfig.getIdleTimeout() < 0) {
+ log.warn("idleTimeout must be >= 0, your config value is {},
idleTimeout will be reset {}", sourceConfig.connectorConfig.getIdleTimeout(),
+ DEFAULT_TIMEOUT);
+ sourceConfig.connectorConfig.setIdleTimeout(DEFAULT_TIMEOUT);
+ }
+ this.server = vertx.createHttpServer(new
HttpServerOptions().setPort(this.sourceConfig.connectorConfig.getPort())
+
.setIdleTimeout(this.sourceConfig.connectorConfig.getIdleTimeout())).requestHandler(router);
+ }
+
+
+ private void validateRequestDTO(ChatGPTRequestDTO bodyObject) {
+ if (StringUtils.isBlank(bodyObject.getText())) {
+ throw new IllegalArgumentException("Attributes 'text' cannot be
null");
+ }
+ }
+
+ private void handleRequest(ChatGPTRequestDTO bodyObject, RoutingContext
ctx) {
+ chatgptSourceExecutorService.execute(() -> {
+ try {
+ ChatGPTRequestType chatgptRequestType =
ChatGPTRequestType.valueOf(bodyObject.getRequestType());
+ CloudEvent cloudEvent = invokeHandler(chatgptRequestType,
bodyObject);
+ queue.add(cloudEvent);
+ log.info("[ChatGPTSourceConnector] Succeed to convert payload
into CloudEvent.");
+
ctx.response().setStatusCode(HttpResponseStatus.OK.code()).end();
+ } catch (IllegalArgumentException e) {
+ log.error("[ChatGPTSourceConnector] the request type is
illegal: {}", e.getMessage(), e);
+
ctx.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code())
+ .setStatusMessage(String.format("request type '%s' is not
supported", bodyObject.getRequestType())).end();
+ } catch (Exception e) {
+ log.error("[ChatGPTSourceConnector] Error processing request:
{}", e.getMessage(), e);
+
ctx.response().setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).end();
+ }
+ });
+ }
+
+ private CloudEvent invokeHandler(ChatGPTRequestType chatgptRequestType,
ChatGPTRequestDTO bodyObject) {
+ switch (chatgptRequestType) {
+ case CHAT:
+ if (StringUtils.isBlank(bodyObject.getDataContentType())) {
+ bodyObject.setDataContentType(TEXT_PLAIN);
+ }
+ return chatHandler.invoke(bodyObject);
+ case PARSE:
+ if (StringUtils.isBlank(parsePromptTemplateStr)) {
+ throw new IllegalStateException(
+ "the request type of PARSE must be configured with the
correct parsePromptFileName in source-config.yml");
+ }
+ if (StringUtils.isBlank(bodyObject.getFields())) {
+ throw new IllegalStateException("Attributes 'fields'
cannot be null in PARSE");
+ }
+ if (StringUtils.isBlank(bodyObject.getDataContentType())) {
+ bodyObject.setDataContentType(APPLICATION_JSON);
+ }
+ return parseHandler.invoke(bodyObject);
+ default:
+ throw new IllegalStateException("the request type is illegal");
+ }
+ }
+
+ private void handleError(Exception e, RoutingContext ctx) {
+ log.error("[ChatGPTSourceConnector] Malformed request.", e);
+
ctx.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).end();
+ }
+
+ @Override
+ public void start() {
+ Throwable t = this.server.listen().cause();
+ if (t != null) {
+ throw new EventMeshException("failed to start Vertx server", t);
+ }
+ }
+
+ @Override
+ public void commit(ConnectRecord record) {
+
+ }
+
+ @Override
+ public String name() {
+ return this.sourceConfig.getConnectorConfig().getConnectorName();
+ }
+
+ @Override
+ public void stop() {
+ Throwable t = this.server.close().cause();
+ if (t != null) {
+ throw new EventMeshException("failed to stop Vertx server", t);
+ }
+ }
+
+ @Override
+ public List<ConnectRecord> poll() {
+ List<ConnectRecord> connectRecords = new
ArrayList<>(DEFAULT_BATCH_SIZE);
+ for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) {
+ try {
+ CloudEvent event = queue.poll(3, TimeUnit.SECONDS);
+ if (event == null) {
+ break;
+ }
+ connectRecords.add(CloudEventUtil.convertEventToRecord(event));
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ return connectRecords;
+ }
+
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/dto/ChatGPTRequestDTO.java
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/dto/ChatGPTRequestDTO.java
new file mode 100644
index 000000000..a203a24e5
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/dto/ChatGPTRequestDTO.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.source.dto;
+
+import org.apache.eventmesh.connector.chatgpt.source.enums.ChatGPTRequestType;
+
+import java.time.ZonedDateTime;
+import java.util.UUID;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * JSON body of a request sent to the ChatGPT source connector.
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class ChatGPTRequestDTO {
+
+    /** Name of a {@link ChatGPTRequestType} constant; defaults to {@code CHAT}. */
+    private String requestType = ChatGPTRequestType.CHAT.name();
+
+    private String source = "/";
+
+    private String subject = "chatGPT";
+
+    @JsonProperty("datacontenttype")
+    private String dataContentType;
+
+    private String type = "cloudevents";
+
+    /** Text of the request. */
+    private String text;
+
+    /** Semicolon-separated field descriptions; see {@link #getFields()}. */
+    private String fields;
+
+    @JsonInclude
+    private String id = UUID.randomUUID().toString();
+
+    @JsonInclude
+    private String time = ZonedDateTime.now().toOffsetDateTime().toString();
+
+    /**
+     * Returns {@code fields} with every {@code ;} replaced by a line break.
+     *
+     * @return the converted value, or {@code null} when no fields were supplied, so that callers
+     *     (and the Lombok-generated {@code toString}/{@code equals}/{@code hashCode}) can handle a
+     *     request without {@code fields} instead of hitting a {@link NullPointerException}
+     */
+    public String getFields() {
+        return fields == null ? null : fields.replace(";", "\n");
+    }
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/enums/ChatGPTRequestType.java
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/enums/ChatGPTRequestType.java
new file mode 100644
index 000000000..993052565
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/enums/ChatGPTRequestType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.source.enums;
+
+
+/**
+ * Kinds of request accepted by the ChatGPT source connector; the connector resolves the
+ * request's {@code requestType} string with {@code valueOf}, so it must match a constant name.
+ */
+public enum ChatGPTRequestType {
+
+ // CHAT is dispatched to ChatHandler; PARSE is dispatched to ParseHandler and additionally
+ // requires a loaded prompt template and a non-blank 'fields' attribute.
+ CHAT, PARSE;
+
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ChatHandler.java
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ChatHandler.java
new file mode 100644
index 000000000..6d79a0559
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ChatHandler.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.source.handlers;
+
+
+import org.apache.eventmesh.connector.chatgpt.source.dto.ChatGPTRequestDTO;
+import org.apache.eventmesh.connector.chatgpt.source.managers.OpenaiManager;
+
+import java.net.URI;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+
+import com.theokanning.openai.completion.chat.ChatCompletionRequest;
+import com.theokanning.openai.completion.chat.ChatMessage;
+import com.theokanning.openai.completion.chat.ChatMessageRole;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Handles CHAT requests: sends the request text to OpenAI as a single user message and wraps
+ * the answer in a {@link CloudEvent}.
+ */
+@Slf4j
+public class ChatHandler {
+
+    private final OpenaiManager openaiManager;
+
+    public ChatHandler(OpenaiManager openaiManager) {
+        this.openaiManager = openaiManager;
+    }
+
+    /**
+     * Produces the CloudEvent for a chat request.
+     *
+     * @param event the incoming request; its {@code text} is the prompt, while {@code source},
+     *     {@code type}, {@code subject} and {@code dataContentType} are copied onto the event
+     * @return a CloudEvent v1 with a fresh id and timestamp whose data is the chat result
+     */
+    public CloudEvent invoke(ChatGPTRequestDTO event) {
+        return genGptConnectRecord(event);
+    }
+
+    private CloudEvent genGptConnectRecord(ChatGPTRequestDTO event) {
+        List<ChatMessage> chatMessages = new ArrayList<>();
+        chatMessages.add(new ChatMessage(ChatMessageRole.USER.value(), event.getText()));
+        ChatCompletionRequest req = openaiManager.newChatCompletionRequest(chatMessages);
+        String chatResult = openaiManager.getResult(req);
+
+        return CloudEventBuilder.v1()
+            .withId(UUID.randomUUID().toString())
+            .withSource(URI.create(event.getSource()))
+            .withType(event.getType())
+            .withTime(ZonedDateTime.now().toOffsetDateTime())
+            // Encode explicitly as UTF-8: the no-arg getBytes() depends on the platform default
+            // charset, which would make the payload bytes vary between hosts.
+            .withData(chatResult.getBytes(java.nio.charset.StandardCharsets.UTF_8))
+            .withSubject(event.getSubject())
+            .withDataContentType(event.getDataContentType())
+            .build();
+    }
+
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ParseHandler.java
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ParseHandler.java
new file mode 100644
index 000000000..aad3d384c
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ParseHandler.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.source.handlers;
+
+
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.connector.chatgpt.source.dto.ChatGPTRequestDTO;
+import org.apache.eventmesh.connector.chatgpt.source.managers.OpenaiManager;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.text.StringSubstitutor;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.jackson.JsonFormat;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.theokanning.openai.completion.chat.ChatCompletionRequest;
+import com.theokanning.openai.completion.chat.ChatMessage;
+import com.theokanning.openai.completion.chat.ChatMessageRole;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class ParseHandler {
+
+ private final OpenaiManager openaiManager;
+
+ private final String promptTemplate;
+
+ private static final JsonFormat jsonFormat = new JsonFormat(false, true);
+
+
+ public ParseHandler(OpenaiManager openaiManager, String promptTemplate) {
+ this.openaiManager = openaiManager;
+ this.promptTemplate = promptTemplate;
+ }
+
+ @SuppressWarnings("checkstyle:WhitespaceAfter")
+ public CloudEvent invoke(ChatGPTRequestDTO event) {
+ Map<String, String> map = convertToMap(event);
+
+ StringSubstitutor substitute = new StringSubstitutor(map);
+ String finalPrompt = substitute.replace(promptTemplate);
+ List<ChatMessage> chatMessages = new ArrayList<>();
+ chatMessages.add(new ChatMessage(ChatMessageRole.USER.value(),
finalPrompt));
+ ChatCompletionRequest req =
openaiManager.newChatCompletionRequest(chatMessages);
+ String chatResult = openaiManager.getResult(req);
+ chatResult = StringUtils.removeFirst(chatResult, "```json");
+ chatResult = StringUtils.removeEnd(chatResult, "```");
+ CloudEvent cloudEvent;
+ try {
+ cloudEvent =
jsonFormat.deserialize(chatResult.getBytes(Constants.DEFAULT_CHARSET));
+ } catch (Exception e) {
+ throw new IllegalStateException("cloudEvent parse fail, please
check your parse prompt file content", e);
+ }
+ return cloudEvent;
+ }
+
+ public Map<String, String> convertToMap(Object obj) {
+ Map<String, String> map = new HashMap<>();
+ Class<?> clazz = obj.getClass();
+ Field[] fields = clazz.getDeclaredFields();
+ for (Field field : fields) {
+ if (field.isSynthetic()) {
+ continue;
+ }
+ if (Map.class.isAssignableFrom(field.getType()) ||
List.class.isAssignableFrom(field.getType())) {
+ continue;
+ }
+ try {
+ String key = field.getName();
+ if (field.isAnnotationPresent(JsonProperty.class)) {
+ JsonProperty annotation =
field.getAnnotation(JsonProperty.class);
+ key = annotation.value();
+ }
+ Method getter = getGetter(field, clazz);
+ map.put(key, String.valueOf(getter.invoke(obj)));
+ } catch (IllegalAccessException | NoSuchMethodException |
InvocationTargetException e) {
+ throw new IllegalStateException("convert to Map is fail", e);
+ }
+ }
+
+ return map;
+ }
+
+ public Method getGetter(Field field, Class<?> clazz) throws
NoSuchMethodException {
+ boolean isBooleanField =
boolean.class.isAssignableFrom(field.getType()) ||
Boolean.class.isAssignableFrom(field.getType());
+ String handledFieldName = upperFirst(field.getName());
+ String methodName;
+ if (isBooleanField) {
+ methodName = "is" + handledFieldName;
+ } else {
+ methodName = "get" + handledFieldName;
+ }
+ return clazz.getDeclaredMethod(methodName);
+ }
+
+ public String upperFirst(String str) {
+ if (null == str) {
+ return null;
+ }
+ if (!str.isEmpty()) {
+ char firstChar = str.charAt(0);
+ if (Character.isLowerCase(firstChar)) {
+ return Character.toUpperCase(firstChar) +
StringUtils.substring(str, 1);
+ }
+ }
+ return str;
+ }
+
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/managers/OpenaiManager.java
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/managers/OpenaiManager.java
new file mode 100644
index 000000000..fda5216bb
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/managers/OpenaiManager.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.source.managers;
+
+import static com.theokanning.openai.service.OpenAiService.defaultClient;
+import static com.theokanning.openai.service.OpenAiService.defaultObjectMapper;
+import static com.theokanning.openai.service.OpenAiService.defaultRetrofit;
+
+import org.apache.eventmesh.common.utils.AssertUtils;
+import org.apache.eventmesh.common.utils.JsonUtils;
+import
org.apache.eventmesh.connector.chatgpt.source.config.ChatGPTSourceConfig;
+import org.apache.eventmesh.connector.chatgpt.source.config.OpenaiConfig;
+import org.apache.eventmesh.connector.chatgpt.source.config.OpenaiProxyConfig;
+
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.time.Duration;
+import java.util.List;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.theokanning.openai.client.OpenAiApi;
+import com.theokanning.openai.completion.chat.ChatCompletionRequest;
+import
com.theokanning.openai.completion.chat.ChatCompletionRequest.ChatCompletionRequestBuilder;
+import com.theokanning.openai.completion.chat.ChatMessage;
+import com.theokanning.openai.service.OpenAiService;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import okhttp3.OkHttpClient;
+import retrofit2.Retrofit;
+
+
+/**
+ * Creates the OpenAI client from the source configuration and executes chat completion requests.
+ *
+ * <p>The configured request parameters are stored once as a JSON template; every new request
+ * is parsed from that template and then given its own message list.
+ */
+@Slf4j
+public class OpenaiManager {
+
+    private static final int DEFAULT_TIMEOUT = 0;
+
+    @Getter
+    private OpenAiService openAiService;
+
+    private String chatCompletionRequestTemplateStr;
+
+    /**
+     * @param sourceConfig source configuration holding the OpenAI, connector and proxy settings
+     */
+    public OpenaiManager(ChatGPTSourceConfig sourceConfig) {
+        initOpenAi(sourceConfig);
+    }
+
+    /**
+     * Executes the request and concatenates the content of all returned choices.
+     *
+     * @param req the chat completion request to execute
+     * @return the concatenated answer; empty (or partial) if the call failed, in which case the error is logged
+     */
+    public String getResult(ChatCompletionRequest req) {
+        StringBuilder gptData = new StringBuilder();
+        try {
+            openAiService.createChatCompletion(req).getChoices().forEach(chatCompletionChoice -> {
+                // Skip choices without content so the literal text "null" never ends up in the result.
+                if (chatCompletionChoice.getMessage() != null && chatCompletionChoice.getMessage().getContent() != null) {
+                    gptData.append(chatCompletionChoice.getMessage().getContent());
+                }
+            });
+        } catch (Exception e) {
+            // Pass the exception as the last argument so the stack trace is logged, not only the message.
+            log.error("Failed to generate GPT connection record: {}", e.getMessage(), e);
+        }
+        return gptData.toString();
+    }
+
+    /**
+     * Creates a new request from the configured template and attaches the given messages.
+     *
+     * @param chatMessages the messages to send
+     * @return a request parsed from the template with its messages set
+     */
+    public ChatCompletionRequest newChatCompletionRequest(List<ChatMessage> chatMessages) {
+        ChatCompletionRequest request = JsonUtils.parseObject(chatCompletionRequestTemplateStr, ChatCompletionRequest.class);
+        request.setMessages(chatMessages);
+        return request;
+    }
+
+    /**
+     * Validates the configuration, builds the OpenAI service (optionally through an HTTP proxy)
+     * and stores the request template.
+     */
+    private void initOpenAi(ChatGPTSourceConfig sourceConfig) {
+        OpenaiConfig openaiConfig = sourceConfig.getOpenaiConfig();
+        // Validate before the model is used to build the request template.
+        AssertUtils.notNull(openaiConfig.getModel(), "model cannot be null");
+        if (openaiConfig.getTimeout() < 0) {
+            log.warn("openaiTimeout must be >= 0, your config value is {}, openaiTimeout will be reset {}", openaiConfig.getTimeout(),
+                DEFAULT_TIMEOUT);
+            openaiConfig.setTimeout(DEFAULT_TIMEOUT);
+        }
+        // The configured timeout is interpreted as seconds.
+        Duration timeout = Duration.ofSeconds(openaiConfig.getTimeout());
+        this.openAiService = sourceConfig.connectorConfig.isProxyEnable()
+            ? newProxiedService(openaiConfig.getToken(), timeout, sourceConfig.openaiProxyConfig)
+            : new OpenAiService(openaiConfig.getToken(), timeout);
+        this.chatCompletionRequestTemplateStr = JsonUtils.toJSONString(newRequestTemplate(openaiConfig));
+    }
+
+    /**
+     * Builds an OpenAI service whose HTTP client goes through the configured HTTP proxy.
+     */
+    private static OpenAiService newProxiedService(String token, Duration timeout, OpenaiProxyConfig chatgptProxyConfig) {
+        // A missing proxy section is reported the same way as a missing host instead of failing with an NPE.
+        if (chatgptProxyConfig == null || chatgptProxyConfig.getHost() == null) {
+            throw new IllegalStateException("chatgpt proxy config 'host' cannot be null");
+        }
+        ObjectMapper mapper = defaultObjectMapper();
+        Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(chatgptProxyConfig.getHost(), chatgptProxyConfig.getPort()));
+        OkHttpClient client = defaultClient(token, timeout).newBuilder().proxy(proxy).build();
+        Retrofit retrofit = defaultRetrofit(client, mapper);
+        OpenAiApi api = retrofit.create(OpenAiApi.class);
+        return new OpenAiService(api);
+    }
+
+    /**
+     * Builds the request template; optional parameters are applied only when they are configured.
+     */
+    private static ChatCompletionRequest newRequestTemplate(OpenaiConfig openaiConfig) {
+        ChatCompletionRequestBuilder builder = ChatCompletionRequest.builder().model(openaiConfig.getModel());
+        if (openaiConfig.getUser() != null) {
+            builder = builder.user(openaiConfig.getUser());
+        }
+        if (openaiConfig.getPresencePenalty() != null) {
+            builder = builder.presencePenalty(openaiConfig.getPresencePenalty());
+        }
+        if (openaiConfig.getFrequencyPenalty() != null) {
+            builder = builder.frequencyPenalty(openaiConfig.getFrequencyPenalty());
+        }
+        if (openaiConfig.getMaxTokens() != null) {
+            builder = builder.maxTokens(openaiConfig.getMaxTokens());
+        }
+        if (openaiConfig.getTemperature() != null) {
+            builder = builder.temperature(openaiConfig.getTemperature());
+        }
+        if (openaiConfig.getLogitBias() != null && !openaiConfig.getLogitBias().isEmpty()) {
+            builder = builder.logitBias(openaiConfig.getLogitBias());
+        }
+        if (openaiConfig.getStop() != null && !openaiConfig.getStop().isEmpty()) {
+            builder = builder.stop(openaiConfig.getStop());
+        }
+        return builder.build();
+    }
+
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/prompt
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/prompt
new file mode 100644
index 000000000..e10ecc331
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/prompt
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+You are an AI assistant named CloudEventsConverter. Avoid escape characters.
+Your task is to construct a JSON object in CloudEvents format. Based on the
field name and field description in the 'data' field of the CloudEvents
formatted JSON object, convert the input text provided by the user into the
content of the 'data' field, which must comply with the specifications of the
content of the 'datacontenttype' field.
+The rules are:
+ - If the 'datacontenttype' field content is 'application/json', then the 'data' field content should be a JSON object;
+ - else if the 'datacontenttype' field content is 'application/xml', then the 'data' field content should be a string in XML format whose outermost element is <data> </data>, and inside it is the XML generated by you based on the field info;
+ - else (the 'datacontenttype' field content is neither 'application/json' nor 'application/xml') the 'data' field content is the string content of the 'text' field;
+Except for the content of the 'data' field, all other values must be set exactly as specified below and must not be modified. Finally, return to me the JSON object in CloudEvents format that you constructed.
+
+The following text is the field name and field description in the 'data' field
of the CloudEvents-formatted JSON object, extract the following information:
+<BEGIN FIELD INFO>
+${fields}
+<END FIELD INFO>
+
+text: ${text}
+
+The output should be a markdown code snippet formatted in the following
schema, including the leading and trailing "```json" and "```":
+```json
+{
+ "specversion": string, Set to "1.0"
+ "type": string, Set to ${type}
+ "source": string, Set to ${source}
+ "subject": string, Set to ${subject}
+ "id": string, Set to ${id}
+ "time": string, Set to ${time}
+ "datacontenttype": string, Set to ${datacontenttype}
+ "data": object or string
+}
+```
\ No newline at end of file
diff --git
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/server-config.yml
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/server-config.yml
new file mode 100644
index 000000000..0cd7b5b5a
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/server-config.yml
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sourceEnable: true
+sinkEnable: false
diff --git
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/source-config.yml
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/source-config.yml
new file mode 100644
index 000000000..b194e99ec
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/source-config.yml
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+ meshAddress: 127.0.0.1:10000
+ subject: TopicTest
+ idc: FT
+ env: PRD
+ group: chatgptSource
+ appId: 5032
+ userName: chatgptSourceUser
+ passWord: chatgptPassWord
+connectorConfig:
+ connectorName: chatgptSource
+ # The connector test posts its requests to http://127.0.0.1:<port><path>.
+ path: /chatgpt
+ port: 3756
+ idleTimeout: 0
+ # When true, OpenAI requests go through the HTTP proxy described by openaiProxyConfig.
+ proxyEnable: false
+ # Presumably the name of the resource file holding the parse prompt template - confirm against the connector.
+ parsePromptFileName: prompt
+
+# https://platform.openai.com/docs/api-reference/chat/create
+openaiConfig:
+ # OpenAI API token; the connector test skips itself while this is blank.
+ token:
+ # Required: initialization asserts that model is not null.
+ model: gpt-3.5-turbo
+ # Timeout in seconds; a negative value is reset to 0. 0 is intended to mean "no timeout".
+ timeout: 0
+ # The remaining parameters are optional and only added to the request when set (stop/logitBias only when non-empty).
+ temperature: 1
+ maxTokens:
+ frequencyPenalty: 0
+ presencePenalty: 0
+ user: eventMesh
+ stop: []
+ logitBias: {}
+
+# Only used when connectorConfig.proxyEnable is true; host must then be set.
+openaiProxyConfig:
+ host: 127.0.0.1
+ port: 7890
+
diff --git
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnectorTest.java
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnectorTest.java
new file mode 100644
index 000000000..8347fdcbb
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnectorTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.source.connector;
+
+import org.apache.eventmesh.common.utils.JsonUtils;
+import
org.apache.eventmesh.connector.chatgpt.source.config.ChatGPTSourceConfig;
+import
org.apache.eventmesh.connector.chatgpt.source.config.ChatGPTSourceConnectorConfig;
+import org.apache.eventmesh.connector.chatgpt.source.config.OpenaiConfig;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import java.util.List;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Integration test for the ChatGPT source connector.
+ *
+ * <p>It needs a real OpenAI token in the test source-config.yml; while the token is blank
+ * the connector is not started and the test returns without asserting anything.
+ */
+class ChatGPTSourceConnectorTest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger("ChatGPTSourceConnectorTest");
+
+    private ChatGPTSourceConnector connector;
+    private ChatGPTSourceConnectorConfig config;
+    private CloseableHttpClient httpClient;
+    private String uri;
+    private final String expectedMessage = "Hello, can you tell me a story.";
+
+    private final String expectedParseMessage = "User 13356288979 from Tianjin store placed an order with order number 11221122";
+
+    /**
+     * @return true when an OpenAI token is configured, i.e. the test can actually run
+     */
+    public boolean checkOpenAi() throws Exception {
+        ChatGPTSourceConfig sourceConfig = (ChatGPTSourceConfig) ConfigUtil.parse(connector.configClass());
+        OpenaiConfig openaiConfig = sourceConfig.getOpenaiConfig();
+        return !StringUtils.isBlank(openaiConfig.getToken());
+    }
+
+    @BeforeEach
+    void setUp() throws Exception {
+        connector = new ChatGPTSourceConnector();
+        if (!checkOpenAi()) {
+            LOGGER.error("please set openai token in the config");
+            return;
+        }
+        ChatGPTSourceConfig sourceConfig = (ChatGPTSourceConfig) ConfigUtil.parse(connector.configClass());
+        config = sourceConfig.getConnectorConfig();
+        connector.init(sourceConfig);
+        connector.start();
+
+        uri = new URIBuilder().setScheme("http").setHost("127.0.0.1").setPort(config.getPort()).setPath(config.getPath()).build().toString();
+
+        httpClient = HttpClients.createDefault();
+    }
+
+    @Test
+    void testPoll() throws Exception {
+        if (!checkOpenAi()) {
+            LOGGER.error("please set openai token in the config");
+            return;
+        }
+
+        final int batchSize = 10;
+
+        for (int i = 0; i < batchSize; i++) {
+            assertOkAndRelease(mockStructuredChatRequest());
+        }
+
+        List<ConnectRecord> res = connector.poll();
+        Assertions.assertEquals(batchSize, res.size());
+
+        for (int i = 0; i < batchSize; i++) {
+            assertOkAndRelease(mockStructuredParseRequest());
+        }
+
+        List<ConnectRecord> res1 = connector.poll();
+        Assertions.assertEquals(batchSize, res1.size());
+    }
+
+    /**
+     * Asserts a 200 status and consumes the response so its pooled connection can be reused
+     * by the following requests.
+     */
+    private void assertOkAndRelease(HttpResponse resp) {
+        try {
+            // JUnit expects (expected, actual); the right order keeps failure messages meaningful.
+            Assertions.assertEquals(HttpStatus.SC_OK, resp.getStatusLine().getStatusCode());
+        } finally {
+            org.apache.http.util.EntityUtils.consumeQuietly(resp.getEntity());
+        }
+    }
+
+    /** Posts a CHAT request with a plain-text content type. */
+    HttpResponse mockStructuredChatRequest() throws Exception {
+        TestEvent event = newTestEvent("CHAT", "text/plain", expectedMessage);
+        return post(event);
+    }
+
+    /** Posts a PARSE request with a JSON content type and a field description. */
+    HttpResponse mockStructuredParseRequest() throws Exception {
+        TestEvent event = newTestEvent("PARSE", "application/json", expectedParseMessage);
+        event.fields = "orderNo:this is order number;address:this is a address;phone:this is phone number";
+        return post(event);
+    }
+
+    private TestEvent newTestEvent(String requestType, String dataContentType, String text) {
+        TestEvent event = new TestEvent();
+        event.type = "com.example.someevent";
+        event.source = "/mycontext";
+        event.subject = "test";
+        event.datacontenttype = dataContentType;
+        event.text = text;
+        event.requestType = requestType;
+        return event;
+    }
+
+    private HttpResponse post(TestEvent event) throws Exception {
+        HttpPost httpPost = new HttpPost(uri);
+        httpPost.setEntity(new StringEntity(JsonUtils.toJSONString(event)));
+        return httpClient.execute(httpPost);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        // Nothing was started in setUp() when no token is configured.
+        if (!checkOpenAi()) {
+            return;
+        }
+        if (connector != null) {
+            connector.stop();
+        }
+        if (httpClient != null) {
+            httpClient.close();
+        }
+    }
+
+    /** Request payload serialized to JSON; static so it holds no reference to the test instance. */
+    static class TestEvent {
+
+        public String requestType;
+        public String type;
+        public String source;
+        public String subject;
+        public String datacontenttype;
+        public String text;
+        public String fields;
+    }
+}
\ No newline at end of file
diff --git
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/server-config.yml
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/server-config.yml
new file mode 100644
index 000000000..0cd7b5b5a
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/server-config.yml
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sourceEnable: true
+sinkEnable: false
diff --git
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/source-config.yml
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/source-config.yml
new file mode 100644
index 000000000..47f25edbb
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/source-config.yml
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+ meshAddress: 127.0.0.1:10000
+ subject: TopicTest
+ idc: FT
+ env: PRD
+ group: chatgptSource
+ appId: 5032
+ userName: chatgptSourceUser
+ passWord: chatgptPassWord
+connectorConfig:
+ connectorName: chatgptSource
+ # The connector test posts its requests to http://127.0.0.1:<port><path>.
+ path: /chatgpt
+ port: 3756
+ idleTimeout: 0
+ # true here: OpenAI requests go through the HTTP proxy described by openaiProxyConfig.
+ proxyEnable: true
+ # Presumably the name of the resource file holding the parse prompt template - confirm against the connector.
+ parsePromptFileName: prompt
+
+# https://platform.openai.com/docs/api-reference/chat/create
+openaiConfig:
+ # OpenAI API token; the connector test skips itself while this is blank.
+ token:
+ # Required: initialization asserts that model is not null.
+ model: gpt-3.5-turbo
+ # Timeout in seconds; a negative value is reset to 0. 0 is intended to mean "no timeout".
+ timeout: 0
+ # The remaining parameters are optional and only added to the request when set (stop/logitBias only when non-empty).
+ temperature: 1
+ maxTokens:
+ frequencyPenalty: 0
+ presencePenalty: 0
+ user: eventMesh
+ stop: []
+ logitBias: {
+
+ }
+
+# Used because connectorConfig.proxyEnable is true; host must be set.
+openaiProxyConfig:
+ host: 127.0.0.1
+ port: 7890
diff --git a/settings.gradle b/settings.gradle
index 645e6fb36..6a8e27bf9 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -77,6 +77,7 @@ include 'eventmesh-connectors:eventmesh-connector-wecom'
include 'eventmesh-connectors:eventmesh-connector-slack'
include 'eventmesh-connectors:eventmesh-connector-wechat'
include 'eventmesh-connectors:eventmesh-connector-http'
+include 'eventmesh-connectors:eventmesh-connector-chatgpt'
include 'eventmesh-storage-plugin:eventmesh-storage-api'
include 'eventmesh-storage-plugin:eventmesh-storage-standalone'
diff --git a/tools/dependency-check/known-dependencies.txt
b/tools/dependency-check/known-dependencies.txt
index 05c99f3d3..73feb5fd3 100644
--- a/tools/dependency-check/known-dependencies.txt
+++ b/tools/dependency-check/known-dependencies.txt
@@ -1,6 +1,7 @@
FastInfoset-1.2.15.jar
ST4-4.3.4.jar
accessors-smart-2.4.7.jar
+adapter-rxjava2-2.9.0.jar
alibabacloud-gateway-spi-0.0.1.jar
amqp-client-5.16.0.jar
animal-sniffer-annotations-1.19.jar
@@ -12,6 +13,7 @@ antlr4-4.13.0.jar
antlr4-runtime-4.13.0.jar
aopalliance-1.0.jar
apache-client-2.20.29.jar
+api-0.18.2.jar
arns-2.20.29.jar
asm-9.1.jar
asm-9.2.jar
@@ -43,7 +45,9 @@ byte-buddy-1.11.0.jar
byte-buddy-1.12.18.jar
cache-api-1.1.1.jar
checker-qual-3.12.0.jar
+classgraph-4.8.21.jar
classmate-1.5.1.jar
+client-0.18.2.jar
cloudevents-api-2.4.2.jar
cloudevents-core-2.4.2.jar
cloudevents-http-vertx-2.3.0.jar
@@ -63,6 +67,7 @@ commons-logging-1.2.jar
commons-text-1.9.jar
commons-validator-1.7.jar
consul-api-1.4.5.jar
+converter-jackson-2.9.0.jar
credentials-java-0.2.4.jar
crt-core-2.20.29.jar
curator-client-5.4.0.jar
@@ -155,6 +160,7 @@ json-path-2.7.0.jar
json-smart-2.4.7.jar
json-utils-2.20.29.jar
jsr305-3.0.2.jar
+jtokkit-0.5.1.jar
kafka-clients-3.0.0.jar
listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
log4j-api-2.22.1.jar
@@ -162,6 +168,7 @@ log4j-core-2.22.1.jar
log4j-slf4j2-impl-2.22.1.jar
lz4-java-1.7.1.jar
lz4-java-1.8.0.jar
+mbknor-jackson-jsonschema_2.12-1.0.34.jar
metrics-annotation-4.1.0.jar
metrics-core-4.1.0.jar
metrics-healthchecks-4.1.0.jar
@@ -306,6 +313,7 @@ reactor-core-3.4.13.jar
redisson-3.17.3.jar
regions-2.20.29.jar
relaxngDatatype-20020414.jar
+retrofit-2.9.0.jar
rocketmq-acl-4.9.5.jar
rocketmq-broker-4.9.5.jar
rocketmq-client-4.9.5.jar
@@ -317,9 +325,12 @@ rocketmq-remoting-4.9.5.jar
rocketmq-srvutil-4.9.5.jar
rocketmq-store-4.9.5.jar
rocketmq-tools-4.9.5.jar
+rxjava-2.0.0.jar
rxjava-3.0.12.jar
s3-2.20.29.jar
+scala-library-2.12.8.jar
sdk-core-2.20.29.jar
+service-0.18.2.jar
simpleclient-0.12.0.jar
simpleclient_tracer_common-0.12.0.jar
simpleclient_tracer_otel-0.12.0.jar
@@ -352,6 +363,7 @@ tomcat-embed-el-9.0.56.jar
txw2-2.3.1.jar
utils-2.20.29.jar
validation-api-1.1.0.Final.jar
+validation-api-2.0.1.Final.jar
vertx-auth-common-4.4.6.jar
vertx-bridge-common-4.4.6.jar
vertx-core-4.4.6.jar
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]