This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new b72d4f8fb [ISSUE #5144] update eventmesh-connector-http module (#5145)
b72d4f8fb is described below
commit b72d4f8fba023298be89c020469e2bdad758c3ec
Author: mike_xwm <[email protected]>
AuthorDate: Wed Dec 11 22:14:59 2024 +0800
[ISSUE #5144] update eventmesh-connector-http module (#5145)
* [ISSUE #5137] update connector runtime v2 module
* fix checkStyle error
* [ISSUE #5139] update canal connector module
* [ISSUE #5141] update eventmesh-admin-server module
* [ISSUE #5144] update eventmesh-connector-http module
---
.../config/connector/http/SinkConnectorConfig.java | 13 ++++-
.../connector/http/SourceConnectorConfig.java | 10 +++-
.../http/common/SynchronizedCircularFifoQueue.java | 1 +
.../connector/http/sink/HttpSinkConnector.java | 44 ++++++++++++++-
.../http/sink/data/HttpExportMetadata.java | 2 -
.../http/sink/handler/AbstractHttpSinkHandler.java | 64 ++++++++++++++--------
.../HttpDeliveryStrategy.java} | 35 ++----------
.../sink/handler/impl/CommonHttpSinkHandler.java | 8 ++-
.../connector/http/source/HttpSourceConnector.java | 38 +++++++------
.../source/protocol/impl/CloudEventProtocol.java | 2 +-
10 files changed, 134 insertions(+), 83 deletions(-)
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SinkConnectorConfig.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SinkConnectorConfig.java
index ccebe5a99..65fc8fe72 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SinkConnectorConfig.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SinkConnectorConfig.java
@@ -39,8 +39,8 @@ public class SinkConnectorConfig {
// timeunit: ms, default 5000ms
private int idleTimeout = 5000;
- // maximum number of HTTP/1 connections a client will pool, default 5
- private int maxConnectionPoolSize = 5;
+ // maximum number of HTTP/1 connections a client will pool, default 50
+ private int maxConnectionPoolSize = 50;
// retry config
private HttpRetryConfig retryConfig = new HttpRetryConfig();
@@ -48,6 +48,15 @@ public class SinkConnectorConfig {
// webhook config
private HttpWebhookConfig webhookConfig = new HttpWebhookConfig();
+ private String deliveryStrategy = "ROUND_ROBIN";
+
+ private boolean skipDeliverException = false;
+
+ // managed pipelining param, default true
+ private boolean isParallelized = true;
+
+ private int parallelism = 2;
+
/**
* Fill default values if absent (When there are multiple default values
for a field)
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
index 282f88333..2c091e321 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
@@ -44,12 +44,18 @@ public class SourceConnectorConfig {
*/
private int maxFormAttributeSize = 1024 * 1024;
- // protocol, default Common
+ // max size of the queue, default 1000
+ private int maxStorageSize = 1000;
+
+ // batch size, default 10
+ private int batchSize = 10;
+
+ // protocol, default CloudEvent
private String protocol = "Common";
// extra config, e.g. GitHub secret
private Map<String, String> extraConfig = new HashMap<>();
// data consistency enabled, default true
- private boolean dataConsistencyEnabled = false;
+ private boolean dataConsistencyEnabled = true;
}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
index 9989552d1..0564e5873 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
@@ -142,6 +142,7 @@ public class SynchronizedCircularFifoQueue<E> extends
CircularFifoQueue<E> {
count++;
}
return items;
+
}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
index 3df110f2e..8e808ccc9 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
@@ -17,6 +17,7 @@
package org.apache.eventmesh.connector.http.sink;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.common.config.connector.http.HttpSinkConfig;
import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
@@ -32,6 +33,10 @@ import
org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Getter;
import lombok.SneakyThrows;
@@ -45,6 +50,12 @@ public class HttpSinkConnector implements Sink,
ConnectorCreateService<Sink> {
@Getter
private HttpSinkHandler sinkHandler;
+ private ThreadPoolExecutor executor;
+
+ private final LinkedBlockingQueue<ConnectRecord> queue = new
LinkedBlockingQueue<>(10000);
+
+ private final AtomicBoolean isStart = new AtomicBoolean(true);
+
@Override
public Class<? extends Config> configClass() {
return HttpSinkConfig.class;
@@ -90,11 +101,30 @@ public class HttpSinkConnector implements Sink,
ConnectorCreateService<Sink> {
} else {
throw new IllegalArgumentException("Max retries must be greater
than or equal to 0.");
}
+ boolean isParallelized =
this.httpSinkConfig.connectorConfig.isParallelized();
+ int parallelism = isParallelized ?
this.httpSinkConfig.connectorConfig.getParallelism() : 1;
+ executor = new ThreadPoolExecutor(parallelism, parallelism, 0L,
TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(), new
EventMeshThreadFactory("http-sink-handler"));
}
@Override
public void start() throws Exception {
this.sinkHandler.start();
+ for (int i = 0; i <
this.httpSinkConfig.connectorConfig.getParallelism(); i++) {
+ executor.execute(() -> {
+ while (isStart.get()) {
+ ConnectRecord connectRecord = null;
+ try {
+ connectRecord = queue.poll(2, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ if (connectRecord != null) {
+ sinkHandler.handle(connectRecord);
+ }
+ }
+ });
+ }
}
@Override
@@ -114,7 +144,18 @@ public class HttpSinkConnector implements Sink,
ConnectorCreateService<Sink> {
@Override
public void stop() throws Exception {
+ isStart.set(false);
+ while (!queue.isEmpty()) {
+ ConnectRecord record = queue.poll();
+ this.sinkHandler.handle(record);
+ }
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
this.sinkHandler.stop();
+ log.info("All tasks completed, start shut down http sink connector");
}
@Override
@@ -125,8 +166,7 @@ public class HttpSinkConnector implements Sink,
ConnectorCreateService<Sink> {
log.warn("ConnectRecord data is null, ignore.");
continue;
}
- // Handle the ConnectRecord
- this.sinkHandler.handle(sinkRecord);
+ queue.put(sinkRecord);
} catch (Exception e) {
log.error("Failed to sink message via HTTP. ", e);
}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java
index 41a508787..111ee6b3e 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java
@@ -40,8 +40,6 @@ public class HttpExportMetadata implements Serializable {
private LocalDateTime receivedTime;
- private String httpRecordId;
-
private String recordId;
private String retriedBy;
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
index 28ba79112..9ef760617 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
@@ -30,17 +30,26 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
+import lombok.Getter;
+
/**
* AbstractHttpSinkHandler is an abstract class that provides a base
implementation for HttpSinkHandler.
*/
public abstract class AbstractHttpSinkHandler implements HttpSinkHandler {
+ @Getter
private final SinkConnectorConfig sinkConnectorConfig;
+ @Getter
private final List<URI> urls;
+ private final HttpDeliveryStrategy deliveryStrategy;
+
+ private int roundRobinIndex = 0;
+
protected AbstractHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig)
{
this.sinkConnectorConfig = sinkConnectorConfig;
+ this.deliveryStrategy =
HttpDeliveryStrategy.valueOf(sinkConnectorConfig.getDeliveryStrategy());
// Initialize URLs
String[] urlStrings = sinkConnectorConfig.getUrls();
this.urls = Arrays.stream(urlStrings)
@@ -48,14 +57,6 @@ public abstract class AbstractHttpSinkHandler implements
HttpSinkHandler {
.collect(Collectors.toList());
}
- public SinkConnectorConfig getSinkConnectorConfig() {
- return sinkConnectorConfig;
- }
-
- public List<URI> getUrls() {
- return urls;
- }
-
/**
* Processes a ConnectRecord by sending it over HTTP or HTTPS. This method
should be called for each ConnectRecord that needs to be processed.
*
@@ -65,23 +66,38 @@ public abstract class AbstractHttpSinkHandler implements
HttpSinkHandler {
public void handle(ConnectRecord record) {
// build attributes
Map<String, Object> attributes = new ConcurrentHashMap<>();
- attributes.put(MultiHttpRequestContext.NAME, new
MultiHttpRequestContext(urls.size()));
-
- // send the record to all URLs
- for (URI url : urls) {
- // convert ConnectRecord to HttpConnectRecord
- String type = String.format("%s.%s.%s",
- this.sinkConnectorConfig.getConnectorName(), url.getScheme(),
- this.sinkConnectorConfig.getWebhookConfig().isActivate() ?
"webhook" : "common");
- HttpConnectRecord httpConnectRecord =
HttpConnectRecord.convertConnectRecord(record, type);
-
- // add AttemptEvent to the attributes
- HttpAttemptEvent attemptEvent = new
HttpAttemptEvent(this.sinkConnectorConfig.getRetryConfig().getMaxRetries() + 1);
- attributes.put(HttpAttemptEvent.PREFIX +
httpConnectRecord.getHttpRecordId(), attemptEvent);
-
- // deliver the record
- deliver(url, httpConnectRecord, attributes, record);
+
+ switch (deliveryStrategy) {
+ case ROUND_ROBIN:
+ attributes.put(MultiHttpRequestContext.NAME, new
MultiHttpRequestContext(1));
+ URI url = urls.get(roundRobinIndex);
+ roundRobinIndex = (roundRobinIndex + 1) % urls.size();
+ sendRecordToUrl(record, attributes, url);
+ break;
+ case BROADCAST:
+ for (URI broadcastUrl : urls) {
+ attributes.put(MultiHttpRequestContext.NAME, new
MultiHttpRequestContext(urls.size()));
+ sendRecordToUrl(record, attributes, broadcastUrl);
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown delivery strategy:
" + deliveryStrategy);
}
}
+ private void sendRecordToUrl(ConnectRecord record, Map<String, Object>
attributes, URI url) {
+ // convert ConnectRecord to HttpConnectRecord
+ String type = String.format("%s.%s.%s",
+ this.sinkConnectorConfig.getConnectorName(), url.getScheme(),
+ this.sinkConnectorConfig.getWebhookConfig().isActivate() ?
"webhook" : "common");
+ HttpConnectRecord httpConnectRecord =
HttpConnectRecord.convertConnectRecord(record, type);
+
+ // add AttemptEvent to the attributes
+ HttpAttemptEvent attemptEvent = new
HttpAttemptEvent(this.sinkConnectorConfig.getRetryConfig().getMaxRetries() + 1);
+ attributes.put(HttpAttemptEvent.PREFIX +
httpConnectRecord.getHttpRecordId(), attemptEvent);
+
+ // deliver the record
+ deliver(url, httpConnectRecord, attributes, record);
+ }
+
}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpDeliveryStrategy.java
similarity index 57%
copy from
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java
copy to
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpDeliveryStrategy.java
index 41a508787..2e770eb12 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpDeliveryStrategy.java
@@ -15,36 +15,9 @@
* limitations under the License.
*/
-package org.apache.eventmesh.connector.http.sink.data;
+package org.apache.eventmesh.connector.http.sink.handler;
-import java.io.Serializable;
-import java.time.LocalDateTime;
-
-import lombok.Builder;
-import lombok.Data;
-
-/**
- * Metadata for an HTTP export operation.
- */
-@Data
-@Builder
-public class HttpExportMetadata implements Serializable {
-
- private static final long serialVersionUID = 1121010466793041920L;
-
- private String url;
-
- private int code;
-
- private String message;
-
- private LocalDateTime receivedTime;
-
- private String httpRecordId;
-
- private String recordId;
-
- private String retriedBy;
-
- private int retryNum;
+public enum HttpDeliveryStrategy {
+ ROUND_ROBIN,
+ BROADCAST
}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
index 61bdc9f31..0b57cc06e 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
@@ -93,7 +93,8 @@ public class CommonHttpSinkHandler extends
AbstractHttpSinkHandler {
.setIdleTimeout(sinkConnectorConfig.getIdleTimeout())
.setIdleTimeoutUnit(TimeUnit.MILLISECONDS)
.setConnectTimeout(sinkConnectorConfig.getConnectionTimeout())
- .setMaxPoolSize(sinkConnectorConfig.getMaxConnectionPoolSize());
+ .setMaxPoolSize(sinkConnectorConfig.getMaxConnectionPoolSize())
+ .setPipelining(sinkConnectorConfig.isParallelized());
this.webClient = WebClient.create(vertx, options);
}
@@ -108,7 +109,7 @@ public class CommonHttpSinkHandler extends
AbstractHttpSinkHandler {
*/
@Override
public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord
httpConnectRecord, Map<String, Object> attributes,
- ConnectRecord connectRecord) {
+ ConnectRecord connectRecord) {
// create headers
Map<String, Object> extensionMap = new HashMap<>();
Set<String> extensionKeySet =
httpConnectRecord.getExtensions().keySet();
@@ -203,6 +204,9 @@ public class CommonHttpSinkHandler extends
AbstractHttpSinkHandler {
// failure
record.getCallback().onException(buildSendExceptionContext(record,
lastFailedEvent.getLastException()));
}
+ } else {
+ log.warn("still have requests to process, size {}|attempt num {}",
+ multiHttpRequestContext.getRemainingRequests(),
attemptEvent.getAttempts());
}
}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
index 6c78badaf..5aa8c63dd 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
@@ -20,6 +20,7 @@ package org.apache.eventmesh.connector.http.source;
import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.common.config.connector.http.HttpSourceConfig;
import org.apache.eventmesh.common.exception.EventMeshException;
+import org.apache.eventmesh.connector.http.source.data.CommonResponse;
import org.apache.eventmesh.connector.http.source.protocol.Protocol;
import org.apache.eventmesh.connector.http.source.protocol.ProtocolFactory;
import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
@@ -40,6 +41,7 @@ import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
+import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.LoggerHandler;
import lombok.Getter;
@@ -52,9 +54,7 @@ public class HttpSourceConnector implements Source,
ConnectorCreateService<Sourc
private BlockingQueue<Object> queue;
- private int maxBatchSize;
-
- private long maxPollWaitTime;
+ private int batchSize;
private Route route;
@@ -94,11 +94,11 @@ public class HttpSourceConnector implements Source,
ConnectorCreateService<Sourc
private void doInit() {
// init queue
- this.queue = new
LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity());
+ int maxQueueSize =
this.sourceConfig.getConnectorConfig().getMaxStorageSize();
+ this.queue = new LinkedBlockingQueue<>(maxQueueSize);
- // init poll batch size and timeout
- this.maxBatchSize =
this.sourceConfig.getPollConfig().getMaxBatchSize();
- this.maxPollWaitTime =
this.sourceConfig.getPollConfig().getMaxWaitTime();
+ // init batch size
+ this.batchSize = this.sourceConfig.getConnectorConfig().getBatchSize();
// init protocol
String protocolName =
this.sourceConfig.getConnectorConfig().getProtocol();
@@ -136,14 +136,17 @@ public class HttpSourceConnector implements Source,
ConnectorCreateService<Sourc
@Override
public void commit(ConnectRecord record) {
- if (this.route != null &&
sourceConfig.getConnectorConfig().isDataConsistencyEnabled()) {
- this.route.handler(ctx -> {
- // Return 200 OK
- ctx.response()
+ if (sourceConfig.getConnectorConfig().isDataConsistencyEnabled()) {
+ log.debug("HttpSourceConnector commit record: {}",
record.getRecordId());
+ RoutingContext routingContext = (RoutingContext)
record.getExtensionObj("routingContext");
+ if (routingContext != null) {
+ routingContext.response()
.putHeader("content-type", "application/json")
.setStatusCode(HttpResponseStatus.OK.code())
- .end("{\"status\":\"success\",\"recordId\":\"" +
record.getRecordId() + "\"}");
- });
+ .end(CommonResponse.success().toJsonStr());
+ } else {
+ log.error("Failed to commit the record, routingContext is
null, recordId: {}", record.getRecordId());
+ }
}
}
@@ -185,13 +188,13 @@ public class HttpSourceConnector implements Source,
ConnectorCreateService<Sourc
@Override
public List<ConnectRecord> poll() {
- // record current time
long startTime = System.currentTimeMillis();
+ long maxPollWaitTime = 5000;
long remainingTime = maxPollWaitTime;
// poll from queue
- List<ConnectRecord> connectRecords = new ArrayList<>(maxBatchSize);
- for (int i = 0; i < maxBatchSize; i++) {
+ List<ConnectRecord> connectRecords = new ArrayList<>(batchSize);
+ for (int i = 0; i < batchSize; i++) {
try {
Object obj = queue.poll(remainingTime, TimeUnit.MILLISECONDS);
if (obj == null) {
@@ -206,8 +209,9 @@ public class HttpSourceConnector implements Source,
ConnectorCreateService<Sourc
remainingTime = maxPollWaitTime > elapsedTime ?
maxPollWaitTime - elapsedTime : 0;
} catch (Exception e) {
log.error("Failed to poll from queue.", e);
- break;
+ throw new RuntimeException(e);
}
+
}
return connectRecords;
}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java
index a44ed0e90..10158f6eb 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java
@@ -57,7 +57,7 @@ public class CloudEventProtocol implements Protocol {
/**
* Handle the protocol message for CloudEvent.
*
- * @param route route
+ * @param route route
* @param queue queue info
*/
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]