This is an automated email from the ASF dual-hosted git repository.

pandaapo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new fdb96d83e [ISSUE #4047] Support chatGPT source connector (#4817)
fdb96d83e is described below

commit fdb96d83e6960d594ae08c9459cb4c06bef5135b
Author: Jevin Jiang <[email protected]>
AuthorDate: Sun Apr 21 08:27:14 2024 +0800

    [ISSUE #4047] Support chatGPT source connector (#4817)
    
    * [ISSUE #4047] Support chatGPT source connector
    
    * [ISSUE #4047] Add OpenAI configuration and adjust DTO
    
    * [ISSUE #4047] Join parse request support
    
    * [ISSUE #4047] impl Parse request
    
    * [ISSUE #4047] fix code style
    
    * [ISSUE #4047] fix code style
    
    * [ISSUE #4047] fix dependencies check failed
    
    * [ISSUE #4047] fix dependencies check
    
    * [ISSUE #4047] fix license check
    
    * [ISSUE #4047] fix review question
    
    * [ISSUE #4047] fix review question
    
    * [ISSUE #4047] add default value
    
    * [ISSUE #4047] fix test
    
    * [ISSUE #4047] default timeout value is zero , not timeout .
    
    * [ISSUE #4047] fix review
    
    * [ISSUE #4047] fix license check
    
    ---------
    
    Co-authored-by: JiangShuJu <[email protected]>
---
 .../eventmesh-connector-chatgpt/build.gradle       |  28 +++
 .../eventmesh-connector-chatgpt/gradle.properties  |  18 ++
 .../chatgpt/config/ChatGPTServerConfig.java        |  32 +++
 .../chatgpt/server/ChatGPTConnectServer.java       |  40 ++++
 .../chatgpt/source/config/ChatGPTSourceConfig.java |  35 +++
 .../config/ChatGPTSourceConnectorConfig.java       |  37 +++
 .../chatgpt/source/config/OpenaiConfig.java        |  42 ++++
 .../chatgpt/source/config/OpenaiProxyConfig.java   |  29 +++
 .../source/connector/ChatGPTSourceConnector.java   | 252 +++++++++++++++++++++
 .../chatgpt/source/dto/ChatGPTRequestDTO.java      |  61 +++++
 .../chatgpt/source/enums/ChatGPTRequestType.java   |  25 ++
 .../chatgpt/source/handlers/ChatHandler.java       |  69 ++++++
 .../chatgpt/source/handlers/ParseHandler.java      | 134 +++++++++++
 .../chatgpt/source/managers/OpenaiManager.java     | 131 +++++++++++
 .../src/main/resources/prompt                      |  44 ++++
 .../src/main/resources/server-config.yml           |  19 ++
 .../src/main/resources/source-config.yml           |  51 +++++
 .../connector/ChatGPTSourceConnectorTest.java      | 166 ++++++++++++++
 .../src/test/resources/server-config.yml           |  19 ++
 .../src/test/resources/source-config.yml           |  52 +++++
 settings.gradle                                    |   1 +
 tools/dependency-check/known-dependencies.txt      |  12 +
 22 files changed, 1297 insertions(+)

diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/build.gradle 
b/eventmesh-connectors/eventmesh-connector-chatgpt/build.gradle
new file mode 100644
index 000000000..b2c45b620
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-chatgpt/build.gradle
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+dependencies {
+    api project(":eventmesh-openconnect:eventmesh-openconnect-java")
+    implementation project(":eventmesh-common")
+    implementation 'com.theokanning.openai-gpt3-java:service:0.18.2'
+    implementation 'io.cloudevents:cloudevents-http-vertx:2.3.0'
+    implementation 'io.vertx:vertx-web:4.4.6'
+
+    testImplementation "org.apache.httpcomponents:httpclient"
+    compileOnly 'org.projectlombok:lombok'
+    annotationProcessor 'org.projectlombok:lombok'
+}
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/gradle.properties 
b/eventmesh-connectors/eventmesh-connector-chatgpt/gradle.properties
new file mode 100644
index 000000000..715bad3de
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-chatgpt/gradle.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+pluginType=connector
+pluginName=chatgpt
\ No newline at end of file
diff --git 
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/config/ChatGPTServerConfig.java
 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/config/ChatGPTServerConfig.java
new file mode 100644
index 000000000..7d162920d
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/config/ChatGPTServerConfig.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.config;
+
+import org.apache.eventmesh.openconnect.api.config.Config;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class ChatGPTServerConfig extends Config {
+
+    private boolean sourceEnable;
+
+    private boolean sinkEnable;
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/server/ChatGPTConnectServer.java
 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/server/ChatGPTConnectServer.java
new file mode 100644
index 000000000..ca104fe56
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/server/ChatGPTConnectServer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.server;
+
+import org.apache.eventmesh.connector.chatgpt.config.ChatGPTServerConfig;
+import 
org.apache.eventmesh.connector.chatgpt.source.connector.ChatGPTSourceConnector;
+import org.apache.eventmesh.openconnect.Application;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+public class ChatGPTConnectServer {
+
+    public static void main(String[] args) throws Exception {
+        ChatGPTServerConfig serverConfig = 
ConfigUtil.parse(ChatGPTServerConfig.class, "server-config.yml");
+
+        if (serverConfig.isSourceEnable()) {
+            Application chatGPTSourceApp = new Application();
+            chatGPTSourceApp.run(ChatGPTSourceConnector.class);
+        }
+
+        if (serverConfig.isSinkEnable()) {
+            // TODO support sink connector
+        }
+    }
+
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConfig.java
 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConfig.java
new file mode 100644
index 000000000..959686691
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConfig.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.source.config;
+
+import org.apache.eventmesh.openconnect.api.config.SourceConfig;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class ChatGPTSourceConfig extends SourceConfig {
+
+    public ChatGPTSourceConnectorConfig connectorConfig;
+
+    public OpenaiProxyConfig openaiProxyConfig;
+
+    public OpenaiConfig openaiConfig;
+
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConnectorConfig.java
 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConnectorConfig.java
new file mode 100644
index 000000000..316fb5f24
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/ChatGPTSourceConnectorConfig.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.source.config;
+
+import lombok.Data;
+
+@Data
+public class ChatGPTSourceConnectorConfig {
+
+    private String connectorName = "chatgptSource";
+
+    private String path = "/chatgpt";
+
+    private int port = 3756;
+
+    private int idleTimeout;
+
+    private boolean proxyEnable = false;
+
+    private String parsePromptFileName = "prompt";
+
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiConfig.java
 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiConfig.java
new file mode 100644
index 000000000..51858a709
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiConfig.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.source.config;
+
+
+import java.util.List;
+import java.util.Map;
+
+import lombok.Data;
+
+@Data
+public class OpenaiConfig {
+
+    private String token;
+    private String model = "gpt-3.5-turbo";
+    private long timeout;
+    private Double temperature;
+    private Integer maxTokens;
+    private Boolean logprob;
+    private Double topLogprobs;
+    private Map<String, Integer> logitBias;
+    private Double frequencyPenalty;
+    private Double presencePenalty;
+    private String user = "eventMesh";
+    private List<String> stop;
+
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiProxyConfig.java
 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiProxyConfig.java
new file mode 100644
index 000000000..14dd69f35
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/config/OpenaiProxyConfig.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.source.config;
+
+import lombok.Data;
+
+@Data
+public class OpenaiProxyConfig {
+
+    private String host;
+
+    private int port;
+
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java
new file mode 100644
index 000000000..a947bc135
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.source.connector;
+
+import org.apache.eventmesh.common.ThreadPoolFactory;
+import org.apache.eventmesh.common.exception.EventMeshException;
+import 
org.apache.eventmesh.connector.chatgpt.source.config.ChatGPTSourceConfig;
+import org.apache.eventmesh.connector.chatgpt.source.dto.ChatGPTRequestDTO;
+import org.apache.eventmesh.connector.chatgpt.source.enums.ChatGPTRequestType;
+import org.apache.eventmesh.connector.chatgpt.source.handlers.ChatHandler;
+import org.apache.eventmesh.connector.chatgpt.source.handlers.ParseHandler;
+import org.apache.eventmesh.connector.chatgpt.source.managers.OpenaiManager;
+import org.apache.eventmesh.openconnect.api.config.Config;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
+import org.apache.eventmesh.openconnect.api.source.Source;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.util.CloudEventUtil;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import io.cloudevents.CloudEvent;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.HttpServerOptions;
+import io.vertx.ext.web.RequestBody;
+import io.vertx.ext.web.Router;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.BodyHandler;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class ChatGPTSourceConnector implements Source {
+
+    private static final int DEFAULT_BATCH_SIZE = 10;
+
+    private ChatGPTSourceConfig sourceConfig;
+    private BlockingQueue<CloudEvent> queue;
+    private HttpServer server;
+    private final ExecutorService chatgptSourceExecutorService =
+        
ThreadPoolFactory.createThreadPoolExecutor(Runtime.getRuntime().availableProcessors()
 * 2, Runtime.getRuntime().availableProcessors() * 2,
+            "ChatGPTSourceThread");
+
+    private OpenaiManager openaiManager;
+    private String parsePromptTemplateStr;
+    private ChatHandler chatHandler;
+    private ParseHandler parseHandler;
+    private static final int DEFAULT_TIMEOUT = 0;
+
+    private static final String APPLICATION_JSON = "application/json";
+    private static final String TEXT_PLAIN = "text/plain";
+
+
+    @Override
+    public Class<? extends Config> configClass() {
+        return ChatGPTSourceConfig.class;
+    }
+
+    @Override
+    public void init(Config config) {
+        this.sourceConfig = (ChatGPTSourceConfig) config;
+        doInit();
+    }
+
+    @Override
+    public void init(ConnectorContext connectorContext) {
+        SourceConnectorContext sourceConnectorContext = 
(SourceConnectorContext) connectorContext;
+        this.sourceConfig = (ChatGPTSourceConfig) 
sourceConnectorContext.getSourceConfig();
+        doInit();
+    }
+
+    public void initParsePrompt() {
+        String parsePromptFileName = 
sourceConfig.getConnectorConfig().getParsePromptFileName();
+        URL resource = 
Thread.currentThread().getContextClassLoader().getResource(parsePromptFileName);
+        if (resource == null) {
+            log.warn("cannot find prompt file {} in resources", 
parsePromptFileName);
+            return;
+        }
+        String filePath = resource.getPath();
+        try (BufferedReader br = new BufferedReader(new FileReader(filePath))) 
{
+            StringBuilder builder = new StringBuilder();
+            String line;
+            while ((line = br.readLine()) != null) {
+                if (!line.startsWith("#") && StringUtils.isNotBlank(line)) {
+                    builder.append(line).append("\n");
+                }
+            }
+            this.parsePromptTemplateStr = builder.toString();
+        } catch (IOException e) {
+            throw new IllegalStateException("Unable to read file", e);
+        }
+    }
+
+
+    @SuppressWarnings("checkstyle:WhitespaceAround")
+    private void doInit() {
+        initParsePrompt();
+        this.openaiManager = new OpenaiManager(sourceConfig);
+        this.chatHandler = new ChatHandler(this.openaiManager);
+        if (StringUtils.isNotEmpty(parsePromptTemplateStr)) {
+            this.parseHandler = new ParseHandler(openaiManager, 
parsePromptTemplateStr);
+        }
+        this.queue = new LinkedBlockingQueue<>(1024);
+        final Vertx vertx = Vertx.vertx();
+        final Router router = Router.router(vertx);
+        
router.route().path(this.sourceConfig.connectorConfig.getPath()).method(HttpMethod.POST).handler(BodyHandler.create()).handler(ctx
 -> {
+            try {
+                RequestBody body = ctx.body();
+                ChatGPTRequestDTO bodyObject = 
body.asPojo(ChatGPTRequestDTO.class);
+                validateRequestDTO(bodyObject);
+                handleRequest(bodyObject, ctx);
+            } catch (Exception e) {
+                handleError(e, ctx);
+            }
+        });
+        if (sourceConfig.connectorConfig.getIdleTimeout() < 0) {
+            log.warn("idleTimeout must be >= 0, your config value is {}, 
idleTimeout will be reset {}", sourceConfig.connectorConfig.getIdleTimeout(),
+                DEFAULT_TIMEOUT);
+            sourceConfig.connectorConfig.setIdleTimeout(DEFAULT_TIMEOUT);
+        }
+        this.server = vertx.createHttpServer(new 
HttpServerOptions().setPort(this.sourceConfig.connectorConfig.getPort())
+            
.setIdleTimeout(this.sourceConfig.connectorConfig.getIdleTimeout())).requestHandler(router);
+    }
+
+
+    private void validateRequestDTO(ChatGPTRequestDTO bodyObject) {
+        if (StringUtils.isBlank(bodyObject.getText())) {
+            throw new IllegalArgumentException("Attributes 'text' cannot be 
null");
+        }
+    }
+
+    private void handleRequest(ChatGPTRequestDTO bodyObject, RoutingContext 
ctx) {
+        chatgptSourceExecutorService.execute(() -> {
+            try {
+                ChatGPTRequestType chatgptRequestType = 
ChatGPTRequestType.valueOf(bodyObject.getRequestType());
+                CloudEvent cloudEvent = invokeHandler(chatgptRequestType, 
bodyObject);
+                queue.add(cloudEvent);
+                log.info("[ChatGPTSourceConnector] Succeed to convert payload 
into CloudEvent.");
+                
ctx.response().setStatusCode(HttpResponseStatus.OK.code()).end();
+            } catch (IllegalArgumentException e) {
+                log.error("[ChatGPTSourceConnector] the request type is 
illegal: {}", e.getMessage(), e);
+                
ctx.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code())
+                    .setStatusMessage(String.format("request type '%s' is not 
supported", bodyObject.getRequestType())).end();
+            } catch (Exception e) {
+                log.error("[ChatGPTSourceConnector] Error processing request: 
{}", e.getMessage(), e);
+                
ctx.response().setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).end();
+            }
+        });
+    }
+
+    private CloudEvent invokeHandler(ChatGPTRequestType chatgptRequestType, 
ChatGPTRequestDTO bodyObject) {
+        switch (chatgptRequestType) {
+            case CHAT:
+                if (StringUtils.isBlank(bodyObject.getDataContentType())) {
+                    bodyObject.setDataContentType(TEXT_PLAIN);
+                }
+                return chatHandler.invoke(bodyObject);
+            case PARSE:
+                if (StringUtils.isBlank(parsePromptTemplateStr)) {
+                    throw new IllegalStateException(
+                        "the request type of PARSE must be configured with the 
correct parsePromptFileName in source-config.yml");
+                }
+                if (StringUtils.isBlank(bodyObject.getFields())) {
+                    throw new IllegalStateException("Attributes 'fields' 
cannot be null in PARSE");
+                }
+                if (StringUtils.isBlank(bodyObject.getDataContentType())) {
+                    bodyObject.setDataContentType(APPLICATION_JSON);
+                }
+                return parseHandler.invoke(bodyObject);
+            default:
+                throw new IllegalStateException("the request type is illegal");
+        }
+    }
+
+    private void handleError(Exception e, RoutingContext ctx) {
+        log.error("[ChatGPTSourceConnector] Malformed request.", e);
+        
ctx.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).end();
+    }
+
+    @Override
+    public void start() {
+        Throwable t = this.server.listen().cause();
+        if (t != null) {
+            throw new EventMeshException("failed to start Vertx server", t);
+        }
+    }
+
+    @Override
+    public void commit(ConnectRecord record) {
+
+    }
+
+    @Override
+    public String name() {
+        return this.sourceConfig.getConnectorConfig().getConnectorName();
+    }
+
+    @Override
+    public void stop() {
+        Throwable t = this.server.close().cause();
+        if (t != null) {
+            throw new EventMeshException("failed to stop Vertx server", t);
+        }
+    }
+
+    @Override
+    public List<ConnectRecord> poll() {
+        List<ConnectRecord> connectRecords = new 
ArrayList<>(DEFAULT_BATCH_SIZE);
+        for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) {
+            try {
+                CloudEvent event = queue.poll(3, TimeUnit.SECONDS);
+                if (event == null) {
+                    break;
+                }
+                connectRecords.add(CloudEventUtil.convertEventToRecord(event));
+            } catch (InterruptedException e) {
+                break;
+            }
+        }
+        return connectRecords;
+    }
+
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/dto/ChatGPTRequestDTO.java
 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/dto/ChatGPTRequestDTO.java
new file mode 100644
index 000000000..a203a24e5
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/dto/ChatGPTRequestDTO.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.source.dto;
+
+import org.apache.eventmesh.connector.chatgpt.source.enums.ChatGPTRequestType;
+
+import java.time.ZonedDateTime;
+import java.util.UUID;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class ChatGPTRequestDTO {
+
+    private String requestType = ChatGPTRequestType.CHAT.name();
+
+    private String source = "/";
+
+    private String subject = "chatGPT";
+
+    @JsonProperty("datacontenttype")
+    private String dataContentType;
+
+    private String type = "cloudevents";
+
+    private String text;
+
+    private String fields;
+
+    @JsonInclude
+    private String id = UUID.randomUUID().toString();
+
+    @JsonInclude
+    private String time = ZonedDateTime.now().toOffsetDateTime().toString();
+
+    public String getFields() {
+        return fields.replace(";", "\n");
+    }
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/enums/ChatGPTRequestType.java
 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/enums/ChatGPTRequestType.java
new file mode 100644
index 000000000..993052565
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/enums/ChatGPTRequestType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.source.enums;
+
+
+public enum ChatGPTRequestType {
+
+    CHAT, PARSE;
+
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ChatHandler.java
 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ChatHandler.java
new file mode 100644
index 000000000..6d79a0559
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ChatHandler.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.source.handlers;
+
+
+import org.apache.eventmesh.connector.chatgpt.source.dto.ChatGPTRequestDTO;
+import org.apache.eventmesh.connector.chatgpt.source.managers.OpenaiManager;
+
+import java.net.URI;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+
+import com.theokanning.openai.completion.chat.ChatCompletionRequest;
+import com.theokanning.openai.completion.chat.ChatMessage;
+import com.theokanning.openai.completion.chat.ChatMessageRole;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class ChatHandler  {
+
+    private final OpenaiManager openaiManager;
+
+    public ChatHandler(OpenaiManager openaiManager) {
+        this.openaiManager = openaiManager;
+    }
+
+    public CloudEvent invoke(ChatGPTRequestDTO event) {
+        return genGptConnectRecord(event);
+    }
+
+    private CloudEvent genGptConnectRecord(ChatGPTRequestDTO event) {
+        List<ChatMessage> chatMessages = new ArrayList<>();
+        chatMessages.add(new ChatMessage(ChatMessageRole.USER.value(), 
event.getText()));
+        ChatCompletionRequest req = 
openaiManager.newChatCompletionRequest(chatMessages);
+        String chatResult = openaiManager.getResult(req);
+
+        return CloudEventBuilder.v1()
+            .withId(UUID.randomUUID().toString())
+            .withSource(URI.create(event.getSource()))
+            .withType(event.getType())
+            .withTime(ZonedDateTime.now().toOffsetDateTime())
+            .withData(chatResult.getBytes())
+            .withSubject(event.getSubject())
+            .withDataContentType(event.getDataContentType())
+            .build();
+    }
+
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ParseHandler.java
 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ParseHandler.java
new file mode 100644
index 000000000..aad3d384c
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/handlers/ParseHandler.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.source.handlers;
+
+
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.connector.chatgpt.source.dto.ChatGPTRequestDTO;
+import org.apache.eventmesh.connector.chatgpt.source.managers.OpenaiManager;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.text.StringSubstitutor;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.jackson.JsonFormat;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.theokanning.openai.completion.chat.ChatCompletionRequest;
+import com.theokanning.openai.completion.chat.ChatMessage;
+import com.theokanning.openai.completion.chat.ChatMessageRole;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class ParseHandler {
+
+    private final OpenaiManager openaiManager;
+
+    private final String promptTemplate;
+
+    private static final JsonFormat jsonFormat = new JsonFormat(false, true);
+
+
+    public ParseHandler(OpenaiManager openaiManager, String promptTemplate) {
+        this.openaiManager = openaiManager;
+        this.promptTemplate = promptTemplate;
+    }
+
+    @SuppressWarnings("checkstyle:WhitespaceAfter")
+    public CloudEvent invoke(ChatGPTRequestDTO event) {
+        Map<String, String> map = convertToMap(event);
+
+        StringSubstitutor substitute = new StringSubstitutor(map);
+        String finalPrompt = substitute.replace(promptTemplate);
+        List<ChatMessage> chatMessages = new ArrayList<>();
+        chatMessages.add(new ChatMessage(ChatMessageRole.USER.value(), 
finalPrompt));
+        ChatCompletionRequest req = 
openaiManager.newChatCompletionRequest(chatMessages);
+        String chatResult = openaiManager.getResult(req);
+        chatResult = StringUtils.removeFirst(chatResult, "```json");
+        chatResult = StringUtils.removeEnd(chatResult, "```");
+        CloudEvent cloudEvent;
+        try {
+            cloudEvent = 
jsonFormat.deserialize(chatResult.getBytes(Constants.DEFAULT_CHARSET));
+        } catch (Exception e) {
+            throw new IllegalStateException("cloudEvent parse fail, please 
check your parse prompt file content", e);
+        }
+        return cloudEvent;
+    }
+
+    public Map<String, String> convertToMap(Object obj) {
+        Map<String, String> map = new HashMap<>();
+        Class<?> clazz = obj.getClass();
+        Field[] fields = clazz.getDeclaredFields();
+        for (Field field : fields) {
+            if (field.isSynthetic()) {
+                continue;
+            }
+            if (Map.class.isAssignableFrom(field.getType()) || 
List.class.isAssignableFrom(field.getType())) {
+                continue;
+            }
+            try {
+                String key = field.getName();
+                if (field.isAnnotationPresent(JsonProperty.class)) {
+                    JsonProperty annotation = 
field.getAnnotation(JsonProperty.class);
+                    key = annotation.value();
+                }
+                Method getter = getGetter(field, clazz);
+                map.put(key, String.valueOf(getter.invoke(obj)));
+            } catch (IllegalAccessException | NoSuchMethodException | 
InvocationTargetException e) {
+                throw new IllegalStateException("convert to Map is fail", e);
+            }
+        }
+
+        return map;
+    }
+
+    public Method getGetter(Field field, Class<?> clazz) throws 
NoSuchMethodException {
+        boolean isBooleanField = 
boolean.class.isAssignableFrom(field.getType()) || 
Boolean.class.isAssignableFrom(field.getType());
+        String handledFieldName = upperFirst(field.getName());
+        String methodName;
+        if (isBooleanField) {
+            methodName = "is" + handledFieldName;
+        } else {
+            methodName = "get" + handledFieldName;
+        }
+        return clazz.getDeclaredMethod(methodName);
+    }
+
+    public String upperFirst(String str) {
+        if (null == str) {
+            return null;
+        }
+        if (!str.isEmpty()) {
+            char firstChar = str.charAt(0);
+            if (Character.isLowerCase(firstChar)) {
+                return Character.toUpperCase(firstChar) + 
StringUtils.substring(str, 1);
+            }
+        }
+        return str;
+    }
+
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/managers/OpenaiManager.java
 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/managers/OpenaiManager.java
new file mode 100644
index 000000000..fda5216bb
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/managers/OpenaiManager.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.source.managers;
+
+import static com.theokanning.openai.service.OpenAiService.defaultClient;
+import static com.theokanning.openai.service.OpenAiService.defaultObjectMapper;
+import static com.theokanning.openai.service.OpenAiService.defaultRetrofit;
+
+import org.apache.eventmesh.common.utils.AssertUtils;
+import org.apache.eventmesh.common.utils.JsonUtils;
+import 
org.apache.eventmesh.connector.chatgpt.source.config.ChatGPTSourceConfig;
+import org.apache.eventmesh.connector.chatgpt.source.config.OpenaiConfig;
+import org.apache.eventmesh.connector.chatgpt.source.config.OpenaiProxyConfig;
+
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.time.Duration;
+import java.util.List;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.theokanning.openai.client.OpenAiApi;
+import com.theokanning.openai.completion.chat.ChatCompletionRequest;
+import 
com.theokanning.openai.completion.chat.ChatCompletionRequest.ChatCompletionRequestBuilder;
+import com.theokanning.openai.completion.chat.ChatMessage;
+import com.theokanning.openai.service.OpenAiService;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import okhttp3.OkHttpClient;
+import retrofit2.Retrofit;
+
+
+@Slf4j
+public class OpenaiManager {
+
+    @Getter
+    private OpenAiService openAiService;
+
+    private String chatCompletionRequestTemplateStr;
+
+    private static final int DEFAULT_TIMEOUT = 0;
+
+    public OpenaiManager(ChatGPTSourceConfig sourceConfig) {
+        initOpenAi(sourceConfig);
+    }
+
+    public String getResult(ChatCompletionRequest req) {
+        StringBuilder gptData = new StringBuilder();
+        try {
+            openAiService.createChatCompletion(req).getChoices()
+                .forEach(chatCompletionChoice -> 
gptData.append(chatCompletionChoice.getMessage().getContent()));
+        } catch (Exception e) {
+            log.error("Failed to generate GPT connection record: {}", 
e.getMessage());
+        }
+        return gptData.toString();
+    }
+
+    public ChatCompletionRequest newChatCompletionRequest(List<ChatMessage> 
chatMessages) {
+        ChatCompletionRequest request = 
JsonUtils.parseObject(chatCompletionRequestTemplateStr, 
ChatCompletionRequest.class);
+        request.setMessages(chatMessages);
+        return request;
+    }
+
+    private void initOpenAi(ChatGPTSourceConfig sourceConfig) {
+        OpenaiConfig openaiConfig = sourceConfig.getOpenaiConfig();
+        if (openaiConfig.getTimeout() < 0) {
+            log.warn("openaiTimeout must be >= 0, your config value is {}, 
openaiTimeout will be reset {}", openaiConfig.getTimeout(),
+                DEFAULT_TIMEOUT);
+            openaiConfig.setTimeout(DEFAULT_TIMEOUT);
+        }
+        boolean proxyEnable = sourceConfig.connectorConfig.isProxyEnable();
+        if (proxyEnable) {
+            OpenaiProxyConfig chatgptProxyConfig = 
sourceConfig.openaiProxyConfig;
+            if (chatgptProxyConfig.getHost() == null) {
+                throw new IllegalStateException("chatgpt proxy config 'host' 
cannot be null");
+            }
+            ObjectMapper mapper = defaultObjectMapper();
+            Proxy proxy = new Proxy(Proxy.Type.HTTP, new 
InetSocketAddress(chatgptProxyConfig.getHost(), chatgptProxyConfig.getPort()));
+            OkHttpClient client =
+                defaultClient(openaiConfig.getToken(), 
Duration.ofSeconds(openaiConfig.getTimeout())).newBuilder().proxy(proxy).build();
+            Retrofit retrofit = defaultRetrofit(client, mapper);
+            OpenAiApi api = retrofit.create(OpenAiApi.class);
+            this.openAiService = new OpenAiService(api);
+        } else {
+            this.openAiService = new OpenAiService(openaiConfig.getToken(), 
Duration.ofSeconds(openaiConfig.getTimeout()));
+        }
+        ChatCompletionRequestBuilder builder = 
ChatCompletionRequest.builder().model(openaiConfig.getModel());
+        AssertUtils.notNull(openaiConfig.getModel(), "model cannot be null");
+        builder = builder.model(openaiConfig.getModel());
+        if (openaiConfig.getUser() != null) {
+            builder = builder.user(openaiConfig.getUser());
+        }
+        if (openaiConfig.getPresencePenalty() != null) {
+            builder = 
builder.presencePenalty(openaiConfig.getPresencePenalty());
+        }
+        if (openaiConfig.getFrequencyPenalty() != null) {
+            builder = 
builder.frequencyPenalty(openaiConfig.getFrequencyPenalty());
+        }
+        if (openaiConfig.getMaxTokens() != null) {
+            builder = builder.maxTokens(openaiConfig.getMaxTokens());
+        }
+        if (openaiConfig.getTemperature() != null) {
+            builder = builder.temperature(openaiConfig.getTemperature());
+        }
+        if (openaiConfig.getLogitBias() != null && 
!openaiConfig.getLogitBias().isEmpty()) {
+            builder = builder.logitBias(openaiConfig.getLogitBias());
+        }
+        if (openaiConfig.getStop() != null && 
!openaiConfig.getStop().isEmpty()) {
+            builder = builder.stop(openaiConfig.getStop());
+        }
+        this.chatCompletionRequestTemplateStr = 
JsonUtils.toJSONString(builder.build());
+    }
+
+
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/prompt 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/prompt
new file mode 100644
index 000000000..e10ecc331
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/prompt
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+You are an AI assistant named CloudEventsConverter. avoid escape characters .
+Your task is to construct a JSON object in CloudEvents format. Based on the 
field name and field description in the 'data' field of the CloudEvents 
formatted JSON object, convert the input text provided by the user into the 
content of the 'data' field, which must comply with the specifications of the 
content of the 'datacontenttype' field.
+The role is :
+ - If the 'datacontenttype' field content is 'application/json', then the' 
data 'field content should be a JSON object,
+ - else If the 'datacontenttype' field content is not 'application/json' and 
is 'application/xml', then the' data 'field content should be a string in XML 
format and the outermost of XML format is <data> </data>, inside is the XML 
generated by you based on field info;
+ - else the 'datacontenttype' field content is not 'application/json' and 
'application/xml', then the' data 'field content is string of the 'text' field 
content;
+Except for the content of the data field, all other values should be set to 
and cannot be modified. Finally, return to me the JSON object in CloudEvents 
format that you constructed
+
+The following text is the field name and field description in the 'data' field 
of the CloudEvents-formatted JSON object, extract the following information:
+<BEGIN FIELD INFO>
+${fields}
+<END FIELD INFO>
+
+text:  ${text}
+
+The output should be a markdown code snippet formatted in the following 
schema, including the leading and trailing "```json" and "```":
+```json
+{
+  "specversion": string,  Set to "1.0"
+  "type": string,  Set to  ${type}
+  "source": string, Set to  ${source}
+  "subject": string, Set to ${subject}
+  "id": string, Set to  ${id}
+  "time": string, Set to  ${time}
+  "datacontenttype": string, Set to  ${datacontenttype}
+  "data": object or string
+}
\ No newline at end of file
diff --git 
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/server-config.yml
 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/server-config.yml
new file mode 100644
index 000000000..0cd7b5b5a
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/server-config.yml
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sourceEnable: true
+sinkEnable: false
diff --git 
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/source-config.yml
 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/source-config.yml
new file mode 100644
index 000000000..b194e99ec
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/resources/source-config.yml
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+  meshAddress: 127.0.0.1:10000
+  subject: TopicTest
+  idc: FT
+  env: PRD
+  group: chatgptSource
+  appId: 5032
+  userName: chatgptSourceUser
+  passWord: chatgptPassWord
+connectorConfig:
+  connectorName: chatgptSource
+  path: /chatgpt
+  port: 3756
+  idleTimeout: 0
+  proxyEnable: false
+  parsePromptFileName: prompt
+
+# https://platform.openai.com/docs/api-reference/chat/create
+openaiConfig:
+  token:
+  model: gpt-3.5-turbo
+  timeout: 0
+  temperature: 1
+  maxTokens:
+  frequencyPenalty: 0
+  presencePenalty: 0
+  user: eventMesh
+  stop: []
+  logitBias: {}
+
+openaiProxyConfig:
+  host: 127.0.0.1
+  port: 7890
+
diff --git 
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnectorTest.java
 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnectorTest.java
new file mode 100644
index 000000000..8347fdcbb
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnectorTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.chatgpt.source.connector;
+
+import org.apache.eventmesh.common.utils.JsonUtils;
+import 
org.apache.eventmesh.connector.chatgpt.source.config.ChatGPTSourceConfig;
+import 
org.apache.eventmesh.connector.chatgpt.source.config.ChatGPTSourceConnectorConfig;
+import org.apache.eventmesh.connector.chatgpt.source.config.OpenaiConfig;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import java.util.List;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+class ChatGPTSourceConnectorTest {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger("ChatGPTSourceConnectorTest");
+
+    private ChatGPTSourceConnector connector;
+    private ChatGPTSourceConnectorConfig config;
+    private CloseableHttpClient httpClient;
+    private String uri;
+    private final String expectedMessage = "Hello, can you tell me a story.";
+
+    private final String expectedParseMessage = "User 13356288979 from Tianjin 
store placed an order with order number 11221122";
+
+
+    public boolean checkOpenAi() throws Exception {
+        ChatGPTSourceConfig sourceConfig = (ChatGPTSourceConfig) 
ConfigUtil.parse(connector.configClass());
+        OpenaiConfig openaiConfig = sourceConfig.getOpenaiConfig();
+        if (StringUtils.isBlank(openaiConfig.getToken())) {
+            return false;
+        }
+        return true;
+    }
+
+    @BeforeEach
+    void setUp() throws Exception {
+        connector = new ChatGPTSourceConnector();
+        if (!checkOpenAi()) {
+            LOGGER.error("please set openai token in the config");
+            return;
+        }
+        ChatGPTSourceConfig sourceConfig = (ChatGPTSourceConfig) 
ConfigUtil.parse(connector.configClass());
+        config = sourceConfig.getConnectorConfig();
+        connector.init(sourceConfig);
+        connector.start();
+
+        uri = new 
URIBuilder().setScheme("http").setHost("127.0.0.1").setPort(config.getPort()).setPath(config.getPath()).build().toString();
+
+        httpClient = HttpClients.createDefault();
+    }
+
+    @Test
+    void testPoll() throws Exception {
+        ChatGPTSourceConfig sourceConfig = (ChatGPTSourceConfig) 
ConfigUtil.parse(connector.configClass());
+        OpenaiConfig openaiConfig = sourceConfig.getOpenaiConfig();
+        if (StringUtils.isBlank(openaiConfig.getToken())) {
+            LOGGER.error("please set openai token in the config");
+            return;
+        }
+
+        final int batchSize = 10;
+
+        for (int i = 0; i < batchSize; i++) {
+            HttpResponse resp = mockStructuredChatRequest();
+            Assertions.assertEquals(resp.getStatusLine().getStatusCode(), 
HttpStatus.SC_OK);
+        }
+
+        List<ConnectRecord> res = connector.poll();
+        Assertions.assertEquals(batchSize, res.size());
+
+        for (int i = 0; i < batchSize; i++) {
+            HttpResponse resp = mockStructuredParseRequest();
+            Assertions.assertEquals(resp.getStatusLine().getStatusCode(), 
HttpStatus.SC_OK);
+        }
+
+        List<ConnectRecord> res1 = connector.poll();
+        Assertions.assertEquals(batchSize, res1.size());
+    }
+
+
+    HttpResponse mockStructuredChatRequest() throws Exception {
+        TestEvent event = new TestEvent();
+        event.type = "com.example.someevent";
+        event.source = "/mycontext";
+        event.subject = "test";
+        event.datacontenttype = "text/plain";
+        event.text = expectedMessage;
+        event.requestType = "CHAT";
+        HttpPost httpPost = new HttpPost(uri);
+        httpPost.setEntity(new StringEntity(JsonUtils.toJSONString(event)));
+
+        return httpClient.execute(httpPost);
+    }
+
+
+    HttpResponse mockStructuredParseRequest() throws Exception {
+        TestEvent event = new TestEvent();
+        event.type = "com.example.someevent";
+        event.source = "/mycontext";
+        event.subject = "test";
+        event.datacontenttype = "application/json";
+        event.text = expectedParseMessage;
+        event.requestType = "PARSE";
+        event.fields = "orderNo:this is order number;address:this is a 
address;phone:this is phone number";
+        HttpPost httpPost = new HttpPost(uri);
+        httpPost.setEntity(new StringEntity(JsonUtils.toJSONString(event)));
+        return httpClient.execute(httpPost);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        if (!checkOpenAi()) {
+            return;
+        }
+        if (connector != null) {
+            connector.stop();
+        }
+        if (httpClient != null) {
+            httpClient.close();
+        }
+    }
+
+    class TestEvent {
+
+        public String requestType;
+        public String type;
+        public String source;
+        public String subject;
+        public String datacontenttype;
+        public String text;
+        public String fields;
+    }
+}
\ No newline at end of file
diff --git 
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/server-config.yml
 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/server-config.yml
new file mode 100644
index 000000000..0cd7b5b5a
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/server-config.yml
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sourceEnable: true
+sinkEnable: false
diff --git 
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/source-config.yml
 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/source-config.yml
new file mode 100644
index 000000000..47f25edbb
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/test/resources/source-config.yml
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+    meshAddress: 127.0.0.1:10000
+    subject: TopicTest
+    idc: FT
+    env: PRD
+    group: chatgptSource
+    appId: 5032
+    userName: chatgptSourceUser
+    passWord: chatgptPassWord
+connectorConfig:
+    connectorName: chatgptSource
+    path: /chatgpt
+    port: 3756
+    idleTimeout: 0
+    proxyEnable: true
+    parsePromptFileName: prompt
+
+# https://platform.openai.com/docs/api-reference/chat/create
+openaiConfig:
+    token:
+    model: gpt-3.5-turbo
+    timeout: 0
+    temperature: 1
+    maxTokens:
+    frequencyPenalty: 0
+    presencePenalty: 0
+    user: eventMesh
+    stop: []
+    logitBias: {
+
+    }
+
+openaiProxyConfig:
+    host: 127.0.0.1
+    port: 7890
diff --git a/settings.gradle b/settings.gradle
index 645e6fb36..6a8e27bf9 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -77,6 +77,7 @@ include 'eventmesh-connectors:eventmesh-connector-wecom'
 include 'eventmesh-connectors:eventmesh-connector-slack'
 include 'eventmesh-connectors:eventmesh-connector-wechat'
 include 'eventmesh-connectors:eventmesh-connector-http'
+include 'eventmesh-connectors:eventmesh-connector-chatgpt'
 
 include 'eventmesh-storage-plugin:eventmesh-storage-api'
 include 'eventmesh-storage-plugin:eventmesh-storage-standalone'
diff --git a/tools/dependency-check/known-dependencies.txt 
b/tools/dependency-check/known-dependencies.txt
index 05c99f3d3..73feb5fd3 100644
--- a/tools/dependency-check/known-dependencies.txt
+++ b/tools/dependency-check/known-dependencies.txt
@@ -1,6 +1,7 @@
 FastInfoset-1.2.15.jar
 ST4-4.3.4.jar
 accessors-smart-2.4.7.jar
+adapter-rxjava2-2.9.0.jar
 alibabacloud-gateway-spi-0.0.1.jar
 amqp-client-5.16.0.jar
 animal-sniffer-annotations-1.19.jar
@@ -12,6 +13,7 @@ antlr4-4.13.0.jar
 antlr4-runtime-4.13.0.jar
 aopalliance-1.0.jar
 apache-client-2.20.29.jar
+api-0.18.2.jar
 arns-2.20.29.jar
 asm-9.1.jar
 asm-9.2.jar
@@ -43,7 +45,9 @@ byte-buddy-1.11.0.jar
 byte-buddy-1.12.18.jar
 cache-api-1.1.1.jar
 checker-qual-3.12.0.jar
+classgraph-4.8.21.jar
 classmate-1.5.1.jar
+client-0.18.2.jar
 cloudevents-api-2.4.2.jar
 cloudevents-core-2.4.2.jar
 cloudevents-http-vertx-2.3.0.jar
@@ -63,6 +67,7 @@ commons-logging-1.2.jar
 commons-text-1.9.jar
 commons-validator-1.7.jar
 consul-api-1.4.5.jar
+converter-jackson-2.9.0.jar
 credentials-java-0.2.4.jar
 crt-core-2.20.29.jar
 curator-client-5.4.0.jar
@@ -155,6 +160,7 @@ json-path-2.7.0.jar
 json-smart-2.4.7.jar
 json-utils-2.20.29.jar
 jsr305-3.0.2.jar
+jtokkit-0.5.1.jar
 kafka-clients-3.0.0.jar
 listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
 log4j-api-2.22.1.jar
@@ -162,6 +168,7 @@ log4j-core-2.22.1.jar
 log4j-slf4j2-impl-2.22.1.jar
 lz4-java-1.7.1.jar
 lz4-java-1.8.0.jar
+mbknor-jackson-jsonschema_2.12-1.0.34.jar
 metrics-annotation-4.1.0.jar
 metrics-core-4.1.0.jar
 metrics-healthchecks-4.1.0.jar
@@ -306,6 +313,7 @@ reactor-core-3.4.13.jar
 redisson-3.17.3.jar
 regions-2.20.29.jar
 relaxngDatatype-20020414.jar
+retrofit-2.9.0.jar
 rocketmq-acl-4.9.5.jar
 rocketmq-broker-4.9.5.jar
 rocketmq-client-4.9.5.jar
@@ -317,9 +325,12 @@ rocketmq-remoting-4.9.5.jar
 rocketmq-srvutil-4.9.5.jar
 rocketmq-store-4.9.5.jar
 rocketmq-tools-4.9.5.jar
+rxjava-2.0.0.jar
 rxjava-3.0.12.jar
 s3-2.20.29.jar
+scala-library-2.12.8.jar
 sdk-core-2.20.29.jar
+service-0.18.2.jar
 simpleclient-0.12.0.jar
 simpleclient_tracer_common-0.12.0.jar
 simpleclient_tracer_otel-0.12.0.jar
@@ -352,6 +363,7 @@ tomcat-embed-el-9.0.56.jar
 txw2-2.3.1.jar
 utils-2.20.29.jar
 validation-api-1.1.0.Final.jar
+validation-api-2.0.1.Final.jar
 vertx-auth-common-4.4.6.jar
 vertx-bridge-common-4.4.6.jar
 vertx-core-4.4.6.jar


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to