This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9bc0d26 [pulsar-client-tools] Add support for websocket
produce/consume command (#3835)
9bc0d26 is described below
commit 9bc0d268608bce7957dbc9a3207c90ccae697c39
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Tue Mar 19 10:37:29 2019 -0700
[pulsar-client-tools] Add support for websocket produce/consume command
(#3835)
* [pulsar-client-tools] Add support for websocket produce/consume command
* remove comments
---
pulsar-client-tools/pom.xml | 5 +
.../org/apache/pulsar/client/cli/CmdConsume.java | 189 ++++++++++++++++++++-
.../org/apache/pulsar/client/cli/CmdProduce.java | 188 +++++++++++++++++++-
.../apache/pulsar/client/cli/PulsarClientTool.java | 10 +-
4 files changed, 380 insertions(+), 12 deletions(-)
diff --git a/pulsar-client-tools/pom.xml b/pulsar-client-tools/pom.xml
index 072657b..8a3ac94 100644
--- a/pulsar-client-tools/pom.xml
+++ b/pulsar-client-tools/pom.xml
@@ -66,6 +66,11 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-websocket</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- functions related dependencies (begin) -->
<dependency>
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index f19e6a5..4f86556 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -20,17 +20,37 @@ package org.apache.pulsar.client.cli;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.net.URI;
import java.util.ArrayList;
+import java.util.Base64;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.HexDump;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +58,9 @@ import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.google.common.util.concurrent.RateLimiter;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
/**
* pulsar-client consume command implementation.
@@ -68,8 +91,10 @@ public class CmdConsume {
@Parameter(names = { "-r", "--rate" }, description = "Rate (in msg/sec) at
which to consume, "
+ "value 0 means to consume messages as fast as possible.")
private double consumeRate = 0;
-
- ClientBuilder clientBuilder;
+
+ private ClientBuilder clientBuilder;
+ private Authentication authentication;
+ private String serviceURL;
public CmdConsume() {
// Do nothing
@@ -79,8 +104,10 @@ public class CmdConsume {
* Set client configuration.
*
*/
- public void updateConfig(ClientBuilder clientBuilder) {
+ public void updateConfig(ClientBuilder clientBuilder, Authentication
authentication, String serviceURL) {
this.clientBuilder = clientBuilder;
+ this.authentication = authentication;
+ this.serviceURL = serviceURL;
}
/**
@@ -117,12 +144,22 @@ public class CmdConsume {
throw (new ParameterException("Number of messages should be zero
or positive."));
String topic = this.mainOptions.get(0);
+
+ if(this.serviceURL.startsWith("ws")) {
+ return consumeFromWebSocket(topic);
+ }else {
+ return consume(topic);
+ }
+ }
+
+ private int consume(String topic) {
int numMessagesConsumed = 0;
int returnCode = 0;
try {
PulsarClient client = clientBuilder.build();
- Consumer<byte[]> consumer =
client.newConsumer().topic(topic).subscriptionName(this.subscriptionName).subscriptionType(subscriptionType).subscribe();
+ Consumer<byte[]> consumer =
client.newConsumer().topic(topic).subscriptionName(this.subscriptionName)
+ .subscriptionType(subscriptionType).subscribe();
RateLimiter limiter = (this.consumeRate > 0) ?
RateLimiter.create(this.consumeRate) : null;
while (this.numMessagesToConsume == 0 || numMessagesConsumed <
this.numMessagesToConsume) {
@@ -151,5 +188,149 @@ public class CmdConsume {
}
return returnCode;
+
+ }
+
+ @SuppressWarnings("deprecation")
+ private int consumeFromWebSocket(String topic) {
+ int numMessagesConsumed = 0;
+ int returnCode = 0;
+
+ TopicName topicName = TopicName.get(topic);
+
+ String wsTopic = String.format(
+ "%s/%s/" + (StringUtils.isEmpty(topicName.getCluster()) ? "" :
topicName.getCluster() + "/")
+ + "%s/%s/%s?subscriptionType=%s",
+ topicName.getDomain(), topicName.getTenant(),
topicName.getNamespacePortion(), topicName.getLocalName(),
+ subscriptionName, subscriptionType.toString());
+
+ String consumerBaseUri = serviceURL + (serviceURL.endsWith("/") ? "" :
"/") + "ws/consumer/" + wsTopic;
+ URI consumerUri = URI.create(consumerBaseUri);
+
+ WebSocketClient produceClient = new WebSocketClient(new
SslContextFactory(true));
+ ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+ try {
+ if (authentication != null) {
+ authentication.start();
+ AuthenticationDataProvider authData =
authentication.getAuthData();
+ if (authData.hasDataForHttp()) {
+ for (Map.Entry<String, String> kv :
authData.getHttpHeaders()) {
+ produceRequest.setHeader(kv.getKey(), kv.getValue());
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Authentication plugin error: " + e.getMessage());
+ return -1;
+ }
+ CompletableFuture<Void> connected = new CompletableFuture<>();
+ ConsumerSocket consumerSocket = new ConsumerSocket(connected);
+ try {
+ produceClient.start();
+ } catch (Exception e) {
+ LOG.error("Failed to start websocket-client", e);
+ return -1;
+ }
+
+ try {
+ LOG.info("Trying to create websocket session..{}",consumerUri);
+ produceClient.connect(consumerSocket, consumerUri, produceRequest);
+ connected.get();
+ } catch (Exception e) {
+ LOG.error("Failed to create web-socket session", e);
+ return -1;
+ }
+
+ try {
+ RateLimiter limiter = (this.consumeRate > 0) ?
RateLimiter.create(this.consumeRate) : null;
+ while (this.numMessagesToConsume == 0 || numMessagesConsumed <
this.numMessagesToConsume) {
+ if (limiter != null) {
+ limiter.acquire();
+ }
+ String msg = consumerSocket.receive(5, TimeUnit.SECONDS);
+ if (msg == null) {
+ LOG.debug("No message to consume after waiting for 5
seconds.");
+ } else {
+ try {
+ System.out.println(Base64.getDecoder().decode(msg));
+ }catch(Exception e) {
+ System.out.println(msg);
+ }
+ numMessagesConsumed += 1;
+ }
+ }
+ consumerSocket.awaitClose(2, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("Error while consuming messages");
+ LOG.error(e.getMessage(), e);
+ returnCode = -1;
+ } finally {
+ LOG.info("{} messages successfully consumed", numMessagesConsumed);
+ }
+
+ return returnCode;
+ }
+
+ @WebSocket(maxTextMessageSize = 64 * 1024)
+ public static class ConsumerSocket {
+ private static final String X_PULSAR_MESSAGE_ID = "messageId";
+ private final CountDownLatch closeLatch;
+ private Session session;
+ private CompletableFuture<Void> connected;
+ final BlockingQueue<String> incomingMessages;
+
+ public ConsumerSocket(CompletableFuture<Void> connected) {
+ this.closeLatch = new CountDownLatch(1);
+ this.connected = connected;
+ this.incomingMessages = new GrowableArrayBlockingQueue<>();
+ }
+
+ public boolean awaitClose(int duration, TimeUnit unit) throws
InterruptedException {
+ return this.closeLatch.await(duration, unit);
+ }
+
+ @OnWebSocketClose
+ public void onClose(int statusCode, String reason) {
+ log.info("Connection closed: {} - {}", statusCode, reason);
+ this.session = null;
+ this.closeLatch.countDown();
+ }
+
+ @OnWebSocketConnect
+ public void onConnect(Session session) throws InterruptedException {
+ log.info("Got connect: {}", session);
+ this.session = session;
+ this.connected.complete(null);
+ }
+
+ @OnWebSocketMessage
+ public synchronized void onMessage(String msg) throws Exception {
+ JsonObject message = new Gson().fromJson(msg, JsonObject.class);
+ JsonObject ack = new JsonObject();
+ String messageId = message.get(X_PULSAR_MESSAGE_ID).getAsString();
+ ack.add("messageId", new JsonPrimitive(messageId));
+ // Acking the proxy
+ this.getRemote().sendString(ack.toString());
+ this.incomingMessages.put(msg);
+ }
+
+ public String receive(long timeout, TimeUnit unit) throws Exception {
+ return incomingMessages.poll(timeout, unit);
+ }
+
+ public RemoteEndpoint getRemote() {
+ return this.session.getRemote();
+ }
+
+ public Session getSession() {
+ return this.session;
+ }
+
+ public void close() {
+ this.session.close();
+ }
+
+ private static final Logger log =
LoggerFactory.getLogger(ConsumerSocket.class);
+
}
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
index c1eba86..990ac25 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
@@ -21,18 +21,44 @@ package org.apache.pulsar.client.cli;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Base64;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.websocket.data.ProducerMessage;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,7 +91,9 @@ public class CmdProduce {
+ "value 0 means to produce messages as fast as possible.")
private double publishRate = 0;
- ClientBuilder clientBuilder;
+ private ClientBuilder clientBuilder;
+ private Authentication authentication;
+ private String serviceURL;
public CmdProduce() {
// Do nothing
@@ -75,8 +103,10 @@ public class CmdProduce {
* Set Pulsar client configuration.
*
*/
- public void updateConfig(ClientBuilder newBuilder) {
+ public void updateConfig(ClientBuilder newBuilder, Authentication
authentication, String serviceURL) {
this.clientBuilder = newBuilder;
+ this.authentication = authentication;
+ this.serviceURL = serviceURL;
}
/*
@@ -114,12 +144,15 @@ public class CmdProduce {
* @throws Exception
*/
public int run() throws PulsarClientException {
- if (mainOptions.size() != 1)
+ if (mainOptions.size() != 1) {
throw (new ParameterException("Please provide one and only one
topic name."));
- if (this.numTimesProduce <= 0)
+ }
+ if (this.numTimesProduce <= 0) {
throw (new ParameterException("Number of times need to be positive
number."));
- if (messages.size() == 0 && messageFileNames.size() == 0)
+ }
+ if (messages.size() == 0 && messageFileNames.size() == 0) {
throw (new ParameterException("Please supply message content with
either --messages or --files"));
+ }
int totalMessages = (messages.size() + messageFileNames.size()) *
numTimesProduce;
if (totalMessages > MAX_MESSAGES) {
@@ -129,6 +162,15 @@ public class CmdProduce {
}
String topic = this.mainOptions.get(0);
+
+ if (this.serviceURL.startsWith("ws")) {
+ return publishToWebSocket(totalMessages, topic);
+ } else {
+ return publish(totalMessages, topic);
+ }
+ }
+
+ private int publish(int totalMessages, String topic) {
int numMessagesSent = 0;
int returnCode = 0;
@@ -159,4 +201,140 @@ public class CmdProduce {
return returnCode;
}
+
+ @SuppressWarnings("deprecation")
+ private int publishToWebSocket(int totalMessages, String topic) {
+ int numMessagesSent = 0;
+ int returnCode = 0;
+
+ TopicName topicName = TopicName.get(topic);
+ String wsTopic =
String.format("%s/%s/"+(StringUtils.isEmpty(topicName.getCluster()) ? "" :
topicName.getCluster()+"/")+"%s/%s",
topicName.getDomain(),topicName.getTenant(),topicName.getNamespacePortion(),topicName.getLocalName());
+ String produceBaseEndPoint = serviceURL + (serviceURL.endsWith("/") ?
"" : "/") + "ws/producer/" + wsTopic;
+ URI produceUri = URI.create(produceBaseEndPoint);
+
+ WebSocketClient produceClient = new WebSocketClient(new
SslContextFactory(true));
+ ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+ try {
+ if (authentication != null) {
+ authentication.start();
+ AuthenticationDataProvider authData =
authentication.getAuthData();
+ if (authData.hasDataForHttp()) {
+ for (Map.Entry<String, String> kv :
authData.getHttpHeaders()) {
+ produceRequest.setHeader(kv.getKey(), kv.getValue());
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Authentication plugin error: " + e.getMessage());
+ return -1;
+ }
+
+ CompletableFuture<Void> connected = new CompletableFuture<>();
+ ProducerSocket produceSocket = new ProducerSocket(connected);
+ try {
+ produceClient.start();
+ } catch (Exception e) {
+ LOG.error("Failed to start websocket-client", e);
+ return -1;
+ }
+
+ try {
+ LOG.info("Trying to create websocket session.. on {},{}",
produceUri, produceRequest);
+ produceClient.connect(produceSocket, produceUri, produceRequest);
+ connected.get();
+ } catch (Exception e) {
+ LOG.error("Failed to create web-socket session", e);
+ return -1;
+ }
+
+ try {
+ List<byte[]> messageBodies = generateMessageBodies(this.messages,
this.messageFileNames);
+ RateLimiter limiter = (this.publishRate > 0) ?
RateLimiter.create(this.publishRate) : null;
+ for (int i = 0; i < this.numTimesProduce; i++) {
+ int index = i * 10;
+ for (byte[] content : messageBodies) {
+ if (limiter != null) {
+ limiter.acquire();
+ }
+ produceSocket.send(index++,
content).get(30,TimeUnit.SECONDS);
+ numMessagesSent++;
+ }
+ }
+ produceSocket.close();
+ } catch (Exception e) {
+ LOG.error("Error while producing messages");
+ LOG.error(e.getMessage(), e);
+ returnCode = -1;
+ } finally {
+ LOG.info("{} messages successfully produced", numMessagesSent);
+ }
+
+ return returnCode;
+ }
+
+ @WebSocket(maxTextMessageSize = 64 * 1024)
+ public static class ProducerSocket {
+
+ private final CountDownLatch closeLatch;
+ private Session session;
+ private CompletableFuture<Void> connected;
+ private volatile CompletableFuture<Void> result;
+
+ public ProducerSocket(CompletableFuture<Void> connected) {
+ this.closeLatch = new CountDownLatch(1);
+ this.connected = connected;
+ }
+
+ public CompletableFuture<Void> send(int index, byte[] content) throws
Exception {
+ this.session.getRemote().sendString(getTestJsonPayload(index,
content));
+ this.result = new CompletableFuture<>();
+ return result;
+ }
+
+ private static String getTestJsonPayload(int index, byte[] content)
throws JsonProcessingException {
+ ProducerMessage msg = new ProducerMessage();
+ msg.payload = Base64.getEncoder().encodeToString(content);
+ msg.key = Integer.toString(index);
+ return
ObjectMapperFactory.getThreadLocal().writeValueAsString(msg);
+ }
+
+ public boolean awaitClose(int duration, TimeUnit unit) throws
InterruptedException {
+ return this.closeLatch.await(duration, unit);
+ }
+
+ @OnWebSocketClose
+ public void onClose(int statusCode, String reason) {
+ LOG.info("Connection closed: {} - {}", statusCode, reason);
+ this.session = null;
+ this.closeLatch.countDown();
+ }
+
+ @OnWebSocketConnect
+ public void onConnect(Session session) throws Exception {
+ LOG.info("Got connect: {}", session);
+ this.session = session;
+ this.connected.complete(null);
+ }
+
+ @OnWebSocketMessage
+ public synchronized void onMessage(String msg) throws
JsonParseException {
+ LOG.info("ack= {}",msg);
+ if(this.result!=null) {
+ this.result.complete(null);
+ }
+ }
+
+ public RemoteEndpoint getRemote() {
+ return this.session.getRemote();
+ }
+
+ public Session getSession() {
+ return this.session;
+ }
+
+ public void close() {
+ this.session.close();
+ }
+
+ }
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
index 95f0d51..122f89d 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
@@ -27,6 +27,8 @@ import java.util.Arrays;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
@@ -90,14 +92,16 @@ public class PulsarClientTool {
private void updateConfig() throws UnsupportedAuthenticationException,
MalformedURLException {
ClientBuilder clientBuilder = PulsarClient.builder();
+ Authentication authentication = null;
if (isNotBlank(this.authPluginClassName)) {
- clientBuilder.authentication(authPluginClassName, authParams);
+ authentication = AuthenticationFactory.create(authPluginClassName,
authParams);
+ clientBuilder.authentication(authentication);
}
clientBuilder.allowTlsInsecureConnection(this.tlsAllowInsecureConnection);
clientBuilder.tlsTrustCertsFilePath(this.tlsTrustCertsFilePath);
clientBuilder.serviceUrl(serviceURL);
- this.produceCommand.updateConfig(clientBuilder);
- this.consumeCommand.updateConfig(clientBuilder);
+ this.produceCommand.updateConfig(clientBuilder, authentication,
this.serviceURL);
+ this.consumeCommand.updateConfig(clientBuilder, authentication,
this.serviceURL);
}
public int run(String[] args) {