This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new da5fdbfdb [ISSUE #5101] Define and standardize some common 
configurations for all Sources(#5102)
da5fdbfdb is described below

commit da5fdbfdbd18fb10cba7606d953e8ab2edabee56
Author: Zaki <[email protected]>
AuthorDate: Mon Oct 28 19:08:59 2024 +0800

    [ISSUE #5101] Define and standardize some common configurations for all 
Sources(#5102)
---
 .../common/config/connector/Constants.java         | 17 ++++++++
 .../{SourceConfig.java => PollConfig.java}         | 24 +++++++----
 .../common/config/connector/SourceConfig.java      |  3 ++
 .../connector/http/SourceConnectorConfig.java      |  8 +---
 .../connector/mq/kafka/SourceConnectorConfig.java  |  1 -
 .../connector/CanalSourceCheckConnector.java       |  8 +++-
 .../source/connector/CanalSourceFullConnector.java |  7 +++-
 .../source/connector/ChatGPTSourceConnector.java   | 22 +++++++---
 .../http/common/SynchronizedCircularFifoQueue.java |  1 -
 .../connector/http/source/HttpSourceConnector.java | 49 +++++++++++++---------
 .../connector/http/source/protocol/Protocol.java   |  5 ++-
 .../source/protocol/impl/CloudEventProtocol.java   |  5 ++-
 .../http/source/protocol/impl/CommonProtocol.java  |  4 +-
 .../http/source/protocol/impl/GitHubProtocol.java  |  4 +-
 .../src/main/resources/source-config.yml           |  2 -
 .../src/test/resources/source-config.yml           |  2 -
 .../connector/jdbc/source/JdbcSourceConnector.java |  9 ++--
 .../jdbc/source/TaskManagerCoordinator.java        | 29 ++++++++-----
 .../source/connector/KafkaSourceConnector.java     |  6 +--
 .../source/connector/MongodbSourceConnector.java   | 24 +++++++----
 .../connector/OpenFunctionSourceConnector.java     | 30 +++++++++----
 .../source/connector/PravegaSourceConnector.java   | 24 +++++++----
 .../source/connector/RabbitMQSourceConnector.java  | 24 +++++++----
 .../source/connector/RedisSourceConnector.java     | 24 +++++++----
 .../source/connector/SpringSourceConnector.java    | 28 +++++++++----
 25 files changed, 241 insertions(+), 119 deletions(-)

diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/Constants.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/Constants.java
index 74576e843..817efb6d3 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/Constants.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/Constants.java
@@ -30,4 +30,21 @@ public class Constants {
     public static final int DEFAULT_ATTEMPT = 3;
 
     public static final int DEFAULT_PORT = 8080;
+
+    // ======================== Source Constants ========================
+    /**
+     * Default capacity
+     */
+    public static final int DEFAULT_CAPACITY = 1024;
+
+    /**
+     * Default poll batch size
+     */
+    public static final int DEFAULT_POLL_BATCH_SIZE = 10;
+
+    /**
+     * Default poll timeout (unit: ms)
+     */
+    public static final long DEFAULT_POLL_TIMEOUT = 5000L;
+
 }
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/SourceConfig.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/PollConfig.java
similarity index 69%
copy from 
eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/SourceConfig.java
copy to 
eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/PollConfig.java
index 763063125..cf3f06be9 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/SourceConfig.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/PollConfig.java
@@ -17,17 +17,27 @@
 
 package org.apache.eventmesh.common.config.connector;
 
-import org.apache.eventmesh.common.config.connector.offset.OffsetStorageConfig;
-
 import lombok.Data;
-import lombok.EqualsAndHashCode;
 
+/**
+ * Source Poll Config
+ */
 @Data
-@EqualsAndHashCode(callSuper = true)
-public abstract class SourceConfig extends Config {
+public class PollConfig {
+
+    /**
+     * Capacity of the poll queue
+     */
+    private int capacity = Constants.DEFAULT_CAPACITY;
 
-    private PubSubConfig pubSubConfig;
+    /**
+     * Max batch size of the poll
+     */
+    private int maxBatchSize = Constants.DEFAULT_POLL_BATCH_SIZE;
 
-    private OffsetStorageConfig offsetStorageConfig;
+    /**
+     * Max wait time of the poll
+     */
+    private long maxWaitTime = Constants.DEFAULT_POLL_TIMEOUT;
 
 }
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/SourceConfig.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/SourceConfig.java
index 763063125..f7bc42970 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/SourceConfig.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/SourceConfig.java
@@ -30,4 +30,7 @@ public abstract class SourceConfig extends Config {
 
     private OffsetStorageConfig offsetStorageConfig;
 
+    // Polling configuration, e.g. capacity, batch size, wait time, etc.
+    private PollConfig pollConfig = new PollConfig();
+
 }
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
index 58d910bf2..282f88333 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
@@ -44,13 +44,7 @@ public class SourceConnectorConfig {
      */
     private int maxFormAttributeSize = 1024 * 1024;
 
-    // max size of the queue, default 1000
-    private int maxStorageSize = 1000;
-
-    // batch size, default 10
-    private int batchSize = 10;
-
-    // protocol, default CloudEvent
+    // protocol, default Common
     private String protocol = "Common";
 
     // extra config, e.g. GitHub secret
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mq/kafka/SourceConnectorConfig.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mq/kafka/SourceConnectorConfig.java
index 21fb18eb2..eb7406f66 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mq/kafka/SourceConnectorConfig.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mq/kafka/SourceConnectorConfig.java
@@ -32,5 +32,4 @@ public class SourceConnectorConfig {
     private String enableAutoCommit = "false";
     private String sessionTimeoutMS = "10000";
     private String maxPollRecords = "1000";
-    private int pollTimeOut = 100;
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceCheckConnector.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceCheckConnector.java
index 841c9a481..bd85f0324 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceCheckConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceCheckConnector.java
@@ -50,12 +50,14 @@ import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class CanalSourceCheckConnector extends AbstractComponent implements 
Source, ConnectorCreateService<Source> {
+
     private CanalSourceFullConfig config;
     private CanalFullPositionMgr positionMgr;
     private RdbTableMgr tableMgr;
     private ThreadPoolExecutor executor;
-    private final BlockingQueue<List<ConnectRecord>> queue = new 
LinkedBlockingQueue<>();
+    private BlockingQueue<List<ConnectRecord>> queue;
     private final AtomicBoolean flag = new AtomicBoolean(true);
+    private long maxPollWaitTime;
 
     @Override
     protected void run() throws Exception {
@@ -140,6 +142,8 @@ public class CanalSourceCheckConnector extends 
AbstractComponent implements Sour
         DatabaseConnection.initSourceConnection();
         this.tableMgr = new RdbTableMgr(config.getSourceConnectorConfig(), 
DatabaseConnection.sourceDataSource);
         this.positionMgr = new CanalFullPositionMgr(config, tableMgr);
+        this.maxPollWaitTime = config.getPollConfig().getMaxWaitTime();
+        this.queue = new 
LinkedBlockingQueue<>(config.getPollConfig().getCapacity());
     }
 
     @Override
@@ -168,7 +172,7 @@ public class CanalSourceCheckConnector extends 
AbstractComponent implements Sour
     public List<ConnectRecord> poll() {
         while (flag.get()) {
             try {
-                List<ConnectRecord> records = queue.poll(5, TimeUnit.SECONDS);
+                List<ConnectRecord> records = queue.poll(maxPollWaitTime, 
TimeUnit.MILLISECONDS);
                 if (records == null || records.isEmpty()) {
                     continue;
                 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
index c2632ee47..09e2e0dcf 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
@@ -56,8 +56,9 @@ public class CanalSourceFullConnector extends 
AbstractComponent implements Sourc
     private CanalFullPositionMgr positionMgr;
     private RdbTableMgr tableMgr;
     private ThreadPoolExecutor executor;
-    private final BlockingQueue<List<ConnectRecord>> queue = new 
LinkedBlockingQueue<>();
+    private BlockingQueue<List<ConnectRecord>> queue;
     private final AtomicBoolean flag = new AtomicBoolean(true);
+    private long maxPollWaitTime;
 
     @Override
     protected void run() throws Exception {
@@ -137,6 +138,8 @@ public class CanalSourceFullConnector extends 
AbstractComponent implements Sourc
         DatabaseConnection.initSourceConnection();
         this.tableMgr = new RdbTableMgr(config.getSourceConnectorConfig(), 
DatabaseConnection.sourceDataSource);
         this.positionMgr = new CanalFullPositionMgr(config, tableMgr);
+        this.maxPollWaitTime = config.getPollConfig().getMaxWaitTime();
+        this.queue = new 
LinkedBlockingQueue<>(config.getPollConfig().getCapacity());
     }
 
     @Override
@@ -166,7 +169,7 @@ public class CanalSourceFullConnector extends 
AbstractComponent implements Sourc
     public List<ConnectRecord> poll() {
         while (flag.get()) {
             try {
-                List<ConnectRecord> records = queue.poll(5, TimeUnit.SECONDS);
+                List<ConnectRecord> records = queue.poll(maxPollWaitTime, 
TimeUnit.MILLISECONDS);
                 if (records == null || records.isEmpty()) {
                     continue;
                 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java
index 6b122087e..1b6955feb 100644
--- 
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java
@@ -61,8 +61,6 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class ChatGPTSourceConnector implements Source {
 
-    private static final int DEFAULT_BATCH_SIZE = 10;
-
     private ChatGPTSourceConfig sourceConfig;
     private BlockingQueue<CloudEvent> queue;
     private HttpServer server;
@@ -79,6 +77,9 @@ public class ChatGPTSourceConnector implements Source {
     private static final String APPLICATION_JSON = "application/json";
     private static final String TEXT_PLAIN = "text/plain";
 
+    private int maxBatchSize;
+    private long maxPollWaitTime;
+
 
     @Override
     public Class<? extends Config> configClass() {
@@ -129,7 +130,9 @@ public class ChatGPTSourceConnector implements Source {
         if (StringUtils.isNotEmpty(parsePromptTemplateStr)) {
             this.parseHandler = new ParseHandler(openaiManager, 
parsePromptTemplateStr);
         }
-        this.queue = new LinkedBlockingQueue<>(1024);
+        this.maxBatchSize = sourceConfig.getPollConfig().getMaxBatchSize();
+        this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime();
+        this.queue = new 
LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity());
         final Vertx vertx = Vertx.vertx();
         final Router router = Router.router(vertx);
         
router.route().path(this.sourceConfig.connectorConfig.getPath()).method(HttpMethod.POST).handler(BodyHandler.create()).handler(ctx
 -> {
@@ -239,14 +242,21 @@ public class ChatGPTSourceConnector implements Source {
 
     @Override
     public List<ConnectRecord> poll() {
-        List<ConnectRecord> connectRecords = new 
ArrayList<>(DEFAULT_BATCH_SIZE);
-        for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) {
+        long startTime = System.currentTimeMillis();
+        long remainingTime = maxPollWaitTime;
+
+        List<ConnectRecord> connectRecords = new ArrayList<>(maxBatchSize);
+        for (int i = 0; i < maxBatchSize; i++) {
             try {
-                CloudEvent event = queue.poll(3, TimeUnit.SECONDS);
+                CloudEvent event = queue.poll(remainingTime, 
TimeUnit.MILLISECONDS);
                 if (event == null) {
                     break;
                 }
                 connectRecords.add(CloudEventUtil.convertEventToRecord(event));
+
+                // calculate elapsed time and update remaining time for next 
poll
+                long elapsedTime = System.currentTimeMillis() - startTime;
+                remainingTime = maxPollWaitTime > elapsedTime ? 
maxPollWaitTime - elapsedTime : 0;
             } catch (InterruptedException e) {
                 break;
             }
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
index 0564e5873..9989552d1 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
@@ -142,7 +142,6 @@ public class SynchronizedCircularFifoQueue<E> extends 
CircularFifoQueue<E> {
             count++;
         }
         return items;
-
     }
 
 
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
index 2b2a01a9d..6c78badaf 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
@@ -20,7 +20,6 @@ package org.apache.eventmesh.connector.http.source;
 import org.apache.eventmesh.common.config.connector.Config;
 import org.apache.eventmesh.common.config.connector.http.HttpSourceConfig;
 import org.apache.eventmesh.common.exception.EventMeshException;
-import 
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
 import org.apache.eventmesh.connector.http.source.protocol.Protocol;
 import org.apache.eventmesh.connector.http.source.protocol.ProtocolFactory;
 import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
@@ -30,8 +29,9 @@ import org.apache.eventmesh.openconnect.api.source.Source;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import io.netty.handler.codec.http.HttpResponseStatus;
@@ -50,9 +50,11 @@ public class HttpSourceConnector implements Source, 
ConnectorCreateService<Sourc
 
     private HttpSourceConfig sourceConfig;
 
-    private SynchronizedCircularFifoQueue<Object> queue;
+    private BlockingQueue<Object> queue;
 
-    private int batchSize;
+    private int maxBatchSize;
+
+    private long maxPollWaitTime;
 
     private Route route;
 
@@ -92,11 +94,11 @@ public class HttpSourceConnector implements Source, 
ConnectorCreateService<Sourc
 
     private void doInit() {
         // init queue
-        int maxQueueSize = 
this.sourceConfig.getConnectorConfig().getMaxStorageSize();
-        this.queue = new SynchronizedCircularFifoQueue<>(maxQueueSize);
+        this.queue = new 
LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity());
 
-        // init batch size
-        this.batchSize = this.sourceConfig.getConnectorConfig().getBatchSize();
+        // init poll batch size and timeout
+        this.maxBatchSize = 
this.sourceConfig.getPollConfig().getMaxBatchSize();
+        this.maxPollWaitTime = 
this.sourceConfig.getPollConfig().getMaxWaitTime();
 
         // init protocol
         String protocolName = 
this.sourceConfig.getConnectorConfig().getProtocol();
@@ -183,20 +185,29 @@ public class HttpSourceConnector implements Source, 
ConnectorCreateService<Sourc
 
     @Override
     public List<ConnectRecord> poll() {
-        // if queue is empty, return empty list
-        if (queue.isEmpty()) {
-            return Collections.emptyList();
-        }
+        // record current time
+        long startTime = System.currentTimeMillis();
+        long remainingTime = maxPollWaitTime;
+
         // poll from queue
-        List<ConnectRecord> connectRecords = new ArrayList<>(batchSize);
-        for (int i = 0; i < batchSize; i++) {
-            Object obj = queue.poll();
-            if (obj == null) {
+        List<ConnectRecord> connectRecords = new ArrayList<>(maxBatchSize);
+        for (int i = 0; i < maxBatchSize; i++) {
+            try {
+                Object obj = queue.poll(remainingTime, TimeUnit.MILLISECONDS);
+                if (obj == null) {
+                    break;
+                }
+                // convert to ConnectRecord
+                ConnectRecord connectRecord = 
protocol.convertToConnectRecord(obj);
+                connectRecords.add(connectRecord);
+
+                // calculate elapsed time and update remaining time for next 
poll
+                long elapsedTime = System.currentTimeMillis() - startTime;
+                remainingTime = maxPollWaitTime > elapsedTime ? 
maxPollWaitTime - elapsedTime : 0;
+            } catch (Exception e) {
+                log.error("Failed to poll from queue.", e);
                 break;
             }
-            // convert to ConnectRecord
-            ConnectRecord connectRecord = protocol.convertToConnectRecord(obj);
-            connectRecords.add(connectRecord);
         }
         return connectRecords;
     }
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/Protocol.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/Protocol.java
index b671383e5..c5a22139e 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/Protocol.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/Protocol.java
@@ -18,9 +18,10 @@
 package org.apache.eventmesh.connector.http.source.protocol;
 
 import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
-import 
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 
+import java.util.concurrent.BlockingQueue;
+
 import io.vertx.ext.web.Route;
 
 
@@ -45,7 +46,7 @@ public interface Protocol {
      * @param route     route
      * @param queue queue info
      */
-    void setHandler(Route route, SynchronizedCircularFifoQueue<Object> queue);
+    void setHandler(Route route, BlockingQueue<Object> queue);
 
 
     /**
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java
index 4906e920f..a44ed0e90 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java
@@ -18,12 +18,13 @@
 package org.apache.eventmesh.connector.http.source.protocol.impl;
 
 import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
-import 
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
 import org.apache.eventmesh.connector.http.source.data.CommonResponse;
 import org.apache.eventmesh.connector.http.source.protocol.Protocol;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 import org.apache.eventmesh.openconnect.util.CloudEventUtil;
 
+import java.util.concurrent.BlockingQueue;
+
 import io.cloudevents.CloudEvent;
 import io.cloudevents.http.vertx.VertxMessageFactory;
 import io.netty.handler.codec.http.HttpResponseStatus;
@@ -60,7 +61,7 @@ public class CloudEventProtocol implements Protocol {
      * @param queue queue info
      */
     @Override
-    public void setHandler(Route route, SynchronizedCircularFifoQueue<Object> 
queue) {
+    public void setHandler(Route route, BlockingQueue<Object> queue) {
         route.method(HttpMethod.POST)
             .handler(ctx -> VertxMessageFactory.createReader(ctx.request())
                 .map(reader -> {
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
index 0761170ac..e831dc972 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
@@ -20,7 +20,6 @@ package 
org.apache.eventmesh.connector.http.source.protocol.impl;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
 import org.apache.eventmesh.common.utils.JsonUtils;
-import 
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
 import org.apache.eventmesh.connector.http.source.data.CommonResponse;
 import org.apache.eventmesh.connector.http.source.data.WebhookRequest;
 import org.apache.eventmesh.connector.http.source.protocol.Protocol;
@@ -28,6 +27,7 @@ import 
org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 
 import java.util.Base64;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
 import java.util.stream.Collectors;
 
 import io.netty.handler.codec.http.HttpResponseStatus;
@@ -66,7 +66,7 @@ public class CommonProtocol implements Protocol {
      * @param queue queue info
      */
     @Override
-    public void setHandler(Route route, SynchronizedCircularFifoQueue<Object> 
queue) {
+    public void setHandler(Route route, BlockingQueue<Object> queue) {
         route.method(HttpMethod.POST)
             .handler(BodyHandler.create())
             .handler(ctx -> {
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java
index fac8c0d80..e1edbd0fa 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java
@@ -20,7 +20,6 @@ package 
org.apache.eventmesh.connector.http.source.protocol.impl;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
 import org.apache.eventmesh.common.exception.EventMeshException;
-import 
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
 import org.apache.eventmesh.connector.http.source.data.CommonResponse;
 import org.apache.eventmesh.connector.http.source.data.WebhookRequest;
 import org.apache.eventmesh.connector.http.source.protocol.Protocol;
@@ -31,6 +30,7 @@ import org.apache.commons.lang3.BooleanUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
 import java.util.stream.Collectors;
 
 import javax.crypto.Mac;
@@ -90,7 +90,7 @@ public class GitHubProtocol implements Protocol {
      * @param queue queue info
      */
     @Override
-    public void setHandler(Route route, SynchronizedCircularFifoQueue<Object> 
queue) {
+    public void setHandler(Route route, BlockingQueue<Object> queue) {
         route.method(HttpMethod.POST)
             .handler(BodyHandler.create())
             .handler(ctx -> {
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml
index b1edc084f..0a73e627b 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml
@@ -30,8 +30,6 @@ connectorConfig:
     port: 3755
     idleTimeout: 5000   # timeunit: ms
     maxFormAttributeSize: 1048576 # timeunit: byte, default: 1048576(1MB). 
This applies only when handling form data submissions.
-    maxStorageSize: 1000 # max storage size, default: 1000
-    batchSize: 10 # batch size, default: 10
     protocol: CloudEvent # Case insensitive, default: CloudEvent, options: 
CloudEvent, GitHub, Common
     extraConfig: # extra config for different protocol, e.g. GitHub secret
         secret: xxxxxxx # GitHub secret
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml
 
b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml
index 735d3b01d..336bb2cb5 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml
@@ -30,8 +30,6 @@ connectorConfig:
     port: 3755
     idleTimeout: 5000   # timeunit: ms
     maxFormAttributeSize: 1048576 # timeunit: byte, default: 1048576(1MB). 
This applies only when handling form data submissions.
-    maxStorageSize: 1000 # max storage size, default: 1000
-    batchSize: 10 # batch size, default: 10
     protocol: CloudEvent # Case insensitive, default: CloudEvent, options: 
CloudEvent, GitHub, Common
     extraConfig: # extra config for different protocol, e.g. GitHub secret
         secret: xxxxxxx # GitHub secret
diff --git 
a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java
index 810a59e72..ecc5a4415 100644
--- 
a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java
@@ -142,7 +142,9 @@ public class JdbcSourceConnector extends SourceConnector {
 
         this.dispatcher = new EventDispatcher(this.sourceJdbcTaskManager);
 
-        this.taskManagerCoordinator = new TaskManagerCoordinator();
+        this.taskManagerCoordinator = new 
TaskManagerCoordinator(sourceConfig.getPollConfig().getCapacity(),
+            sourceConfig.getPollConfig().getMaxBatchSize(),
+            sourceConfig.getPollConfig().getMaxWaitTime());
         
this.taskManagerCoordinator.registerTaskManager(SourceJdbcTaskManager.class.getName(),
 sourceJdbcTaskManager);
         this.taskManagerCoordinator.init();
     }
@@ -209,9 +211,6 @@ public class JdbcSourceConnector extends SourceConnector {
 
     @Override
     public List<ConnectRecord> poll() {
-
-        List<ConnectRecord> connectRecords = 
this.taskManagerCoordinator.poll();
-
-        return connectRecords;
+        return this.taskManagerCoordinator.poll();
     }
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/TaskManagerCoordinator.java
 
b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/TaskManagerCoordinator.java
index c299fbc53..8efb8cbc7 100644
--- 
a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/TaskManagerCoordinator.java
+++ 
b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/TaskManagerCoordinator.java
@@ -40,16 +40,16 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class TaskManagerCoordinator {
 
-    private static final int BATCH_MAX = 10;
-    private static final int DEFAULT_QUEUE_SIZE = 1 << 13;
+    private final BlockingQueue<ConnectRecord> recordBlockingQueue;
+    private final Map<String, JdbcTaskManager> taskManagerCache = new 
HashMap<>(8);
+    private final int maxBatchSize;
+    private final long maxPollTimeout;
 
-    private BlockingQueue<ConnectRecord> recordBlockingQueue = new 
LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
-    private Map<String, JdbcTaskManager> taskManagerCache = new HashMap<>(8);
 
-    /**
-     * Constructs a new TaskManagerCoordinator.
-     */
-    public TaskManagerCoordinator() {
+    public TaskManagerCoordinator(int capacity, int maxBatchSize, long 
maxPollTimeout) {
+        this.recordBlockingQueue = new LinkedBlockingQueue<>(capacity);
+        this.maxBatchSize = maxBatchSize;
+        this.maxPollTimeout = maxPollTimeout;
     }
 
     /**
@@ -96,10 +96,13 @@ public class TaskManagerCoordinator {
      * @return A list of ConnectRecords, up to the maximum batch size defined 
by BATCH_MAX.
      */
     public List<ConnectRecord> poll() {
-        List<ConnectRecord> records = new ArrayList<>(BATCH_MAX);
-        for (int index = 0; index < BATCH_MAX; ++index) {
+        long startTime = System.currentTimeMillis();
+        long remainingTime = maxPollTimeout;
+
+        List<ConnectRecord> records = new ArrayList<>(maxBatchSize);
+        for (int index = 0; index < maxBatchSize; ++index) {
             try {
-                ConnectRecord record = recordBlockingQueue.poll(3, 
TimeUnit.SECONDS);
+                ConnectRecord record = recordBlockingQueue.poll(remainingTime, 
TimeUnit.MILLISECONDS);
                 if (Objects.isNull(record)) {
                     break;
                 }
@@ -107,6 +110,10 @@ public class TaskManagerCoordinator {
                     log.debug("record:{}", JsonUtils.toJSONString(record));
                 }
                 records.add(record);
+
+                // calculate elapsed time and update remaining time for next 
poll
+                long elapsedTime = System.currentTimeMillis() - startTime;
+                remainingTime = maxPollTimeout > elapsedTime ? maxPollTimeout 
- elapsedTime : 0;
             } catch (InterruptedException e) {
                 break;
             }
diff --git 
a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java
index d57312693..f771e907c 100644
--- 
a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java
@@ -45,7 +45,7 @@ public class KafkaSourceConnector implements Source {
 
     private KafkaConsumer<String, String> kafkaConsumer;
 
-    private int pollTimeOut = 100;
+    private long maxPollWaitTime;
 
     @Override
     public Class<? extends Config> configClass() {
@@ -75,7 +75,7 @@ public class KafkaSourceConnector implements Source {
         props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
sourceConfig.getConnectorConfig().getMaxPollRecords());
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 
sourceConfig.getConnectorConfig().getAutoCommitIntervalMS());
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
sourceConfig.getConnectorConfig().getSessionTimeoutMS());
-        this.pollTimeOut = sourceConfig.getConnectorConfig().getPollTimeOut();
+        this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime();
         this.kafkaConsumer = new KafkaConsumer<>(props);
     }
 
@@ -106,7 +106,7 @@ public class KafkaSourceConnector implements Source {
 
     @Override
     public List<ConnectRecord> poll() {
-        ConsumerRecords<String, String> records = 
kafkaConsumer.poll(Duration.ofMillis(pollTimeOut));
+        ConsumerRecords<String, String> records = 
kafkaConsumer.poll(Duration.ofMillis(maxPollWaitTime));
         List<ConnectRecord> connectRecords = new ArrayList<>(records.count());
         for (ConsumerRecord<String, String> record : records) {
             Long timestamp = System.currentTimeMillis();
diff --git 
a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java
index df3f66d6a..1d1dcc184 100644
--- 
a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java
@@ -42,10 +42,12 @@ public class MongodbSourceConnector implements Source {
 
     private MongodbSourceConfig sourceConfig;
 
-    private static final int DEFAULT_BATCH_SIZE = 10;
-
     private BlockingQueue<CloudEvent> queue;
 
+    private int maxBatchSize;
+
+    private long maxPollWaitTime;
+
     private MongodbSourceClient client;
 
     @Override
@@ -67,7 +69,9 @@ public class MongodbSourceConnector implements Source {
     }
 
     private void doInit() {
-        this.queue = new LinkedBlockingQueue<>(1000);
+        this.maxBatchSize = sourceConfig.getPollConfig().getMaxBatchSize();
+        this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime();
+        this.queue = new 
LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity());
         String connectorType = 
sourceConfig.getConnectorConfig().getConnectorType();
         if (connectorType.equals(ClusterType.STANDALONE.name())) {
             this.client = new 
MongodbStandaloneSourceClient(sourceConfig.getConnectorConfig(), queue);
@@ -105,15 +109,21 @@ public class MongodbSourceConnector implements Source {
 
     @Override
     public List<ConnectRecord> poll() {
-        List<ConnectRecord> connectRecords = new 
ArrayList<>(DEFAULT_BATCH_SIZE);
-        for (int count = 0; count < DEFAULT_BATCH_SIZE; ++count) {
+        long startTime = System.currentTimeMillis();
+        long remainingTime = maxPollWaitTime;
+
+        List<ConnectRecord> connectRecords = new ArrayList<>(maxBatchSize);
+        for (int count = 0; count < maxBatchSize; ++count) {
             try {
-                CloudEvent event = queue.poll(3, TimeUnit.SECONDS);
+                CloudEvent event = queue.poll(remainingTime, 
TimeUnit.MILLISECONDS);
                 if (event == null) {
                     break;
                 }
-
                 connectRecords.add(CloudEventUtil.convertEventToRecord(event));
+
+                // calculate elapsed time and update remaining time for next 
poll
+                long elapsedTime = System.currentTimeMillis() - startTime;
+                remainingTime = maxPollWaitTime > elapsedTime ? 
maxPollWaitTime - elapsedTime : 0;
             } catch (InterruptedException e) {
                 break;
             }
diff --git 
a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java
index 534ecfb79..e40c451ff 100644
--- 
a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java
@@ -35,12 +35,14 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class OpenFunctionSourceConnector implements Source {
 
-    private static final int DEFAULT_BATCH_SIZE = 10;
-
     private OpenFunctionSourceConfig sourceConfig;
 
     private BlockingQueue<ConnectRecord> queue;
 
+    private int maxBatchSize;
+
+    private long maxPollWaitTime;
+
     @Override
     public Class<? extends Config> configClass() {
         return OpenFunctionSourceConfig.class;
@@ -50,7 +52,7 @@ public class OpenFunctionSourceConnector implements Source {
     public void init(Config config) throws Exception {
         // init config for openfunction source connector
         this.sourceConfig = (OpenFunctionSourceConfig) config;
-        this.queue = new LinkedBlockingQueue<>(1000);
+        doInit();
     }
 
     @Override
@@ -58,7 +60,14 @@ public class OpenFunctionSourceConnector implements Source {
         SourceConnectorContext sourceConnectorContext = 
(SourceConnectorContext) connectorContext;
         // init config for openfunction source connector
         this.sourceConfig = (OpenFunctionSourceConfig) 
sourceConnectorContext.getSourceConfig();
-        this.queue = new LinkedBlockingQueue<>(1000);
+        doInit();
+    }
+
+    private void doInit() {
+        // init config for openfunction source connector
+        this.queue = new 
LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity());
+        this.maxBatchSize = sourceConfig.getPollConfig().getMaxBatchSize();
+        this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime();
     }
 
     @Override
@@ -92,16 +101,21 @@ public class OpenFunctionSourceConnector implements Source 
{
 
     @Override
     public List<ConnectRecord> poll() {
+        long startTime = System.currentTimeMillis();
+        long remainingTime = maxPollWaitTime;
 
-        List<ConnectRecord> connectRecords = new 
ArrayList<>(DEFAULT_BATCH_SIZE);
-
-        for (int count = 0; count < DEFAULT_BATCH_SIZE; ++count) {
+        List<ConnectRecord> connectRecords = new ArrayList<>(maxBatchSize);
+        for (int count = 0; count < maxBatchSize; ++count) {
             try {
-                ConnectRecord connectRecord = queue.poll(3, TimeUnit.SECONDS);
+                ConnectRecord connectRecord = queue.poll(remainingTime, 
TimeUnit.MILLISECONDS);
                 if (connectRecord == null) {
                     break;
                 }
                 connectRecords.add(connectRecord);
+
+                // calculate elapsed time and update remaining time for next 
poll
+                long elapsedTime = System.currentTimeMillis() - startTime;
+                remainingTime = maxPollWaitTime > elapsedTime ? 
maxPollWaitTime - elapsedTime : 0;
             } catch (InterruptedException e) {
                 Thread currentThread = Thread.currentThread();
                 log.warn("[OpenFunctionSourceConnector] Interrupting thread {} 
due to exception {}",
diff --git 
a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java
index 836779dbc..4b5e4751b 100644
--- 
a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java
@@ -57,8 +57,6 @@ public class PravegaSourceConnector implements Source {
 
     private static final AtomicBoolean started = new AtomicBoolean(false);
 
-    private static final int DEFAULT_BATCH_SIZE = 10;
-
     private PravegaSourceConfig sourceConfig;
 
     private StreamManager streamManager;
@@ -71,6 +69,10 @@ public class PravegaSourceConnector implements Source {
 
     private BlockingQueue<CloudEvent> queue;
 
+    private int maxBatchSize;
+
+    private long maxPollWaitTime;
+
     private final ThreadPoolExecutor executor = 
ThreadPoolFactory.createThreadPoolExecutor(
         Runtime.getRuntime().availableProcessors() * 2,
         Runtime.getRuntime().availableProcessors() * 2,
@@ -89,7 +91,9 @@ public class PravegaSourceConnector implements Source {
     public void init(ConnectorContext connectorContext) throws Exception {
         SourceConnectorContext sourceConnectorContext = 
(SourceConnectorContext) connectorContext;
         this.sourceConfig = (PravegaSourceConfig) 
sourceConnectorContext.getSourceConfig();
-        this.queue = new LinkedBlockingQueue<>(1000);
+        this.queue = new 
LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity());
+        this.maxBatchSize = sourceConfig.getPollConfig().getMaxBatchSize();
+        this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime();
 
         streamManager = 
StreamManager.create(sourceConfig.getConnectorConfig().getControllerURI());
         ClientConfig.ClientConfigBuilder clientConfigBuilder =
@@ -168,15 +172,21 @@ public class PravegaSourceConnector implements Source {
 
     @Override
     public List<ConnectRecord> poll() {
-        List<ConnectRecord> connectRecords = new 
ArrayList<>(DEFAULT_BATCH_SIZE);
-        for (int count = 0; count < DEFAULT_BATCH_SIZE; ++count) {
+        long startTime = System.currentTimeMillis();
+        long remainingTime = maxPollWaitTime;
+
+        List<ConnectRecord> connectRecords = new ArrayList<>(maxBatchSize);
+        for (int count = 0; count < maxBatchSize; ++count) {
             try {
-                CloudEvent event = queue.poll(3, TimeUnit.SECONDS);
+                CloudEvent event = queue.poll(remainingTime, 
TimeUnit.MILLISECONDS);
                 if (event == null) {
                     break;
                 }
-
                 connectRecords.add(CloudEventUtil.convertEventToRecord(event));
+
+                // calculate elapsed time and update remaining time for next 
poll
+                long elapsedTime = System.currentTimeMillis() - startTime;
+                remainingTime = maxPollWaitTime > elapsedTime ? 
maxPollWaitTime - elapsedTime : 0;
             } catch (InterruptedException e) {
                 break;
             }
diff --git 
a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java
index 0b7e726bd..a19b159c1 100644
--- 
a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java
@@ -54,10 +54,12 @@ public class RabbitMQSourceConnector implements Source {
 
     private volatile boolean started = false;
 
-    private static final int DEFAULT_BATCH_SIZE = 10;
-
     private BlockingQueue<CloudEvent> queue;
 
+    private int maxBatchSize;
+
+    private long maxPollWaitTime;
+
     private final RabbitmqConnectionFactory rabbitmqConnectionFactory = new 
RabbitmqConnectionFactory();
 
     private RabbitMQSourceHandler rabbitMQSourceHandler;
@@ -84,7 +86,9 @@ public class RabbitMQSourceConnector implements Source {
 
     @Override
     public void init(ConnectorContext connectorContext) throws Exception {
-        this.queue = new LinkedBlockingQueue<>(1000);
+        this.queue = new 
LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity());
+        this.maxBatchSize = sourceConfig.getPollConfig().getMaxBatchSize();
+        this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime();
         this.sourceConfig = (RabbitMQSourceConfig) ((SourceConnectorContext) 
connectorContext).getSourceConfig();
         this.rabbitmqClient = new RabbitmqClient(rabbitmqConnectionFactory);
         this.connection = 
rabbitmqClient.getConnection(sourceConfig.getConnectorConfig().getHost(),
@@ -139,15 +143,21 @@ public class RabbitMQSourceConnector implements Source {
 
     @Override
     public List<ConnectRecord> poll() {
-        List<ConnectRecord> connectRecords = new 
ArrayList<>(DEFAULT_BATCH_SIZE);
-        for (int count = 0; count < DEFAULT_BATCH_SIZE; ++count) {
+        long startTime = System.currentTimeMillis();
+        long remainingTime = maxPollWaitTime;
+
+        List<ConnectRecord> connectRecords = new ArrayList<>(maxBatchSize);
+        for (int count = 0; count < maxBatchSize; ++count) {
             try {
-                CloudEvent event = queue.poll(3, TimeUnit.SECONDS);
+                CloudEvent event = queue.poll(remainingTime, 
TimeUnit.MILLISECONDS);
                 if (event == null) {
                     break;
                 }
-
                 connectRecords.add(CloudEventUtil.convertEventToRecord(event));
+
+                // calculate elapsed time and update remaining time for next 
poll
+                long elapsedTime = System.currentTimeMillis() - startTime;
+                remainingTime = maxPollWaitTime > elapsedTime ? 
maxPollWaitTime - elapsedTime : 0;
             } catch (InterruptedException e) {
                 break;
             }
diff --git 
a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java
index 868639c20..5b858afa3 100644
--- 
a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java
@@ -40,8 +40,6 @@ import io.cloudevents.CloudEvent;
 
 public class RedisSourceConnector implements Source {
 
-    private static final int DEFAULT_BATCH_SIZE = 10;
-
     private RTopic topic;
 
     private RedisSourceConfig sourceConfig;
@@ -50,6 +48,10 @@ public class RedisSourceConnector implements Source {
 
     private BlockingQueue<CloudEvent> queue;
 
+    private int maxBatchSize;
+
+    private long maxPollWaitTime;
+
     @Override
     public Class<? extends Config> configClass() {
         return RedisSourceConfig.class;
@@ -73,7 +75,9 @@ public class RedisSourceConnector implements Source {
         
redisConfig.useSingleServer().setAddress(sourceConfig.connectorConfig.getServer());
         redisConfig.setCodec(CloudEventCodec.getInstance());
         this.redissonClient = Redisson.create(redisConfig);
-        this.queue = new LinkedBlockingQueue<>(1000);
+        this.queue = new 
LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity());
+        this.maxBatchSize = sourceConfig.getPollConfig().getMaxBatchSize();
+        this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime();
     }
 
     @Override
@@ -107,15 +111,21 @@ public class RedisSourceConnector implements Source {
 
     @Override
     public List<ConnectRecord> poll() {
-        List<ConnectRecord> connectRecords = new 
ArrayList<>(DEFAULT_BATCH_SIZE);
-        for (int count = 0; count < DEFAULT_BATCH_SIZE; ++count) {
+        long startTime = System.currentTimeMillis();
+        long remainingTime = maxPollWaitTime;
+
+        List<ConnectRecord> connectRecords = new ArrayList<>(maxBatchSize);
+        for (int count = 0; count < maxBatchSize; ++count) {
             try {
-                CloudEvent event = queue.poll(3, TimeUnit.SECONDS);
+                CloudEvent event = queue.poll(remainingTime, 
TimeUnit.MILLISECONDS);
                 if (event == null) {
                     break;
                 }
-
                 connectRecords.add(CloudEventUtil.convertEventToRecord(event));
+
+                // calculate elapsed time and update remaining time for next 
poll
+                long elapsedTime = System.currentTimeMillis() - startTime;
+                remainingTime = maxPollWaitTime > elapsedTime ? 
maxPollWaitTime - elapsedTime : 0;
             } catch (InterruptedException e) {
                 break;
             }
diff --git 
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
index db286eb60..6efed2db3 100644
--- 
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
@@ -52,14 +52,16 @@ public class SpringSourceConnector implements Source, 
MessageSendingOperations,
 
     private static final String CONNECTOR_PROPERTY_PREFIX = 
"eventmesh.connector.";
 
-    private static final int DEFAULT_BATCH_SIZE = 10;
-
     private ApplicationContext applicationContext;
 
     private SpringSourceConfig sourceConfig;
 
     private BlockingQueue<ConnectRecord> queue;
 
+    private int maxBatchSize;
+
+    private long maxPollWaitTime;
+
     @Override
     public Class<? extends Config> configClass() {
         return SpringSourceConfig.class;
@@ -69,7 +71,7 @@ public class SpringSourceConnector implements Source, 
MessageSendingOperations,
     public void init(Config config) throws Exception {
         // init config for spring source connector
         this.sourceConfig = (SpringSourceConfig) config;
-        this.queue = new LinkedBlockingQueue<>(1000);
+        doInit();
     }
 
     @Override
@@ -77,7 +79,13 @@ public class SpringSourceConnector implements Source, 
MessageSendingOperations,
         SourceConnectorContext sourceConnectorContext = 
(SourceConnectorContext) connectorContext;
         // init config for spring source connector
         this.sourceConfig = (SpringSourceConfig) 
sourceConnectorContext.getSourceConfig();
-        this.queue = new LinkedBlockingQueue<>(1000);
+        doInit();
+    }
+
+    private void doInit() {
+        this.queue = new 
LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity());
+        this.maxBatchSize = sourceConfig.getPollConfig().getMaxBatchSize();
+        this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime();
     }
 
     @Override
@@ -107,15 +115,21 @@ public class SpringSourceConnector implements Source, 
MessageSendingOperations,
 
     @Override
     public List<ConnectRecord> poll() {
-        List<ConnectRecord> connectRecords = new 
ArrayList<>(DEFAULT_BATCH_SIZE);
+        long startTime = System.currentTimeMillis();
+        long remainingTime = maxPollWaitTime;
 
-        for (int count = 0; count < DEFAULT_BATCH_SIZE; ++count) {
+        List<ConnectRecord> connectRecords = new ArrayList<>(maxBatchSize);
+        for (int count = 0; count < maxBatchSize; ++count) {
             try {
-                ConnectRecord connectRecord = queue.poll(3, TimeUnit.SECONDS);
+                ConnectRecord connectRecord = queue.poll(remainingTime, 
TimeUnit.MILLISECONDS);
                 if (connectRecord == null) {
                     break;
                 }
                 connectRecords.add(connectRecord);
+
+                // calculate elapsed time and update remaining time for next 
poll
+                long elapsedTime = System.currentTimeMillis() - startTime;
+                remainingTime = maxPollWaitTime > elapsedTime ? 
maxPollWaitTime - elapsedTime : 0;
             } catch (InterruptedException e) {
                 Thread currentThread = Thread.currentThread();
                 log.warn("[SpringSourceConnector] Interrupting thread {} due 
to exception {}",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to