This is an automated email from the ASF dual-hosted git repository.
nkurihar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 93e29161b59 [feat][cli] Add read command to pulsar-client-tools
(#19298)
93e29161b59 is described below
commit 93e29161b59a37aa73e3731903666811104ecf2f
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Mon Feb 13 09:51:29 2023 +0900
[feat][cli] Add read command to pulsar-client-tools (#19298)
---
.../pulsar/client/cli/PulsarClientToolTest.java | 51 ++++
.../pulsar/client/cli/PulsarClientToolWsTest.java | 50 +++-
.../pulsar/client/cli/AbstractCmdConsume.java | 250 ++++++++++++++++
.../org/apache/pulsar/client/cli/CmdConsume.java | 222 +-------------
.../java/org/apache/pulsar/client/cli/CmdRead.java | 324 +++++++++++++++++++++
.../apache/pulsar/client/cli/PulsarClientTool.java | 6 +
.../org/apache/pulsar/client/cli/TestCmdRead.java | 74 +++++
7 files changed, 760 insertions(+), 217 deletions(-)
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
index 52edde856b7..8d416125fd1 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
@@ -19,7 +19,9 @@
package org.apache.pulsar.client.cli;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
import java.time.Duration;
import java.util.Properties;
import java.util.UUID;
@@ -193,6 +195,55 @@ public class PulsarClientToolTest extends BrokerTestBase {
}
}
+ @Test(timeOut = 20000)
+ public void testRead() throws Exception {
+ Properties properties = new Properties();
+ properties.setProperty("serviceUrl", brokerUrl.toString());
+ properties.setProperty("useTls", "false");
+
+ final String topicName = getTopicWithRandomSuffix("reader");
+
+ int numberOfMessages = 10;
+ @Cleanup("shutdownNow")
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ executor.execute(() -> {
+ try {
+ PulsarClientTool pulsarClientToolReader = new
PulsarClientTool(properties);
+ String[] args = {"read", "-m", "latest", "-n",
Integer.toString(numberOfMessages), "--hex", "-r", "30",
+ topicName};
+ assertEquals(pulsarClientToolReader.run(args), 0);
+ future.complete(null);
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ });
+
+ // Make sure subscription has been created
+ retryStrategically((test) -> {
+ try {
+ return admin.topics().getSubscriptions(topicName).size() == 1;
+ } catch (Exception e) {
+ return false;
+ }
+ }, 10, 500);
+
+ assertEquals(admin.topics().getSubscriptions(topicName).size(), 1);
+
assertTrue(admin.topics().getSubscriptions(topicName).get(0).startsWith("reader-"));
+ PulsarClientTool pulsarClientToolProducer = new
PulsarClientTool(properties);
+
+ String[] args = {"produce", "--messages", "Have a nice day", "-n",
Integer.toString(numberOfMessages), "-r",
+ "20", "-p", "key1=value1", "-p", "key2=value2", "-k",
"partition_key", topicName};
+ assertEquals(pulsarClientToolProducer.run(args), 0);
+ assertFalse(future.isCompletedExceptionally());
+ future.get();
+
+ Awaitility.await()
+ .ignoreExceptions()
+ .atMost(Duration.ofMillis(20000))
+ .until(()->admin.topics().getSubscriptions(topicName).size()
== 0);
+ }
+
@Test(timeOut = 20000)
public void testEncryption() throws Exception {
Properties properties = new Properties();
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java
index 5a12e77f99e..77c974de80e 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java
@@ -143,4 +143,52 @@ public class PulsarClientToolWsTest extends BrokerTestBase
{
Assert.assertNotNull(subscriptions);
Assert.assertEquals(subscriptions.size(), 1);
}
-}
\ No newline at end of file
+
+ @Test(timeOut = 30000)
+ public void testWebSocketReader() throws Exception {
+ Properties properties = new Properties();
+ properties.setProperty("serviceUrl", brokerUrl.toString());
+ properties.setProperty("useTls", "false");
+
+ final String topicName = "persistent://my-property/my-ns/test/topic-"
+ UUID.randomUUID();
+
+ int numberOfMessages = 10;
+ {
+ @Cleanup("shutdown")
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ executor.execute(() -> {
+ try {
+ PulsarClientTool pulsarClientToolReader = new
PulsarClientTool(properties);
+ String[] args = {"read", "-m", "latest", "-n",
Integer.toString(numberOfMessages), "--hex", "-r",
+ "30", topicName};
+ Assert.assertEquals(pulsarClientToolReader.run(args), 0);
+ future.complete(null);
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ });
+
+ // Make sure subscription has been created
+ Awaitility.await()
+ .pollInterval(Duration.ofMillis(200))
+ .ignoreExceptions().untilAsserted(() -> {
+
Assert.assertEquals(admin.topics().getSubscriptions(topicName).size(), 1);
+
Assert.assertTrue(admin.topics().getSubscriptions(topicName).get(0).startsWith("reader-"));
+ });
+
+ PulsarClientTool pulsarClientToolProducer = new
PulsarClientTool(properties);
+
+ String[] args = {"produce", "--messages", "Have a nice day", "-n",
Integer.toString(numberOfMessages), "-r",
+ "20", "-p", "key1=value1", "-p", "key2=value2", "-k",
"partition_key", topicName};
+ Assert.assertEquals(pulsarClientToolProducer.run(args), 0);
+ future.get();
+ Assert.assertFalse(future.isCompletedExceptionally());
+ }
+
+ Awaitility.await()
+ .ignoreExceptions().untilAsserted(() -> {
+
Assert.assertEquals(admin.topics().getSubscriptions(topicName).size(), 0);
+ });
+ }
+}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
new file mode 100644
index 00000000000..ef0ffbc2973
--- /dev/null
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.cli;
+
+import static
org.apache.pulsar.client.internal.PulsarClientImplementationBinding.getBytes;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.HexDump;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * common part of consume command and read command of pulsar-client.
+ *
+ */
+public abstract class AbstractCmdConsume {
+
+ protected static final Logger LOG =
LoggerFactory.getLogger(PulsarClientTool.class);
+ protected static final String MESSAGE_BOUNDARY = "----- got message -----";
+
+ protected ClientBuilder clientBuilder;
+ protected Authentication authentication;
+ protected String serviceURL;
+
+ public AbstractCmdConsume() {
+ // Do nothing
+ }
+
+ /**
+ * Set client configuration.
+ *
+ */
+ public void updateConfig(ClientBuilder clientBuilder, Authentication
authentication, String serviceURL) {
+ this.clientBuilder = clientBuilder;
+ this.authentication = authentication;
+ this.serviceURL = serviceURL;
+ }
+
+ /**
+ * Interprets the message to create a string representation.
+ *
+ * @param message
+ * The message to interpret
+ * @param displayHex
+ * Whether to display BytesMessages in hexdump style, ignored
for simple text messages
+ * @return String representation of the message
+ */
+ protected String interpretMessage(Message<?> message, boolean displayHex)
throws IOException {
+ StringBuilder sb = new StringBuilder();
+
+ String properties =
Arrays.toString(message.getProperties().entrySet().toArray());
+
+ String data;
+ Object value = message.getValue();
+ if (value == null) {
+ data = "null";
+ } else if (value instanceof byte[]) {
+ byte[] msgData = (byte[]) value;
+ data = interpretByteArray(displayHex, msgData);
+ } else if (value instanceof GenericObject) {
+ Map<String, Object> asMap = genericObjectToMap((GenericObject)
value, displayHex);
+ data = asMap.toString();
+ } else if (value instanceof ByteBuffer) {
+ data = new String(getBytes((ByteBuffer) value));
+ } else {
+ data = value.toString();
+ }
+
+ String key = null;
+ if (message.hasKey()) {
+ key = message.getKey();
+ }
+
+ sb.append("key:[").append(key).append("], ");
+ if (!properties.isEmpty()) {
+ sb.append("properties:").append(properties).append(", ");
+ }
+ sb.append("content:").append(data);
+
+ return sb.toString();
+ }
+
+ protected static String interpretByteArray(boolean displayHex, byte[]
msgData) throws IOException {
+ String data;
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ if (!displayHex) {
+ return new String(msgData);
+ } else {
+ HexDump.dump(msgData, 0, out, 0);
+ return out.toString();
+ }
+ }
+
+ protected static Map<String, Object> genericObjectToMap(GenericObject
value, boolean displayHex)
+ throws IOException {
+ switch (value.getSchemaType()) {
+ case AVRO:
+ case JSON:
+ case PROTOBUF_NATIVE:
+ return genericRecordToMap((GenericRecord) value,
displayHex);
+ case KEY_VALUE:
+ return keyValueToMap((KeyValue) value.getNativeObject(),
displayHex);
+ default:
+ return primitiveValueToMap(value.getNativeObject(),
displayHex);
+ }
+ }
+
+ protected static Map<String, Object> keyValueToMap(KeyValue value, boolean
displayHex) throws IOException {
+ if (value == null) {
+ return ImmutableMap.of("value", "NULL");
+ }
+ return ImmutableMap.of("key", primitiveValueToMap(value.getKey(),
displayHex),
+ "value", primitiveValueToMap(value.getValue(), displayHex));
+ }
+
+ protected static Map<String, Object> primitiveValueToMap(Object value,
boolean displayHex) throws IOException {
+ if (value == null) {
+ return ImmutableMap.of("value", "NULL");
+ }
+ if (value instanceof GenericObject) {
+ return genericObjectToMap((GenericObject) value, displayHex);
+ }
+ if (value instanceof byte[]) {
+ value = interpretByteArray(displayHex, (byte[]) value);
+ }
+ return ImmutableMap.of("value", value.toString(), "type",
value.getClass());
+ }
+
+ protected static Map<String, Object> genericRecordToMap(GenericRecord
value, boolean displayHex)
+ throws IOException {
+ Map<String, Object> res = new HashMap<>();
+ for (Field f : value.getFields()) {
+ Object fieldValue = value.getField(f);
+ if (fieldValue instanceof GenericRecord) {
+ fieldValue = genericRecordToMap((GenericRecord) fieldValue,
displayHex);
+ } else if (fieldValue == null) {
+ fieldValue = "NULL";
+ } else if (fieldValue instanceof byte[]) {
+ fieldValue = interpretByteArray(displayHex, (byte[])
fieldValue);
+ }
+ res.put(f.getName(), fieldValue);
+ }
+ return res;
+ }
+
+ @WebSocket(maxTextMessageSize = 64 * 1024)
+ public static class ConsumerSocket {
+ private static final String X_PULSAR_MESSAGE_ID = "messageId";
+ private final CountDownLatch closeLatch;
+ private Session session;
+ private CompletableFuture<Void> connected;
+ final BlockingQueue<String> incomingMessages;
+
+ public ConsumerSocket(CompletableFuture<Void> connected) {
+ this.closeLatch = new CountDownLatch(1);
+ this.connected = connected;
+ this.incomingMessages = new GrowableArrayBlockingQueue<>();
+ }
+
+ public boolean awaitClose(int duration, TimeUnit unit) throws
InterruptedException {
+ return this.closeLatch.await(duration, unit);
+ }
+
+ @OnWebSocketClose
+ public void onClose(int statusCode, String reason) {
+ log.info("Connection closed: {} - {}", statusCode, reason);
+ this.session = null;
+ this.closeLatch.countDown();
+ }
+
+ @OnWebSocketConnect
+ public void onConnect(Session session) throws InterruptedException {
+ log.info("Got connect: {}", session);
+ this.session = session;
+ this.connected.complete(null);
+ }
+
+ @OnWebSocketMessage
+ public synchronized void onMessage(String msg) throws Exception {
+ JsonObject message = new Gson().fromJson(msg, JsonObject.class);
+ JsonObject ack = new JsonObject();
+ String messageId = message.get(X_PULSAR_MESSAGE_ID).getAsString();
+ ack.add("messageId", new JsonPrimitive(messageId));
+ // Acking the proxy
+ this.getRemote().sendString(ack.toString());
+ this.incomingMessages.put(msg);
+ }
+
+ public String receive(long timeout, TimeUnit unit) throws Exception {
+ return incomingMessages.poll(timeout, unit);
+ }
+
+ public RemoteEndpoint getRemote() {
+ return this.session.getRemote();
+ }
+
+ public Session getSession() {
+ return this.session;
+ }
+
+ public void close() {
+ this.session.close();
+ }
+
+ private static final Logger log =
LoggerFactory.getLogger(ConsumerSocket.class);
+ }
+
+}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index ef3db09a883..58ab6360a17 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -19,35 +19,21 @@
package org.apache.pulsar.client.cli;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static
org.apache.pulsar.client.internal.PulsarClientImplementationBinding.getBytes;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.RateLimiter;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Base64;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
-import org.apache.commons.io.HexDump;
-import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
-import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
@@ -57,33 +43,17 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.api.schema.Field;
-import org.apache.pulsar.client.api.schema.GenericObject;
-import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.schema.KeyValue;
-import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.websocket.api.RemoteEndpoint;
-import org.eclipse.jetty.websocket.api.Session;
-import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
-import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
-import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
-import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* pulsar-client consume command implementation.
*
*/
@Parameters(commandDescription = "Consume messages from a specified topic")
-public class CmdConsume {
-
- private static final Logger LOG =
LoggerFactory.getLogger(PulsarClientTool.class);
- private static final String MESSAGE_BOUNDARY = "----- got message -----";
+public class CmdConsume extends AbstractCmdConsume {
@Parameter(description = "TopicName", required = true)
private List<String> mainOptions = new ArrayList<String>();
@@ -134,132 +104,14 @@ public class CmdConsume {
@Parameter(names = { "-st", "--schema-type"},
description = "Set a schema type on the consumer, it can be
'bytes' or 'auto_consume'")
- private String schematype = "bytes";
+ private String schemaType = "bytes";
@Parameter(names = { "-pm", "--pool-messages" }, description = "Use the
pooled message", arity = 1)
private boolean poolMessages = true;
- private ClientBuilder clientBuilder;
- private Authentication authentication;
- private String serviceURL;
-
public CmdConsume() {
// Do nothing
- }
-
- /**
- * Set client configuration.
- *
- */
- public void updateConfig(ClientBuilder clientBuilder, Authentication
authentication, String serviceURL) {
- this.clientBuilder = clientBuilder;
- this.authentication = authentication;
- this.serviceURL = serviceURL;
- }
-
- /**
- * Interprets the message to create a string representation.
- *
- * @param message
- * The message to interpret
- * @param displayHex
- * Whether to display BytesMessages in hexdump style, ignored
for simple text messages
- * @return String representation of the message
- */
- private String interpretMessage(Message<?> message, boolean displayHex)
throws IOException {
- StringBuilder sb = new StringBuilder();
-
- String properties =
Arrays.toString(message.getProperties().entrySet().toArray());
-
- String data;
- Object value = message.getValue();
- if (value == null) {
- data = "null";
- } else if (value instanceof byte[]) {
- byte[] msgData = (byte[]) value;
- data = interpretByteArray(displayHex, msgData);
- } else if (value instanceof GenericObject) {
- Map<String, Object> asMap = genericObjectToMap((GenericObject)
value, displayHex);
- data = asMap.toString();
- } else if (value instanceof ByteBuffer) {
- data = new String(getBytes((ByteBuffer) value));
- } else {
- data = value.toString();
- }
-
- String key = null;
- if (message.hasKey()) {
- key = message.getKey();
- }
-
- sb.append("key:[").append(key).append("], ");
- if (!properties.isEmpty()) {
- sb.append("properties:").append(properties).append(", ");
- }
- sb.append("content:").append(data);
-
- return sb.toString();
- }
-
- private static String interpretByteArray(boolean displayHex, byte[]
msgData) throws IOException {
- String data;
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- if (!displayHex) {
- return new String(msgData);
- } else {
- HexDump.dump(msgData, 0, out, 0);
- return out.toString();
- }
- }
-
- private static Map<String, Object> genericObjectToMap(GenericObject value,
boolean displayHex) throws IOException {
- switch (value.getSchemaType()) {
- case AVRO:
- case JSON:
- case PROTOBUF_NATIVE:
- return genericRecordToMap((GenericRecord) value,
displayHex);
- case KEY_VALUE:
- return keyValueToMap((KeyValue) value.getNativeObject(),
displayHex);
- default:
- return primitiveValueToMap(value.getNativeObject(),
displayHex);
- }
- }
-
- private static Map<String, Object> keyValueToMap(KeyValue value, boolean
displayHex) throws IOException {
- if (value == null) {
- return ImmutableMap.of("value", "NULL");
- }
- return ImmutableMap.of("key", primitiveValueToMap(value.getKey(),
displayHex),
- "value", primitiveValueToMap(value.getValue(), displayHex));
- }
-
- private static Map<String, Object> primitiveValueToMap(Object value,
boolean displayHex) throws IOException {
- if (value == null) {
- return ImmutableMap.of("value", "NULL");
- }
- if (value instanceof GenericObject) {
- return genericObjectToMap((GenericObject) value, displayHex);
- }
- if (value instanceof byte[]) {
- value = interpretByteArray(displayHex, (byte[]) value);
- }
- return ImmutableMap.of("value", value.toString(), "type",
value.getClass());
- }
-
- private static Map<String, Object> genericRecordToMap(GenericRecord value,
boolean displayHex) throws IOException {
- Map<String, Object> res = new HashMap<>();
- for (Field f : value.getFields()) {
- Object fieldValue = value.getField(f);
- if (fieldValue instanceof GenericRecord) {
- fieldValue = genericRecordToMap((GenericRecord) fieldValue,
displayHex);
- } else if (fieldValue == null) {
- fieldValue = "NULL";
- } else if (fieldValue instanceof byte[]) {
- fieldValue = interpretByteArray(displayHex, (byte[])
fieldValue);
- }
- res.put(f.getName(), fieldValue);
- }
- return res;
+ super();
}
/**
@@ -294,10 +146,10 @@ public class CmdConsume {
try (PulsarClient client = clientBuilder.build()){
ConsumerBuilder<?> builder;
Schema<?> schema = poolMessages ? Schema.BYTEBUFFER : Schema.BYTES;
- if ("auto_consume".equals(schematype)) {
+ if ("auto_consume".equals(schemaType)) {
schema = Schema.AUTO_CONSUME();
- } else if (!"bytes".equals(schematype)) {
- throw new IllegalArgumentException("schema type must be
'bytes' or 'auto_consume");
+ } else if (!"bytes".equals(schemaType)) {
+ throw new IllegalArgumentException("schema type must be
'bytes' or 'auto_consume'");
}
builder = client.newConsumer(schema)
.subscriptionName(this.subscriptionName)
@@ -458,66 +310,4 @@ public class CmdConsume {
return returnCode;
}
- @WebSocket(maxTextMessageSize = 64 * 1024)
- public static class ConsumerSocket {
- private static final String X_PULSAR_MESSAGE_ID = "messageId";
- private final CountDownLatch closeLatch;
- private Session session;
- private CompletableFuture<Void> connected;
- final BlockingQueue<String> incomingMessages;
-
- public ConsumerSocket(CompletableFuture<Void> connected) {
- this.closeLatch = new CountDownLatch(1);
- this.connected = connected;
- this.incomingMessages = new GrowableArrayBlockingQueue<>();
- }
-
- public boolean awaitClose(int duration, TimeUnit unit) throws
InterruptedException {
- return this.closeLatch.await(duration, unit);
- }
-
- @OnWebSocketClose
- public void onClose(int statusCode, String reason) {
- log.info("Connection closed: {} - {}", statusCode, reason);
- this.session = null;
- this.closeLatch.countDown();
- }
-
- @OnWebSocketConnect
- public void onConnect(Session session) throws InterruptedException {
- log.info("Got connect: {}", session);
- this.session = session;
- this.connected.complete(null);
- }
-
- @OnWebSocketMessage
- public synchronized void onMessage(String msg) throws Exception {
- JsonObject message = new Gson().fromJson(msg, JsonObject.class);
- JsonObject ack = new JsonObject();
- String messageId = message.get(X_PULSAR_MESSAGE_ID).getAsString();
- ack.add("messageId", new JsonPrimitive(messageId));
- // Acking the proxy
- this.getRemote().sendString(ack.toString());
- this.incomingMessages.put(msg);
- }
-
- public String receive(long timeout, TimeUnit unit) throws Exception {
- return incomingMessages.poll(timeout, unit);
- }
-
- public RemoteEndpoint getRemote() {
- return this.session.getRemote();
- }
-
- public Session getSession() {
- return this.session;
- }
-
- public void close() {
- this.session.close();
- }
-
- private static final Logger log =
LoggerFactory.getLogger(ConsumerSocket.class);
-
- }
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
new file mode 100644
index 00000000000..4ad8a5293f6
--- /dev/null
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.cli;
+
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.RateLimiter;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+
+/**
+ * pulsar-client read command implementation.
+ *
+ */
+@Parameters(commandDescription = "Read messages from a specified topic")
+public class CmdRead extends AbstractCmdConsume {
+
+ private static final Pattern MSG_ID_PATTERN =
Pattern.compile("^(-?[1-9][0-9]*|0):(-?[1-9][0-9]*|0)$");
+
+ @Parameter(description = "TopicName", required = true)
+ private List<String> mainOptions = new ArrayList<String>();
+
+ @Parameter(names = { "-m", "--start-message-id" },
+ description = "Initial reader position, it can be 'latest',
'earliest' or '<ledgerId>:<entryId>'")
+ private String startMessageId = "latest";
+
+ @Parameter(names = { "-i", "--start-message-id-inclusive" },
+ description = "Whether to include the position specified by -m
option.")
+ private boolean startMessageIdInclusive = false;
+
+ @Parameter(names = { "-n",
+ "--num-messages" }, description = "Number of messages to read, 0
means to read forever.")
+ private int numMessagesToRead = 1;
+
+ @Parameter(names = { "--hex" }, description = "Display binary messages in
hex.")
+ private boolean displayHex = false;
+
+ @Parameter(names = { "--hide-content" }, description = "Do not write the
message to console.")
+ private boolean hideContent = false;
+
+ @Parameter(names = { "-r", "--rate" }, description = "Rate (in msg/sec) at
which to read, "
+ + "value 0 means to read messages as fast as possible.")
+ private double readRate = 0;
+
+ @Parameter(names = {"-q", "--queue-size"}, description = "Reader receiver
queue size.")
+ private int receiverQueueSize = 0;
+
+ @Parameter(names = { "-mc", "--max_chunked_msg" }, description = "Max
pending chunk messages")
+ private int maxPendingChunkedMessage = 0;
+
+ @Parameter(names = { "-ac",
+ "--auto_ack_chunk_q_full" }, description = "Auto ack for oldest
message on queue is full")
+ private boolean autoAckOldestChunkedMessageOnQueueFull = false;
+
+ @Parameter(names = { "-ekv",
+ "--encryption-key-value" }, description = "The URI of private key
to decrypt payload, for example "
+ + "file:///path/to/private.key or
data:application/x-pem-file;base64,*****")
+ private String encKeyValue;
+
+ @Parameter(names = { "-st", "--schema-type"},
+ description = "Set a schema type on the reader, it can be 'bytes'
or 'auto_consume'")
+ private String schemaType = "bytes";
+
+ @Parameter(names = { "-pm", "--pool-messages" }, description = "Use the
pooled message", arity = 1)
+ private boolean poolMessages = true;
+
+ public CmdRead() {
+ // Do nothing
+ super();
+ }
+
+ /**
+ * Run the read command.
+ *
+ * @return 0 for success, < 0 otherwise
+ */
+ public int run() throws PulsarClientException, IOException {
+ if (mainOptions.size() != 1) {
+ throw (new ParameterException("Please provide one and only one
topic name."));
+ }
+ if (this.numMessagesToRead < 0) {
+ throw (new ParameterException("Number of messages should be zero
or positive."));
+ }
+
+ String topic = this.mainOptions.get(0);
+
+ if (this.serviceURL.startsWith("ws")) {
+ return readFromWebSocket(topic);
+ } else {
+ return read(topic);
+ }
+ }
+
+ private int read(String topic) {
+ int numMessagesRead = 0;
+ int returnCode = 0;
+
+ try (PulsarClient client = clientBuilder.build()){
+ ReaderBuilder<?> builder;
+
+ Schema<?> schema = poolMessages ? Schema.BYTEBUFFER : Schema.BYTES;
+ if ("auto_consume".equals(schemaType)) {
+ schema = Schema.AUTO_CONSUME();
+ } else if (!"bytes".equals(schemaType)) {
+ throw new IllegalArgumentException("schema type must be
'bytes' or 'auto_consume'");
+ }
+ builder = client.newReader(schema)
+ .topic(topic)
+ .startMessageId(parseMessageId(startMessageId))
+ .poolMessages(poolMessages);
+
+ if (this.startMessageIdInclusive) {
+ builder.startMessageIdInclusive();
+ }
+ if (this.maxPendingChunkedMessage > 0) {
+
builder.maxPendingChunkedMessage(this.maxPendingChunkedMessage);
+ }
+ if (this.receiverQueueSize > 0) {
+ builder.receiverQueueSize(this.receiverQueueSize);
+ }
+
+
builder.autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull);
+
+ if (isNotBlank(this.encKeyValue)) {
+ builder.defaultCryptoKeyReader(this.encKeyValue);
+ }
+
+ try (Reader<?> reader = builder.create()) {
+ RateLimiter limiter = (this.readRate > 0) ?
RateLimiter.create(this.readRate) : null;
+ while (this.numMessagesToRead == 0 || numMessagesRead <
this.numMessagesToRead) {
+ if (limiter != null) {
+ limiter.acquire();
+ }
+
+ Message<?> msg = reader.readNext(5, TimeUnit.SECONDS);
+ if (msg == null) {
+ LOG.debug("No message to read after waiting for 5
seconds.");
+ } else {
+ try {
+ numMessagesRead += 1;
+ if (!hideContent) {
+ System.out.println(MESSAGE_BOUNDARY);
+ String output = this.interpretMessage(msg,
displayHex);
+ System.out.println(output);
+ } else if (numMessagesRead % 1000 == 0) {
+ System.out.println("Received " +
numMessagesRead + " messages");
+ }
+ } finally {
+ msg.release();
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Error while reading messages");
+ LOG.error(e.getMessage(), e);
+ returnCode = -1;
+ } finally {
+ LOG.info("{} messages successfully read", numMessagesRead);
+ }
+
+ return returnCode;
+
+ }
+
+ @SuppressWarnings("deprecation")
+ @VisibleForTesting
+ public String getWebSocketReadUri(String topic) {
+ String serviceURLWithoutTrailingSlash = serviceURL.substring(0,
+ serviceURL.endsWith("/") ? serviceURL.length() - 1 :
serviceURL.length());
+
+ TopicName topicName = TopicName.get(topic);
+ String wsTopic;
+ if (topicName.isV2()) {
+ wsTopic = String.format("%s/%s/%s/%s", topicName.getDomain(),
topicName.getTenant(),
+ topicName.getNamespacePortion(), topicName.getLocalName());
+ } else {
+ wsTopic = String.format("%s/%s/%s/%s/%s", topicName.getDomain(),
topicName.getTenant(),
+ topicName.getCluster(), topicName.getNamespacePortion(),
topicName.getLocalName());
+ }
+
+ String msgIdQueryParam;
+ if ("latest".equals(startMessageId) ||
"earliest".equals(startMessageId)) {
+ msgIdQueryParam = startMessageId;
+ } else {
+ MessageId msgId = parseMessageId(startMessageId);
+ msgIdQueryParam =
Base64.getEncoder().encodeToString(msgId.toByteArray());
+ }
+
+ String uriFormat = "%s/ws" + (topicName.isV2() ? "/v2/" : "/") +
"reader/%s?messageId=%s";
+ return String.format(uriFormat, serviceURLWithoutTrailingSlash,
wsTopic, msgIdQueryParam);
+ }
+
+ @SuppressWarnings("deprecation")
+ private int readFromWebSocket(String topic) {
+ int numMessagesRead = 0;
+ int returnCode = 0;
+
+ URI readerUri = URI.create(getWebSocketReadUri(topic));
+
+ WebSocketClient readClient = new WebSocketClient(new
SslContextFactory(true));
+ ClientUpgradeRequest readRequest = new ClientUpgradeRequest();
+ try {
+ if (authentication != null) {
+ authentication.start();
+ AuthenticationDataProvider authData =
authentication.getAuthData();
+ if (authData.hasDataForHttp()) {
+ for (Map.Entry<String, String> kv :
authData.getHttpHeaders()) {
+ readRequest.setHeader(kv.getKey(), kv.getValue());
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Authentication plugin error: " + e.getMessage());
+ return -1;
+ }
+ CompletableFuture<Void> connected = new CompletableFuture<>();
+ ConsumerSocket readerSocket = new ConsumerSocket(connected);
+ try {
+ readClient.start();
+ } catch (Exception e) {
+ LOG.error("Failed to start websocket-client", e);
+ return -1;
+ }
+
+ try {
+ LOG.info("Trying to create websocket session..{}", readerUri);
+ readClient.connect(readerSocket, readerUri, readRequest);
+ connected.get();
+ } catch (Exception e) {
+ LOG.error("Failed to create web-socket session", e);
+ return -1;
+ }
+
+ try {
+ RateLimiter limiter = (this.readRate > 0) ?
RateLimiter.create(this.readRate) : null;
+ while (this.numMessagesToRead == 0 || numMessagesRead <
this.numMessagesToRead) {
+ if (limiter != null) {
+ limiter.acquire();
+ }
+ String msg = readerSocket.receive(5, TimeUnit.SECONDS);
+ if (msg == null) {
+ LOG.debug("No message to read after waiting for 5
seconds.");
+ } else {
+ try {
+ String output = interpretByteArray(displayHex,
Base64.getDecoder().decode(msg));
+ System.out.println(output); // print decode
+ } catch (Exception e) {
+ System.out.println(msg);
+ }
+ numMessagesRead += 1;
+ }
+ }
+ readerSocket.awaitClose(2, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("Error while reading messages");
+ LOG.error(e.getMessage(), e);
+ returnCode = -1;
+ } finally {
+ LOG.info("{} messages successfully read", numMessagesRead);
+ }
+
+ return returnCode;
+ }
+
+ @VisibleForTesting
+ static MessageId parseMessageId(String msgIdStr) {
+ MessageId msgId;
+ if ("latest".equals(msgIdStr)) {
+ msgId = MessageId.latest;
+ } else if ("earliest".equals(msgIdStr)) {
+ msgId = MessageId.earliest;
+ } else {
+ Matcher matcher = MSG_ID_PATTERN.matcher(msgIdStr);
+ if (matcher.find()) {
+ msgId = new MessageIdImpl(Long.parseLong(matcher.group(1)),
Long.parseLong(matcher.group(2)), -1);
+ } else {
+ throw new IllegalArgumentException("Message ID must be
'latest', 'earliest' or '<ledgerId>:<entryId>'");
+ }
+ }
+ return msgId;
+ }
+
+}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
index 2c3e6935b51..c64d80f380b 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
@@ -99,6 +99,7 @@ public class PulsarClientTool {
IUsageFormatter usageFormatter;
protected CmdProduce produceCommand;
protected CmdConsume consumeCommand;
+ protected CmdRead readCommand;
CmdGenerateDocumentation generateDocumentation;
public PulsarClientTool(Properties properties) {
@@ -126,6 +127,7 @@ public class PulsarClientTool {
protected void initJCommander() {
produceCommand = new CmdProduce();
consumeCommand = new CmdConsume();
+ readCommand = new CmdRead();
generateDocumentation = new CmdGenerateDocumentation();
this.jcommander = new JCommander();
@@ -134,6 +136,7 @@ public class PulsarClientTool {
jcommander.addObject(rootParams);
jcommander.addCommand("produce", produceCommand);
jcommander.addCommand("consume", consumeCommand);
+ jcommander.addCommand("read", readCommand);
jcommander.addCommand("generate_documentation", generateDocumentation);
}
@@ -196,6 +199,7 @@ public class PulsarClientTool {
}
this.produceCommand.updateConfig(clientBuilder, authentication,
this.rootParams.serviceURL);
this.consumeCommand.updateConfig(clientBuilder, authentication,
this.rootParams.serviceURL);
+ this.readCommand.updateConfig(clientBuilder, authentication,
this.rootParams.serviceURL);
}
public int run(String[] args) {
@@ -231,6 +235,8 @@ public class PulsarClientTool {
return produceCommand.run();
} else if ("consume".equals(chosenCommand)) {
return consumeCommand.run();
+ } else if ("read".equals(chosenCommand)) {
+ return readCommand.run();
} else if ("generate_documentation".equals(chosenCommand)) {
return generateDocumentation.run();
} else {
diff --git
a/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdRead.java
b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdRead.java
new file mode 100644
index 00000000000..ac5c562b343
--- /dev/null
+++
b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdRead.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.cli;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.lang.reflect.Field;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class TestCmdRead {
+
+ @DataProvider(name = "startMessageIds")
+ public Object[][] startMessageIds() {
+ return new Object[][] {
+ { "latest", "latest" },
+ { "earliest", "earliest" },
+ { "10:0", "CAoQADAA" },
+ { "10:1", "CAoQATAA" },
+ };
+ }
+
+ @Test(dataProvider = "startMessageIds")
+ public void testGetWebSocketReadUri(String msgId, String msgIdQueryParam)
throws Exception {
+ CmdRead cmdRead = new CmdRead();
+ cmdRead.updateConfig(null, null, "ws://localhost:8080/");
+ Field startMessageIdField =
CmdRead.class.getDeclaredField("startMessageId");
+ startMessageIdField.setAccessible(true);
+ startMessageIdField.set(cmdRead, msgId);
+
+ String topicNameV1 = "persistent://public/cluster/default/t1";
+ assertEquals(cmdRead.getWebSocketReadUri(topicNameV1),
+
"ws://localhost:8080/ws/reader/persistent/public/cluster/default/t1?messageId="
+ msgIdQueryParam);
+
+ String topicNameV2 = "persistent://public/default/t2";
+ assertEquals(cmdRead.getWebSocketReadUri(topicNameV2),
+
"ws://localhost:8080/ws/v2/reader/persistent/public/default/t2?messageId=" +
msgIdQueryParam);
+ }
+
+ @Test
+ public void testParseMessageId() {
+ assertEquals(CmdRead.parseMessageId("latest"), MessageId.latest);
+ assertEquals(CmdRead.parseMessageId("earliest"), MessageId.earliest);
+ assertEquals(CmdRead.parseMessageId("20:-1"), new MessageIdImpl(20,
-1, -1));
+ assertEquals(CmdRead.parseMessageId("30:0"), new MessageIdImpl(30, 0,
-1));
+ try {
+ CmdRead.parseMessageId("invalid");
+ fail("Should fail to parse invalid message ID");
+ } catch (Throwable t) {
+ assertTrue(t instanceof IllegalArgumentException);
+ }
+ }
+
+}