This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new b72d4f8fb [ISSUE #5144] update eventmesh-connector-http module (#5145)
b72d4f8fb is described below

commit b72d4f8fba023298be89c020469e2bdad758c3ec
Author: mike_xwm <[email protected]>
AuthorDate: Wed Dec 11 22:14:59 2024 +0800

    [ISSUE #5144] update eventmesh-connector-http module (#5145)
    
    * [ISSUE #5137] update connector runtime v2 module
    
    * fix checkStyle error
    
    * [ISSUE #5139] update canal connector module
    
    * [ISSUE #5141] update eventmesh-admin-server module
    
    * [ISSUE #5144] update eventmesh-connector-http module
---
 .../config/connector/http/SinkConnectorConfig.java | 13 ++++-
 .../connector/http/SourceConnectorConfig.java      | 10 +++-
 .../http/common/SynchronizedCircularFifoQueue.java |  1 +
 .../connector/http/sink/HttpSinkConnector.java     | 44 ++++++++++++++-
 .../http/sink/data/HttpExportMetadata.java         |  2 -
 .../http/sink/handler/AbstractHttpSinkHandler.java | 64 ++++++++++++++--------
 .../HttpDeliveryStrategy.java}                     | 35 ++----------
 .../sink/handler/impl/CommonHttpSinkHandler.java   |  8 ++-
 .../connector/http/source/HttpSourceConnector.java | 38 +++++++------
 .../source/protocol/impl/CloudEventProtocol.java   |  2 +-
 10 files changed, 134 insertions(+), 83 deletions(-)

diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SinkConnectorConfig.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SinkConnectorConfig.java
index ccebe5a99..65fc8fe72 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SinkConnectorConfig.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SinkConnectorConfig.java
@@ -39,8 +39,8 @@ public class SinkConnectorConfig {
     // timeunit: ms, default 5000ms
     private int idleTimeout = 5000;
 
-    // maximum number of HTTP/1 connections a client will pool, default 5
-    private int maxConnectionPoolSize = 5;
+    // maximum number of HTTP/1 connections a client will pool, default 50
+    private int maxConnectionPoolSize = 50;
 
     // retry config
     private HttpRetryConfig retryConfig = new HttpRetryConfig();
@@ -48,6 +48,15 @@ public class SinkConnectorConfig {
     // webhook config
     private HttpWebhookConfig webhookConfig = new HttpWebhookConfig();
 
+    private String deliveryStrategy = "ROUND_ROBIN";
+
+    private boolean skipDeliverException = false;
+
+    // managed pipelining param, default true
+    private boolean isParallelized = true;
+
+    private int parallelism = 2;
+
 
     /**
      * Fill default values if absent (When there are multiple default values 
for a field)
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
index 282f88333..2c091e321 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
@@ -44,12 +44,18 @@ public class SourceConnectorConfig {
      */
     private int maxFormAttributeSize = 1024 * 1024;
 
-    // protocol, default Common
+    // max size of the queue, default 1000
+    private int maxStorageSize = 1000;
+
+    // batch size, default 10
+    private int batchSize = 10;
+
+    // protocol, default CloudEvent
     private String protocol = "Common";
 
     // extra config, e.g. GitHub secret
     private Map<String, String> extraConfig = new HashMap<>();
 
     // data consistency enabled, default true
-    private boolean dataConsistencyEnabled = false;
+    private boolean dataConsistencyEnabled = true;
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
index 9989552d1..0564e5873 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
@@ -142,6 +142,7 @@ public class SynchronizedCircularFifoQueue<E> extends 
CircularFifoQueue<E> {
             count++;
         }
         return items;
+
     }
 
 
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
index 3df110f2e..8e808ccc9 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
@@ -17,6 +17,7 @@
 
 package org.apache.eventmesh.connector.http.sink;
 
+import org.apache.eventmesh.common.EventMeshThreadFactory;
 import org.apache.eventmesh.common.config.connector.Config;
 import org.apache.eventmesh.common.config.connector.http.HttpSinkConfig;
 import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
@@ -32,6 +33,10 @@ import 
org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import lombok.Getter;
 import lombok.SneakyThrows;
@@ -45,6 +50,12 @@ public class HttpSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
     @Getter
     private HttpSinkHandler sinkHandler;
 
+    private ThreadPoolExecutor executor;
+
+    private final LinkedBlockingQueue<ConnectRecord> queue = new 
LinkedBlockingQueue<>(10000);
+
+    private final AtomicBoolean isStart = new AtomicBoolean(true);
+
     @Override
     public Class<? extends Config> configClass() {
         return HttpSinkConfig.class;
@@ -90,11 +101,30 @@ public class HttpSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
         } else {
             throw new IllegalArgumentException("Max retries must be greater 
than or equal to 0.");
         }
+        boolean isParallelized = 
this.httpSinkConfig.connectorConfig.isParallelized();
+        int parallelism = isParallelized ? 
this.httpSinkConfig.connectorConfig.getParallelism() : 1;
+        executor = new ThreadPoolExecutor(parallelism, parallelism, 0L, 
TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(), new 
EventMeshThreadFactory("http-sink-handler"));
     }
 
     @Override
     public void start() throws Exception {
         this.sinkHandler.start();
+        for (int i = 0; i < 
this.httpSinkConfig.connectorConfig.getParallelism(); i++) {
+            executor.execute(() -> {
+                while (isStart.get()) {
+                    ConnectRecord connectRecord = null;
+                    try {
+                        connectRecord = queue.poll(2, TimeUnit.SECONDS);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                    if (connectRecord != null) {
+                        sinkHandler.handle(connectRecord);
+                    }
+                }
+            });
+        }
     }
 
     @Override
@@ -114,7 +144,18 @@ public class HttpSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
 
     @Override
     public void stop() throws Exception {
+        isStart.set(false);
+        while (!queue.isEmpty()) {
+            ConnectRecord record = queue.poll();
+            this.sinkHandler.handle(record);
+        }
+        try {
+            Thread.sleep(50);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
         this.sinkHandler.stop();
+        log.info("All tasks completed, start shut down http sink connector");
     }
 
     @Override
@@ -125,8 +166,7 @@ public class HttpSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
                     log.warn("ConnectRecord data is null, ignore.");
                     continue;
                 }
-                // Handle the ConnectRecord
-                this.sinkHandler.handle(sinkRecord);
+                queue.put(sinkRecord);
             } catch (Exception e) {
                 log.error("Failed to sink message via HTTP. ", e);
             }
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java
index 41a508787..111ee6b3e 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java
@@ -40,8 +40,6 @@ public class HttpExportMetadata implements Serializable {
 
     private LocalDateTime receivedTime;
 
-    private String httpRecordId;
-
     private String recordId;
 
     private String retriedBy;
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
index 28ba79112..9ef760617 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
@@ -30,17 +30,26 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
+import lombok.Getter;
+
 /**
  * AbstractHttpSinkHandler is an abstract class that provides a base 
implementation for HttpSinkHandler.
  */
 public abstract class AbstractHttpSinkHandler implements HttpSinkHandler {
 
+    @Getter
     private final SinkConnectorConfig sinkConnectorConfig;
 
+    @Getter
     private final List<URI> urls;
 
+    private final HttpDeliveryStrategy deliveryStrategy;
+
+    private int roundRobinIndex = 0;
+
     protected AbstractHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) 
{
         this.sinkConnectorConfig = sinkConnectorConfig;
+        this.deliveryStrategy = 
HttpDeliveryStrategy.valueOf(sinkConnectorConfig.getDeliveryStrategy());
         // Initialize URLs
         String[] urlStrings = sinkConnectorConfig.getUrls();
         this.urls = Arrays.stream(urlStrings)
@@ -48,14 +57,6 @@ public abstract class AbstractHttpSinkHandler implements 
HttpSinkHandler {
             .collect(Collectors.toList());
     }
 
-    public SinkConnectorConfig getSinkConnectorConfig() {
-        return sinkConnectorConfig;
-    }
-
-    public List<URI> getUrls() {
-        return urls;
-    }
-
     /**
      * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method 
should be called for each ConnectRecord that needs to be processed.
      *
@@ -65,23 +66,38 @@ public abstract class AbstractHttpSinkHandler implements 
HttpSinkHandler {
     public void handle(ConnectRecord record) {
         // build attributes
         Map<String, Object> attributes = new ConcurrentHashMap<>();
-        attributes.put(MultiHttpRequestContext.NAME, new 
MultiHttpRequestContext(urls.size()));
-
-        // send the record to all URLs
-        for (URI url : urls) {
-            // convert ConnectRecord to HttpConnectRecord
-            String type = String.format("%s.%s.%s",
-                this.sinkConnectorConfig.getConnectorName(), url.getScheme(),
-                this.sinkConnectorConfig.getWebhookConfig().isActivate() ? 
"webhook" : "common");
-            HttpConnectRecord httpConnectRecord = 
HttpConnectRecord.convertConnectRecord(record, type);
-
-            // add AttemptEvent to the attributes
-            HttpAttemptEvent attemptEvent = new 
HttpAttemptEvent(this.sinkConnectorConfig.getRetryConfig().getMaxRetries() + 1);
-            attributes.put(HttpAttemptEvent.PREFIX + 
httpConnectRecord.getHttpRecordId(), attemptEvent);
-
-            // deliver the record
-            deliver(url, httpConnectRecord, attributes, record);
+
+        switch (deliveryStrategy) {
+            case ROUND_ROBIN:
+                attributes.put(MultiHttpRequestContext.NAME, new 
MultiHttpRequestContext(1));
+                URI url = urls.get(roundRobinIndex);
+                roundRobinIndex = (roundRobinIndex + 1) % urls.size();
+                sendRecordToUrl(record, attributes, url);
+                break;
+            case BROADCAST:
+                for (URI broadcastUrl : urls) {
+                    attributes.put(MultiHttpRequestContext.NAME, new 
MultiHttpRequestContext(urls.size()));
+                    sendRecordToUrl(record, attributes, broadcastUrl);
+                }
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown delivery strategy: 
" + deliveryStrategy);
         }
     }
 
+    private void sendRecordToUrl(ConnectRecord record, Map<String, Object> 
attributes, URI url) {
+        // convert ConnectRecord to HttpConnectRecord
+        String type = String.format("%s.%s.%s",
+            this.sinkConnectorConfig.getConnectorName(), url.getScheme(),
+            this.sinkConnectorConfig.getWebhookConfig().isActivate() ? 
"webhook" : "common");
+        HttpConnectRecord httpConnectRecord = 
HttpConnectRecord.convertConnectRecord(record, type);
+
+        // add AttemptEvent to the attributes
+        HttpAttemptEvent attemptEvent = new 
HttpAttemptEvent(this.sinkConnectorConfig.getRetryConfig().getMaxRetries() + 1);
+        attributes.put(HttpAttemptEvent.PREFIX + 
httpConnectRecord.getHttpRecordId(), attemptEvent);
+
+        // deliver the record
+        deliver(url, httpConnectRecord, attributes, record);
+    }
+
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpDeliveryStrategy.java
similarity index 57%
copy from 
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java
copy to 
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpDeliveryStrategy.java
index 41a508787..2e770eb12 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpDeliveryStrategy.java
@@ -15,36 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.connector.http.sink.data;
+package org.apache.eventmesh.connector.http.sink.handler;
 
-import java.io.Serializable;
-import java.time.LocalDateTime;
-
-import lombok.Builder;
-import lombok.Data;
-
-/**
- * Metadata for an HTTP export operation.
- */
-@Data
-@Builder
-public class HttpExportMetadata implements Serializable {
-
-    private static final long serialVersionUID = 1121010466793041920L;
-
-    private String url;
-
-    private int code;
-
-    private String message;
-
-    private LocalDateTime receivedTime;
-
-    private String httpRecordId;
-
-    private String recordId;
-
-    private String retriedBy;
-
-    private int retryNum;
+public enum HttpDeliveryStrategy {
+    ROUND_ROBIN,
+    BROADCAST
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
index 61bdc9f31..0b57cc06e 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
@@ -93,7 +93,8 @@ public class CommonHttpSinkHandler extends 
AbstractHttpSinkHandler {
             .setIdleTimeout(sinkConnectorConfig.getIdleTimeout())
             .setIdleTimeoutUnit(TimeUnit.MILLISECONDS)
             .setConnectTimeout(sinkConnectorConfig.getConnectionTimeout())
-            .setMaxPoolSize(sinkConnectorConfig.getMaxConnectionPoolSize());
+            .setMaxPoolSize(sinkConnectorConfig.getMaxConnectionPoolSize())
+            .setPipelining(sinkConnectorConfig.isParallelized());
         this.webClient = WebClient.create(vertx, options);
     }
 
@@ -108,7 +109,7 @@ public class CommonHttpSinkHandler extends 
AbstractHttpSinkHandler {
      */
     @Override
     public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord 
httpConnectRecord, Map<String, Object> attributes,
-        ConnectRecord connectRecord) {
+                                                ConnectRecord connectRecord) {
         // create headers
         Map<String, Object> extensionMap = new HashMap<>();
         Set<String> extensionKeySet = 
httpConnectRecord.getExtensions().keySet();
@@ -203,6 +204,9 @@ public class CommonHttpSinkHandler extends 
AbstractHttpSinkHandler {
                 // failure
                 
record.getCallback().onException(buildSendExceptionContext(record, 
lastFailedEvent.getLastException()));
             }
+        } else {
+            log.warn("still have requests to process, size {}|attempt num {}",
+                multiHttpRequestContext.getRemainingRequests(), 
attemptEvent.getAttempts());
         }
     }
 
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
index 6c78badaf..5aa8c63dd 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
@@ -20,6 +20,7 @@ package org.apache.eventmesh.connector.http.source;
 import org.apache.eventmesh.common.config.connector.Config;
 import org.apache.eventmesh.common.config.connector.http.HttpSourceConfig;
 import org.apache.eventmesh.common.exception.EventMeshException;
+import org.apache.eventmesh.connector.http.source.data.CommonResponse;
 import org.apache.eventmesh.connector.http.source.protocol.Protocol;
 import org.apache.eventmesh.connector.http.source.protocol.ProtocolFactory;
 import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
@@ -40,6 +41,7 @@ import io.vertx.core.http.HttpServer;
 import io.vertx.core.http.HttpServerOptions;
 import io.vertx.ext.web.Route;
 import io.vertx.ext.web.Router;
+import io.vertx.ext.web.RoutingContext;
 import io.vertx.ext.web.handler.LoggerHandler;
 
 import lombok.Getter;
@@ -52,9 +54,7 @@ public class HttpSourceConnector implements Source, 
ConnectorCreateService<Sourc
 
     private BlockingQueue<Object> queue;
 
-    private int maxBatchSize;
-
-    private long maxPollWaitTime;
+    private int batchSize;
 
     private Route route;
 
@@ -94,11 +94,11 @@ public class HttpSourceConnector implements Source, 
ConnectorCreateService<Sourc
 
     private void doInit() {
         // init queue
-        this.queue = new 
LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity());
+        int maxQueueSize = 
this.sourceConfig.getConnectorConfig().getMaxStorageSize();
+        this.queue = new LinkedBlockingQueue<>(maxQueueSize);
 
-        // init poll batch size and timeout
-        this.maxBatchSize = 
this.sourceConfig.getPollConfig().getMaxBatchSize();
-        this.maxPollWaitTime = 
this.sourceConfig.getPollConfig().getMaxWaitTime();
+        // init batch size
+        this.batchSize = this.sourceConfig.getConnectorConfig().getBatchSize();
 
         // init protocol
         String protocolName = 
this.sourceConfig.getConnectorConfig().getProtocol();
@@ -136,14 +136,17 @@ public class HttpSourceConnector implements Source, 
ConnectorCreateService<Sourc
 
     @Override
     public void commit(ConnectRecord record) {
-        if (this.route != null && 
sourceConfig.getConnectorConfig().isDataConsistencyEnabled()) {
-            this.route.handler(ctx -> {
-                // Return 200 OK
-                ctx.response()
+        if (sourceConfig.getConnectorConfig().isDataConsistencyEnabled()) {
+            log.debug("HttpSourceConnector commit record: {}", 
record.getRecordId());
+            RoutingContext routingContext = (RoutingContext) 
record.getExtensionObj("routingContext");
+            if (routingContext != null) {
+                routingContext.response()
                     .putHeader("content-type", "application/json")
                     .setStatusCode(HttpResponseStatus.OK.code())
-                    .end("{\"status\":\"success\",\"recordId\":\"" + 
record.getRecordId() + "\"}");
-            });
+                    .end(CommonResponse.success().toJsonStr());
+            } else {
+                log.error("Failed to commit the record, routingContext is 
null, recordId: {}", record.getRecordId());
+            }
         }
     }
 
@@ -185,13 +188,13 @@ public class HttpSourceConnector implements Source, 
ConnectorCreateService<Sourc
 
     @Override
     public List<ConnectRecord> poll() {
-        // record current time
         long startTime = System.currentTimeMillis();
+        long maxPollWaitTime = 5000;
         long remainingTime = maxPollWaitTime;
 
         // poll from queue
-        List<ConnectRecord> connectRecords = new ArrayList<>(maxBatchSize);
-        for (int i = 0; i < maxBatchSize; i++) {
+        List<ConnectRecord> connectRecords = new ArrayList<>(batchSize);
+        for (int i = 0; i < batchSize; i++) {
             try {
                 Object obj = queue.poll(remainingTime, TimeUnit.MILLISECONDS);
                 if (obj == null) {
@@ -206,8 +209,9 @@ public class HttpSourceConnector implements Source, 
ConnectorCreateService<Sourc
                 remainingTime = maxPollWaitTime > elapsedTime ? 
maxPollWaitTime - elapsedTime : 0;
             } catch (Exception e) {
                 log.error("Failed to poll from queue.", e);
-                break;
+                throw new RuntimeException(e);
             }
+
         }
         return connectRecords;
     }
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java
index a44ed0e90..10158f6eb 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java
@@ -57,7 +57,7 @@ public class CloudEventProtocol implements Protocol {
     /**
      * Handle the protocol message for CloudEvent.
      *
-     * @param route     route
+     * @param route route
      * @param queue queue info
      */
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to