This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new da5fdbfdb [ISSUE #5101] Define and standardize some common
configurations for all Sources(#5102)
da5fdbfdb is described below
commit da5fdbfdbd18fb10cba7606d953e8ab2edabee56
Author: Zaki <[email protected]>
AuthorDate: Mon Oct 28 19:08:59 2024 +0800
[ISSUE #5101] Define and standardize some common configurations for all
Sources(#5102)
---
.../common/config/connector/Constants.java | 17 ++++++++
.../{SourceConfig.java => PollConfig.java} | 24 +++++++----
.../common/config/connector/SourceConfig.java | 3 ++
.../connector/http/SourceConnectorConfig.java | 8 +---
.../connector/mq/kafka/SourceConnectorConfig.java | 1 -
.../connector/CanalSourceCheckConnector.java | 8 +++-
.../source/connector/CanalSourceFullConnector.java | 7 +++-
.../source/connector/ChatGPTSourceConnector.java | 22 +++++++---
.../http/common/SynchronizedCircularFifoQueue.java | 1 -
.../connector/http/source/HttpSourceConnector.java | 49 +++++++++++++---------
.../connector/http/source/protocol/Protocol.java | 5 ++-
.../source/protocol/impl/CloudEventProtocol.java | 5 ++-
.../http/source/protocol/impl/CommonProtocol.java | 4 +-
.../http/source/protocol/impl/GitHubProtocol.java | 4 +-
.../src/main/resources/source-config.yml | 2 -
.../src/test/resources/source-config.yml | 2 -
.../connector/jdbc/source/JdbcSourceConnector.java | 9 ++--
.../jdbc/source/TaskManagerCoordinator.java | 29 ++++++++-----
.../source/connector/KafkaSourceConnector.java | 6 +--
.../source/connector/MongodbSourceConnector.java | 24 +++++++----
.../connector/OpenFunctionSourceConnector.java | 30 +++++++++----
.../source/connector/PravegaSourceConnector.java | 24 +++++++----
.../source/connector/RabbitMQSourceConnector.java | 24 +++++++----
.../source/connector/RedisSourceConnector.java | 24 +++++++----
.../source/connector/SpringSourceConnector.java | 28 +++++++++----
25 files changed, 241 insertions(+), 119 deletions(-)
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/Constants.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/Constants.java
index 74576e843..817efb6d3 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/Constants.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/Constants.java
@@ -30,4 +30,21 @@ public class Constants {
public static final int DEFAULT_ATTEMPT = 3;
public static final int DEFAULT_PORT = 8080;
+
+ // ======================== Source Constants ========================
+ /**
+ * Default capacity of the source poll queue
+ */
+ public static final int DEFAULT_CAPACITY = 1024;
+
+ /**
+ * Default poll batch size
+ */
+ public static final int DEFAULT_POLL_BATCH_SIZE = 10;
+
+ /**
+ * Default poll timeout (unit: ms)
+ */
+ public static final long DEFAULT_POLL_TIMEOUT = 5000L;
+
}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/SourceConfig.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/PollConfig.java
similarity index 69%
copy from
eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/SourceConfig.java
copy to
eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/PollConfig.java
index 763063125..cf3f06be9 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/SourceConfig.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/PollConfig.java
@@ -17,17 +17,27 @@
package org.apache.eventmesh.common.config.connector;
-import org.apache.eventmesh.common.config.connector.offset.OffsetStorageConfig;
-
import lombok.Data;
-import lombok.EqualsAndHashCode;
+/**
+ * Source Poll Config
+ */
@Data
-@EqualsAndHashCode(callSuper = true)
-public abstract class SourceConfig extends Config {
+public class PollConfig {
+
+ /**
+ * Capacity of the poll queue
+ */
+ private int capacity = Constants.DEFAULT_CAPACITY;
- private PubSubConfig pubSubConfig;
+ /**
+ * Max batch size of the poll
+ */
+ private int maxBatchSize = Constants.DEFAULT_POLL_BATCH_SIZE;
- private OffsetStorageConfig offsetStorageConfig;
+ /**
+ * Max wait time of the poll (unit: ms)
+ */
+ private long maxWaitTime = Constants.DEFAULT_POLL_TIMEOUT;
}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/SourceConfig.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/SourceConfig.java
index 763063125..f7bc42970 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/SourceConfig.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/SourceConfig.java
@@ -30,4 +30,7 @@ public abstract class SourceConfig extends Config {
private OffsetStorageConfig offsetStorageConfig;
+ // Polling configuration: queue capacity, max batch size and max wait time.
+ private PollConfig pollConfig = new PollConfig();
+
}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
index 58d910bf2..282f88333 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
@@ -44,13 +44,7 @@ public class SourceConnectorConfig {
*/
private int maxFormAttributeSize = 1024 * 1024;
- // max size of the queue, default 1000
- private int maxStorageSize = 1000;
-
- // batch size, default 10
- private int batchSize = 10;
-
- // protocol, default CloudEvent
+ // protocol, default Common
private String protocol = "Common";
// extra config, e.g. GitHub secret
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mq/kafka/SourceConnectorConfig.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mq/kafka/SourceConnectorConfig.java
index 21fb18eb2..eb7406f66 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mq/kafka/SourceConnectorConfig.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mq/kafka/SourceConnectorConfig.java
@@ -32,5 +32,4 @@ public class SourceConnectorConfig {
private String enableAutoCommit = "false";
private String sessionTimeoutMS = "10000";
private String maxPollRecords = "1000";
- private int pollTimeOut = 100;
}
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceCheckConnector.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceCheckConnector.java
index 841c9a481..bd85f0324 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceCheckConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceCheckConnector.java
@@ -50,12 +50,14 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class CanalSourceCheckConnector extends AbstractComponent implements
Source, ConnectorCreateService<Source> {
+
private CanalSourceFullConfig config;
private CanalFullPositionMgr positionMgr;
private RdbTableMgr tableMgr;
private ThreadPoolExecutor executor;
- private final BlockingQueue<List<ConnectRecord>> queue = new
LinkedBlockingQueue<>();
+ private BlockingQueue<List<ConnectRecord>> queue;
private final AtomicBoolean flag = new AtomicBoolean(true);
+ private long maxPollWaitTime;
@Override
protected void run() throws Exception {
@@ -140,6 +142,8 @@ public class CanalSourceCheckConnector extends
AbstractComponent implements Sour
DatabaseConnection.initSourceConnection();
this.tableMgr = new RdbTableMgr(config.getSourceConnectorConfig(),
DatabaseConnection.sourceDataSource);
this.positionMgr = new CanalFullPositionMgr(config, tableMgr);
+ this.maxPollWaitTime = config.getPollConfig().getMaxWaitTime();
+ this.queue = new
LinkedBlockingQueue<>(config.getPollConfig().getCapacity());
}
@Override
@@ -168,7 +172,7 @@ public class CanalSourceCheckConnector extends
AbstractComponent implements Sour
public List<ConnectRecord> poll() {
while (flag.get()) {
try {
- List<ConnectRecord> records = queue.poll(5, TimeUnit.SECONDS);
+ List<ConnectRecord> records = queue.poll(maxPollWaitTime,
TimeUnit.MILLISECONDS);
if (records == null || records.isEmpty()) {
continue;
}
diff --git
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
index c2632ee47..09e2e0dcf 100644
---
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
@@ -56,8 +56,9 @@ public class CanalSourceFullConnector extends
AbstractComponent implements Sourc
private CanalFullPositionMgr positionMgr;
private RdbTableMgr tableMgr;
private ThreadPoolExecutor executor;
- private final BlockingQueue<List<ConnectRecord>> queue = new
LinkedBlockingQueue<>();
+ private BlockingQueue<List<ConnectRecord>> queue;
private final AtomicBoolean flag = new AtomicBoolean(true);
+ private long maxPollWaitTime;
@Override
protected void run() throws Exception {
@@ -137,6 +138,8 @@ public class CanalSourceFullConnector extends
AbstractComponent implements Sourc
DatabaseConnection.initSourceConnection();
this.tableMgr = new RdbTableMgr(config.getSourceConnectorConfig(),
DatabaseConnection.sourceDataSource);
this.positionMgr = new CanalFullPositionMgr(config, tableMgr);
+ this.maxPollWaitTime = config.getPollConfig().getMaxWaitTime();
+ this.queue = new
LinkedBlockingQueue<>(config.getPollConfig().getCapacity());
}
@Override
@@ -166,7 +169,7 @@ public class CanalSourceFullConnector extends
AbstractComponent implements Sourc
public List<ConnectRecord> poll() {
while (flag.get()) {
try {
- List<ConnectRecord> records = queue.poll(5, TimeUnit.SECONDS);
+ List<ConnectRecord> records = queue.poll(maxPollWaitTime,
TimeUnit.MILLISECONDS);
if (records == null || records.isEmpty()) {
continue;
}
diff --git
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java
index 6b122087e..1b6955feb 100644
---
a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java
@@ -61,8 +61,6 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ChatGPTSourceConnector implements Source {
- private static final int DEFAULT_BATCH_SIZE = 10;
-
private ChatGPTSourceConfig sourceConfig;
private BlockingQueue<CloudEvent> queue;
private HttpServer server;
@@ -79,6 +77,9 @@ public class ChatGPTSourceConnector implements Source {
private static final String APPLICATION_JSON = "application/json";
private static final String TEXT_PLAIN = "text/plain";
+ private int maxBatchSize;
+ private long maxPollWaitTime;
+
@Override
public Class<? extends Config> configClass() {
@@ -129,7 +130,9 @@ public class ChatGPTSourceConnector implements Source {
if (StringUtils.isNotEmpty(parsePromptTemplateStr)) {
this.parseHandler = new ParseHandler(openaiManager,
parsePromptTemplateStr);
}
- this.queue = new LinkedBlockingQueue<>(1024);
+ this.maxBatchSize = sourceConfig.getPollConfig().getMaxBatchSize();
+ this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime();
+ this.queue = new
LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity());
final Vertx vertx = Vertx.vertx();
final Router router = Router.router(vertx);
router.route().path(this.sourceConfig.connectorConfig.getPath()).method(HttpMethod.POST).handler(BodyHandler.create()).handler(ctx
-> {
@@ -239,14 +242,21 @@ public class ChatGPTSourceConnector implements Source {
@Override
public List<ConnectRecord> poll() {
- List<ConnectRecord> connectRecords = new
ArrayList<>(DEFAULT_BATCH_SIZE);
- for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) {
+ long startTime = System.currentTimeMillis();
+ long remainingTime = maxPollWaitTime;
+
+ List<ConnectRecord> connectRecords = new ArrayList<>(maxBatchSize);
+ for (int i = 0; i < maxBatchSize; i++) {
try {
- CloudEvent event = queue.poll(3, TimeUnit.SECONDS);
+ CloudEvent event = queue.poll(remainingTime,
TimeUnit.MILLISECONDS);
if (event == null) {
break;
}
connectRecords.add(CloudEventUtil.convertEventToRecord(event));
+
+ // calculate elapsed time and update remaining time for next
poll
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ remainingTime = maxPollWaitTime > elapsedTime ?
maxPollWaitTime - elapsedTime : 0;
} catch (InterruptedException e) {
break;
}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
index 0564e5873..9989552d1 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
@@ -142,7 +142,6 @@ public class SynchronizedCircularFifoQueue<E> extends
CircularFifoQueue<E> {
count++;
}
return items;
-
}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
index 2b2a01a9d..6c78badaf 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
@@ -20,7 +20,6 @@ package org.apache.eventmesh.connector.http.source;
import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.common.config.connector.http.HttpSourceConfig;
import org.apache.eventmesh.common.exception.EventMeshException;
-import
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
import org.apache.eventmesh.connector.http.source.protocol.Protocol;
import org.apache.eventmesh.connector.http.source.protocol.ProtocolFactory;
import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
@@ -30,8 +29,9 @@ import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import io.netty.handler.codec.http.HttpResponseStatus;
@@ -50,9 +50,11 @@ public class HttpSourceConnector implements Source,
ConnectorCreateService<Sourc
private HttpSourceConfig sourceConfig;
- private SynchronizedCircularFifoQueue<Object> queue;
+ private BlockingQueue<Object> queue;
- private int batchSize;
+ private int maxBatchSize;
+
+ private long maxPollWaitTime;
private Route route;
@@ -92,11 +94,11 @@ public class HttpSourceConnector implements Source,
ConnectorCreateService<Sourc
private void doInit() {
// init queue
- int maxQueueSize =
this.sourceConfig.getConnectorConfig().getMaxStorageSize();
- this.queue = new SynchronizedCircularFifoQueue<>(maxQueueSize);
+ this.queue = new
LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity());
- // init batch size
- this.batchSize = this.sourceConfig.getConnectorConfig().getBatchSize();
+ // init poll batch size and timeout
+ this.maxBatchSize =
this.sourceConfig.getPollConfig().getMaxBatchSize();
+ this.maxPollWaitTime =
this.sourceConfig.getPollConfig().getMaxWaitTime();
// init protocol
String protocolName =
this.sourceConfig.getConnectorConfig().getProtocol();
@@ -183,20 +185,29 @@ public class HttpSourceConnector implements Source,
ConnectorCreateService<Sourc
@Override
public List<ConnectRecord> poll() {
- // if queue is empty, return empty list
- if (queue.isEmpty()) {
- return Collections.emptyList();
- }
+ // record current time
+ long startTime = System.currentTimeMillis();
+ long remainingTime = maxPollWaitTime;
+
// poll from queue
- List<ConnectRecord> connectRecords = new ArrayList<>(batchSize);
- for (int i = 0; i < batchSize; i++) {
- Object obj = queue.poll();
- if (obj == null) {
+ List<ConnectRecord> connectRecords = new ArrayList<>(maxBatchSize);
+ for (int i = 0; i < maxBatchSize; i++) {
+ try {
+ Object obj = queue.poll(remainingTime, TimeUnit.MILLISECONDS);
+ if (obj == null) {
+ break;
+ }
+ // convert to ConnectRecord
+ ConnectRecord connectRecord =
protocol.convertToConnectRecord(obj);
+ connectRecords.add(connectRecord);
+
+ // calculate elapsed time and update remaining time for next
poll
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ remainingTime = maxPollWaitTime > elapsedTime ?
maxPollWaitTime - elapsedTime : 0;
+ } catch (Exception e) {
+ log.error("Failed to poll from queue.", e);
break;
}
- // convert to ConnectRecord
- ConnectRecord connectRecord = protocol.convertToConnectRecord(obj);
- connectRecords.add(connectRecord);
}
return connectRecords;
}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/Protocol.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/Protocol.java
index b671383e5..c5a22139e 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/Protocol.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/Protocol.java
@@ -18,9 +18,10 @@
package org.apache.eventmesh.connector.http.source.protocol;
import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
-import
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import java.util.concurrent.BlockingQueue;
+
import io.vertx.ext.web.Route;
@@ -45,7 +46,7 @@ public interface Protocol {
* @param route route
* @param queue queue info
*/
- void setHandler(Route route, SynchronizedCircularFifoQueue<Object> queue);
+ void setHandler(Route route, BlockingQueue<Object> queue);
/**
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java
index 4906e920f..a44ed0e90 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java
@@ -18,12 +18,13 @@
package org.apache.eventmesh.connector.http.source.protocol.impl;
import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
-import
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
import org.apache.eventmesh.connector.http.source.data.CommonResponse;
import org.apache.eventmesh.connector.http.source.protocol.Protocol;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.util.CloudEventUtil;
+import java.util.concurrent.BlockingQueue;
+
import io.cloudevents.CloudEvent;
import io.cloudevents.http.vertx.VertxMessageFactory;
import io.netty.handler.codec.http.HttpResponseStatus;
@@ -60,7 +61,7 @@ public class CloudEventProtocol implements Protocol {
* @param queue queue info
*/
@Override
- public void setHandler(Route route, SynchronizedCircularFifoQueue<Object>
queue) {
+ public void setHandler(Route route, BlockingQueue<Object> queue) {
route.method(HttpMethod.POST)
.handler(ctx -> VertxMessageFactory.createReader(ctx.request())
.map(reader -> {
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
index 0761170ac..e831dc972 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
@@ -20,7 +20,6 @@ package
org.apache.eventmesh.connector.http.source.protocol.impl;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
import org.apache.eventmesh.common.utils.JsonUtils;
-import
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
import org.apache.eventmesh.connector.http.source.data.CommonResponse;
import org.apache.eventmesh.connector.http.source.data.WebhookRequest;
import org.apache.eventmesh.connector.http.source.protocol.Protocol;
@@ -28,6 +27,7 @@ import
org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import java.util.Base64;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
import io.netty.handler.codec.http.HttpResponseStatus;
@@ -66,7 +66,7 @@ public class CommonProtocol implements Protocol {
* @param queue queue info
*/
@Override
- public void setHandler(Route route, SynchronizedCircularFifoQueue<Object>
queue) {
+ public void setHandler(Route route, BlockingQueue<Object> queue) {
route.method(HttpMethod.POST)
.handler(BodyHandler.create())
.handler(ctx -> {
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java
index fac8c0d80..e1edbd0fa 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java
@@ -20,7 +20,6 @@ package
org.apache.eventmesh.connector.http.source.protocol.impl;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
import org.apache.eventmesh.common.exception.EventMeshException;
-import
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
import org.apache.eventmesh.connector.http.source.data.CommonResponse;
import org.apache.eventmesh.connector.http.source.data.WebhookRequest;
import org.apache.eventmesh.connector.http.source.protocol.Protocol;
@@ -31,6 +30,7 @@ import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
import javax.crypto.Mac;
@@ -90,7 +90,7 @@ public class GitHubProtocol implements Protocol {
* @param queue queue info
*/
@Override
- public void setHandler(Route route, SynchronizedCircularFifoQueue<Object>
queue) {
+ public void setHandler(Route route, BlockingQueue<Object> queue) {
route.method(HttpMethod.POST)
.handler(BodyHandler.create())
.handler(ctx -> {
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml
b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml
index b1edc084f..0a73e627b 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml
@@ -30,8 +30,6 @@ connectorConfig:
port: 3755
idleTimeout: 5000 # timeunit: ms
maxFormAttributeSize: 1048576 # timeunit: byte, default: 1048576(1MB).
This applies only when handling form data submissions.
- maxStorageSize: 1000 # max storage size, default: 1000
- batchSize: 10 # batch size, default: 10
protocol: CloudEvent # Case insensitive, default: CloudEvent, options:
CloudEvent, GitHub, Common
extraConfig: # extra config for different protocol, e.g. GitHub secret
secret: xxxxxxx # GitHub secret
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml
b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml
index 735d3b01d..336bb2cb5 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml
+++
b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml
@@ -30,8 +30,6 @@ connectorConfig:
port: 3755
idleTimeout: 5000 # timeunit: ms
maxFormAttributeSize: 1048576 # timeunit: byte, default: 1048576(1MB).
This applies only when handling form data submissions.
- maxStorageSize: 1000 # max storage size, default: 1000
- batchSize: 10 # batch size, default: 10
protocol: CloudEvent # Case insensitive, default: CloudEvent, options:
CloudEvent, GitHub, Common
extraConfig: # extra config for different protocol, e.g. GitHub secret
secret: xxxxxxx # GitHub secret
diff --git
a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java
index 810a59e72..ecc5a4415 100644
---
a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java
@@ -142,7 +142,9 @@ public class JdbcSourceConnector extends SourceConnector {
this.dispatcher = new EventDispatcher(this.sourceJdbcTaskManager);
- this.taskManagerCoordinator = new TaskManagerCoordinator();
+ this.taskManagerCoordinator = new
TaskManagerCoordinator(sourceConfig.getPollConfig().getCapacity(),
+ sourceConfig.getPollConfig().getMaxBatchSize(),
+ sourceConfig.getPollConfig().getMaxWaitTime());
this.taskManagerCoordinator.registerTaskManager(SourceJdbcTaskManager.class.getName(),
sourceJdbcTaskManager);
this.taskManagerCoordinator.init();
}
@@ -209,9 +211,6 @@ public class JdbcSourceConnector extends SourceConnector {
@Override
public List<ConnectRecord> poll() {
-
- List<ConnectRecord> connectRecords =
this.taskManagerCoordinator.poll();
-
- return connectRecords;
+ return this.taskManagerCoordinator.poll();
}
}
diff --git
a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/TaskManagerCoordinator.java
b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/TaskManagerCoordinator.java
index c299fbc53..8efb8cbc7 100644
---
a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/TaskManagerCoordinator.java
+++
b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/TaskManagerCoordinator.java
@@ -40,16 +40,16 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TaskManagerCoordinator {
- private static final int BATCH_MAX = 10;
- private static final int DEFAULT_QUEUE_SIZE = 1 << 13;
+ private final BlockingQueue<ConnectRecord> recordBlockingQueue;
+ private final Map<String, JdbcTaskManager> taskManagerCache = new
HashMap<>(8);
+ private final int maxBatchSize;
+ private final long maxPollTimeout;
- private BlockingQueue<ConnectRecord> recordBlockingQueue = new
LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
- private Map<String, JdbcTaskManager> taskManagerCache = new HashMap<>(8);
- /**
- * Constructs a new TaskManagerCoordinator.
- */
- public TaskManagerCoordinator() {
+ public TaskManagerCoordinator(int capacity, int maxBatchSize, long
maxPollTimeout) {
+ this.recordBlockingQueue = new LinkedBlockingQueue<>(capacity);
+ this.maxBatchSize = maxBatchSize;
+ this.maxPollTimeout = maxPollTimeout;
}
/**
@@ -96,10 +96,13 @@ public class TaskManagerCoordinator {
* @return A list of ConnectRecords, up to the maximum batch size defined
by BATCH_MAX.
*/
public List<ConnectRecord> poll() {
- List<ConnectRecord> records = new ArrayList<>(BATCH_MAX);
- for (int index = 0; index < BATCH_MAX; ++index) {
+ long startTime = System.currentTimeMillis();
+ long remainingTime = maxPollTimeout;
+
+ List<ConnectRecord> records = new ArrayList<>(maxBatchSize);
+ for (int index = 0; index < maxBatchSize; ++index) {
try {
- ConnectRecord record = recordBlockingQueue.poll(3,
TimeUnit.SECONDS);
+ ConnectRecord record = recordBlockingQueue.poll(remainingTime,
TimeUnit.MILLISECONDS);
if (Objects.isNull(record)) {
break;
}
@@ -107,6 +110,10 @@ public class TaskManagerCoordinator {
log.debug("record:{}", JsonUtils.toJSONString(record));
}
records.add(record);
+
+ // calculate elapsed time and update remaining time for next
poll
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ remainingTime = maxPollTimeout > elapsedTime ? maxPollTimeout
- elapsedTime : 0;
} catch (InterruptedException e) {
break;
}
diff --git
a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java
index d57312693..f771e907c 100644
---
a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java
@@ -45,7 +45,7 @@ public class KafkaSourceConnector implements Source {
private KafkaConsumer<String, String> kafkaConsumer;
- private int pollTimeOut = 100;
+ private long maxPollWaitTime;
@Override
public Class<? extends Config> configClass() {
@@ -75,7 +75,7 @@ public class KafkaSourceConnector implements Source {
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
sourceConfig.getConnectorConfig().getMaxPollRecords());
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
sourceConfig.getConnectorConfig().getAutoCommitIntervalMS());
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
sourceConfig.getConnectorConfig().getSessionTimeoutMS());
- this.pollTimeOut = sourceConfig.getConnectorConfig().getPollTimeOut();
+ this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime();
this.kafkaConsumer = new KafkaConsumer<>(props);
}
@@ -106,7 +106,7 @@ public class KafkaSourceConnector implements Source {
@Override
public List<ConnectRecord> poll() {
- ConsumerRecords<String, String> records =
kafkaConsumer.poll(Duration.ofMillis(pollTimeOut));
+ ConsumerRecords<String, String> records =
kafkaConsumer.poll(Duration.ofMillis(maxPollWaitTime));
List<ConnectRecord> connectRecords = new ArrayList<>(records.count());
for (ConsumerRecord<String, String> record : records) {
Long timestamp = System.currentTimeMillis();
diff --git
a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java
index df3f66d6a..1d1dcc184 100644
---
a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java
@@ -42,10 +42,12 @@ public class MongodbSourceConnector implements Source {
private MongodbSourceConfig sourceConfig;
- private static final int DEFAULT_BATCH_SIZE = 10;
-
private BlockingQueue<CloudEvent> queue;
+ private int maxBatchSize;
+
+ private long maxPollWaitTime;
+
private MongodbSourceClient client;
@Override
@@ -67,7 +69,9 @@ public class MongodbSourceConnector implements Source {
}
private void doInit() {
- this.queue = new LinkedBlockingQueue<>(1000);
+ this.maxBatchSize = sourceConfig.getPollConfig().getMaxBatchSize();
+ this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime();
+ this.queue = new
LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity());
String connectorType =
sourceConfig.getConnectorConfig().getConnectorType();
if (connectorType.equals(ClusterType.STANDALONE.name())) {
this.client = new
MongodbStandaloneSourceClient(sourceConfig.getConnectorConfig(), queue);
@@ -105,15 +109,21 @@ public class MongodbSourceConnector implements Source {
@Override
public List<ConnectRecord> poll() {
- List<ConnectRecord> connectRecords = new
ArrayList<>(DEFAULT_BATCH_SIZE);
- for (int count = 0; count < DEFAULT_BATCH_SIZE; ++count) {
+ long startTime = System.currentTimeMillis();
+ long remainingTime = maxPollWaitTime;
+
+ List<ConnectRecord> connectRecords = new ArrayList<>(maxBatchSize);
+ for (int count = 0; count < maxBatchSize; ++count) {
try {
- CloudEvent event = queue.poll(3, TimeUnit.SECONDS);
+ CloudEvent event = queue.poll(remainingTime,
TimeUnit.MILLISECONDS);
if (event == null) {
break;
}
-
connectRecords.add(CloudEventUtil.convertEventToRecord(event));
+
+ // calculate elapsed time and update remaining time for next
poll
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ remainingTime = maxPollWaitTime > elapsedTime ?
maxPollWaitTime - elapsedTime : 0;
} catch (InterruptedException e) {
break;
}
diff --git
a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java
index 534ecfb79..e40c451ff 100644
---
a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java
@@ -35,12 +35,14 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class OpenFunctionSourceConnector implements Source {
- private static final int DEFAULT_BATCH_SIZE = 10;
-
private OpenFunctionSourceConfig sourceConfig;
private BlockingQueue<ConnectRecord> queue;
+ private int maxBatchSize;
+
+ private long maxPollWaitTime;
+
@Override
public Class<? extends Config> configClass() {
return OpenFunctionSourceConfig.class;
@@ -50,7 +52,7 @@ public class OpenFunctionSourceConnector implements Source {
public void init(Config config) throws Exception {
// init config for openfunction source connector
this.sourceConfig = (OpenFunctionSourceConfig) config;
- this.queue = new LinkedBlockingQueue<>(1000);
+ doInit();
}
@Override
@@ -58,7 +60,14 @@ public class OpenFunctionSourceConnector implements Source {
SourceConnectorContext sourceConnectorContext =
(SourceConnectorContext) connectorContext;
// init config for openfunction source connector
this.sourceConfig = (OpenFunctionSourceConfig)
sourceConnectorContext.getSourceConfig();
- this.queue = new LinkedBlockingQueue<>(1000);
+ doInit();
+ }
+
+ private void doInit() {
+ // create the record queue (sized by the poll config's capacity) and cache the batch size and max wait time used by poll()
+ this.queue = new
LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity());
+ this.maxBatchSize = sourceConfig.getPollConfig().getMaxBatchSize();
+ this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime();
}
@Override
@@ -92,16 +101,21 @@ public class OpenFunctionSourceConnector implements Source
{
@Override
public List<ConnectRecord> poll() {
+ long startTime = System.currentTimeMillis();
+ long remainingTime = maxPollWaitTime;
- List<ConnectRecord> connectRecords = new
ArrayList<>(DEFAULT_BATCH_SIZE);
-
- for (int count = 0; count < DEFAULT_BATCH_SIZE; ++count) {
+ List<ConnectRecord> connectRecords = new ArrayList<>(maxBatchSize);
+ for (int count = 0; count < maxBatchSize; ++count) {
try {
- ConnectRecord connectRecord = queue.poll(3, TimeUnit.SECONDS);
+ ConnectRecord connectRecord = queue.poll(remainingTime,
TimeUnit.MILLISECONDS);
if (connectRecord == null) {
break;
}
connectRecords.add(connectRecord);
+
+ // calculate elapsed time and update remaining time for next
poll
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ remainingTime = maxPollWaitTime > elapsedTime ?
maxPollWaitTime - elapsedTime : 0;
} catch (InterruptedException e) {
Thread currentThread = Thread.currentThread();
log.warn("[OpenFunctionSourceConnector] Interrupting thread {}
due to exception {}",
diff --git
a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java
index 836779dbc..4b5e4751b 100644
---
a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java
@@ -57,8 +57,6 @@ public class PravegaSourceConnector implements Source {
private static final AtomicBoolean started = new AtomicBoolean(false);
- private static final int DEFAULT_BATCH_SIZE = 10;
-
private PravegaSourceConfig sourceConfig;
private StreamManager streamManager;
@@ -71,6 +69,10 @@ public class PravegaSourceConnector implements Source {
private BlockingQueue<CloudEvent> queue;
+ private int maxBatchSize;
+
+ private long maxPollWaitTime;
+
private final ThreadPoolExecutor executor =
ThreadPoolFactory.createThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 2,
Runtime.getRuntime().availableProcessors() * 2,
@@ -89,7 +91,9 @@ public class PravegaSourceConnector implements Source {
public void init(ConnectorContext connectorContext) throws Exception {
SourceConnectorContext sourceConnectorContext =
(SourceConnectorContext) connectorContext;
this.sourceConfig = (PravegaSourceConfig)
sourceConnectorContext.getSourceConfig();
- this.queue = new LinkedBlockingQueue<>(1000);
+ this.queue = new
LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity());
+ this.maxBatchSize = sourceConfig.getPollConfig().getMaxBatchSize();
+ this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime();
streamManager =
StreamManager.create(sourceConfig.getConnectorConfig().getControllerURI());
ClientConfig.ClientConfigBuilder clientConfigBuilder =
@@ -168,15 +172,21 @@ public class PravegaSourceConnector implements Source {
@Override
public List<ConnectRecord> poll() {
- List<ConnectRecord> connectRecords = new
ArrayList<>(DEFAULT_BATCH_SIZE);
- for (int count = 0; count < DEFAULT_BATCH_SIZE; ++count) {
+ long startTime = System.currentTimeMillis();
+ long remainingTime = maxPollWaitTime;
+
+ List<ConnectRecord> connectRecords = new ArrayList<>(maxBatchSize);
+ for (int count = 0; count < maxBatchSize; ++count) {
try {
- CloudEvent event = queue.poll(3, TimeUnit.SECONDS);
+ CloudEvent event = queue.poll(remainingTime,
TimeUnit.MILLISECONDS);
if (event == null) {
break;
}
-
connectRecords.add(CloudEventUtil.convertEventToRecord(event));
+
+ // calculate elapsed time and update remaining time for next
poll
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ remainingTime = maxPollWaitTime > elapsedTime ?
maxPollWaitTime - elapsedTime : 0;
} catch (InterruptedException e) {
break;
}
diff --git
a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java
index 0b7e726bd..a19b159c1 100644
---
a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java
@@ -54,10 +54,12 @@ public class RabbitMQSourceConnector implements Source {
private volatile boolean started = false;
- private static final int DEFAULT_BATCH_SIZE = 10;
-
private BlockingQueue<CloudEvent> queue;
+ private int maxBatchSize;
+
+ private long maxPollWaitTime;
+
private final RabbitmqConnectionFactory rabbitmqConnectionFactory = new
RabbitmqConnectionFactory();
private RabbitMQSourceHandler rabbitMQSourceHandler;
@@ -84,7 +86,9 @@ public class RabbitMQSourceConnector implements Source {
@Override
public void init(ConnectorContext connectorContext) throws Exception {
- this.queue = new LinkedBlockingQueue<>(1000);
this.sourceConfig = (RabbitMQSourceConfig) ((SourceConnectorContext)
connectorContext).getSourceConfig();
+ this.queue = new
LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity()); // sourceConfig is assigned just above; reading it earlier would NPE
+ this.maxBatchSize = sourceConfig.getPollConfig().getMaxBatchSize();
+ this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime();
this.rabbitmqClient = new RabbitmqClient(rabbitmqConnectionFactory);
this.connection =
rabbitmqClient.getConnection(sourceConfig.getConnectorConfig().getHost(),
@@ -139,15 +143,21 @@ public class RabbitMQSourceConnector implements Source {
@Override
public List<ConnectRecord> poll() {
- List<ConnectRecord> connectRecords = new
ArrayList<>(DEFAULT_BATCH_SIZE);
- for (int count = 0; count < DEFAULT_BATCH_SIZE; ++count) {
+ long startTime = System.currentTimeMillis();
+ long remainingTime = maxPollWaitTime;
+
+ List<ConnectRecord> connectRecords = new ArrayList<>(maxBatchSize);
+ for (int count = 0; count < maxBatchSize; ++count) {
try {
- CloudEvent event = queue.poll(3, TimeUnit.SECONDS);
+ CloudEvent event = queue.poll(remainingTime,
TimeUnit.MILLISECONDS);
if (event == null) {
break;
}
-
connectRecords.add(CloudEventUtil.convertEventToRecord(event));
+
+ // calculate elapsed time and update remaining time for next
poll
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ remainingTime = maxPollWaitTime > elapsedTime ?
maxPollWaitTime - elapsedTime : 0;
} catch (InterruptedException e) {
break;
}
diff --git
a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java
index 868639c20..5b858afa3 100644
---
a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java
@@ -40,8 +40,6 @@ import io.cloudevents.CloudEvent;
public class RedisSourceConnector implements Source {
- private static final int DEFAULT_BATCH_SIZE = 10;
-
private RTopic topic;
private RedisSourceConfig sourceConfig;
@@ -50,6 +48,10 @@ public class RedisSourceConnector implements Source {
private BlockingQueue<CloudEvent> queue;
+ private int maxBatchSize;
+
+ private long maxPollWaitTime;
+
@Override
public Class<? extends Config> configClass() {
return RedisSourceConfig.class;
@@ -73,7 +75,9 @@ public class RedisSourceConnector implements Source {
redisConfig.useSingleServer().setAddress(sourceConfig.connectorConfig.getServer());
redisConfig.setCodec(CloudEventCodec.getInstance());
this.redissonClient = Redisson.create(redisConfig);
- this.queue = new LinkedBlockingQueue<>(1000);
+ this.queue = new
LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity());
+ this.maxBatchSize = sourceConfig.getPollConfig().getMaxBatchSize();
+ this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime();
}
@Override
@@ -107,15 +111,21 @@ public class RedisSourceConnector implements Source {
@Override
public List<ConnectRecord> poll() {
- List<ConnectRecord> connectRecords = new
ArrayList<>(DEFAULT_BATCH_SIZE);
- for (int count = 0; count < DEFAULT_BATCH_SIZE; ++count) {
+ long startTime = System.currentTimeMillis();
+ long remainingTime = maxPollWaitTime;
+
+ List<ConnectRecord> connectRecords = new ArrayList<>(maxBatchSize);
+ for (int count = 0; count < maxBatchSize; ++count) {
try {
- CloudEvent event = queue.poll(3, TimeUnit.SECONDS);
+ CloudEvent event = queue.poll(remainingTime,
TimeUnit.MILLISECONDS);
if (event == null) {
break;
}
-
connectRecords.add(CloudEventUtil.convertEventToRecord(event));
+
+ // calculate elapsed time and update remaining time for next
poll
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ remainingTime = maxPollWaitTime > elapsedTime ?
maxPollWaitTime - elapsedTime : 0;
} catch (InterruptedException e) {
break;
}
diff --git
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
index db286eb60..6efed2db3 100644
---
a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java
@@ -52,14 +52,16 @@ public class SpringSourceConnector implements Source,
MessageSendingOperations,
private static final String CONNECTOR_PROPERTY_PREFIX =
"eventmesh.connector.";
- private static final int DEFAULT_BATCH_SIZE = 10;
-
private ApplicationContext applicationContext;
private SpringSourceConfig sourceConfig;
private BlockingQueue<ConnectRecord> queue;
+ private int maxBatchSize;
+
+ private long maxPollWaitTime;
+
@Override
public Class<? extends Config> configClass() {
return SpringSourceConfig.class;
@@ -69,7 +71,7 @@ public class SpringSourceConnector implements Source,
MessageSendingOperations,
public void init(Config config) throws Exception {
// init config for spring source connector
this.sourceConfig = (SpringSourceConfig) config;
- this.queue = new LinkedBlockingQueue<>(1000);
+ doInit();
}
@Override
@@ -77,7 +79,13 @@ public class SpringSourceConnector implements Source,
MessageSendingOperations,
SourceConnectorContext sourceConnectorContext =
(SourceConnectorContext) connectorContext;
// init config for spring source connector
this.sourceConfig = (SpringSourceConfig)
sourceConnectorContext.getSourceConfig();
- this.queue = new LinkedBlockingQueue<>(1000);
+ doInit();
+ }
+
+ private void doInit() {
+ this.queue = new
LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity());
+ this.maxBatchSize = sourceConfig.getPollConfig().getMaxBatchSize();
+ this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime();
}
@Override
@@ -107,15 +115,21 @@ public class SpringSourceConnector implements Source,
MessageSendingOperations,
@Override
public List<ConnectRecord> poll() {
- List<ConnectRecord> connectRecords = new
ArrayList<>(DEFAULT_BATCH_SIZE);
+ long startTime = System.currentTimeMillis();
+ long remainingTime = maxPollWaitTime;
- for (int count = 0; count < DEFAULT_BATCH_SIZE; ++count) {
+ List<ConnectRecord> connectRecords = new ArrayList<>(maxBatchSize);
+ for (int count = 0; count < maxBatchSize; ++count) {
try {
- ConnectRecord connectRecord = queue.poll(3, TimeUnit.SECONDS);
+ ConnectRecord connectRecord = queue.poll(remainingTime,
TimeUnit.MILLISECONDS);
if (connectRecord == null) {
break;
}
connectRecords.add(connectRecord);
+
+ // calculate elapsed time and update remaining time for next
poll
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ remainingTime = maxPollWaitTime > elapsedTime ?
maxPollWaitTime - elapsedTime : 0;
} catch (InterruptedException e) {
Thread currentThread = Thread.currentThread();
log.warn("[SpringSourceConnector] Interrupting thread {} due
to exception {}",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]