This is an automated email from the ASF dual-hosted git repository.

nkurihar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 93e29161b59 [feat][cli] Add read command to pulsar-client-tools 
(#19298)
93e29161b59 is described below

commit 93e29161b59a37aa73e3731903666811104ecf2f
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Mon Feb 13 09:51:29 2023 +0900

    [feat][cli] Add read command to pulsar-client-tools (#19298)
---
 .../pulsar/client/cli/PulsarClientToolTest.java    |  51 ++++
 .../pulsar/client/cli/PulsarClientToolWsTest.java  |  50 +++-
 .../pulsar/client/cli/AbstractCmdConsume.java      | 250 ++++++++++++++++
 .../org/apache/pulsar/client/cli/CmdConsume.java   | 222 +-------------
 .../java/org/apache/pulsar/client/cli/CmdRead.java | 324 +++++++++++++++++++++
 .../apache/pulsar/client/cli/PulsarClientTool.java |   6 +
 .../org/apache/pulsar/client/cli/TestCmdRead.java  |  74 +++++
 7 files changed, 760 insertions(+), 217 deletions(-)

diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
index 52edde856b7..8d416125fd1 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
@@ -19,7 +19,9 @@
 package org.apache.pulsar.client.cli;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 import java.time.Duration;
 import java.util.Properties;
 import java.util.UUID;
@@ -193,6 +195,55 @@ public class PulsarClientToolTest extends BrokerTestBase {
         }
     }
 
+    @Test(timeOut = 20000)
+    public void testRead() throws Exception {
+        Properties properties = new Properties();
+        properties.setProperty("serviceUrl", brokerUrl.toString());
+        properties.setProperty("useTls", "false");
+
+        final String topicName = getTopicWithRandomSuffix("reader");
+
+        int numberOfMessages = 10;
+        @Cleanup("shutdownNow")
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        executor.execute(() -> {
+            try {
+                PulsarClientTool pulsarClientToolReader = new 
PulsarClientTool(properties);
+                String[] args = {"read", "-m", "latest", "-n", 
Integer.toString(numberOfMessages), "--hex", "-r", "30",
+                        topicName};
+                assertEquals(pulsarClientToolReader.run(args), 0);
+                future.complete(null);
+            } catch (Throwable t) {
+                future.completeExceptionally(t);
+            }
+        });
+
+        // Make sure subscription has been created
+        retryStrategically((test) -> {
+            try {
+                return admin.topics().getSubscriptions(topicName).size() == 1;
+            } catch (Exception e) {
+                return false;
+            }
+        }, 10, 500);
+
+        assertEquals(admin.topics().getSubscriptions(topicName).size(), 1);
+        
assertTrue(admin.topics().getSubscriptions(topicName).get(0).startsWith("reader-"));
+        PulsarClientTool pulsarClientToolProducer = new 
PulsarClientTool(properties);
+
+        String[] args = {"produce", "--messages", "Have a nice day", "-n", 
Integer.toString(numberOfMessages), "-r",
+                "20", "-p", "key1=value1", "-p", "key2=value2", "-k", 
"partition_key", topicName};
+        assertEquals(pulsarClientToolProducer.run(args), 0);
+        assertFalse(future.isCompletedExceptionally());
+        future.get();
+
+        Awaitility.await()
+                .ignoreExceptions()
+                .atMost(Duration.ofMillis(20000))
+                .until(()->admin.topics().getSubscriptions(topicName).size() 
== 0);
+    }
+
     @Test(timeOut = 20000)
     public void testEncryption() throws Exception {
         Properties properties = new Properties();
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java
index 5a12e77f99e..77c974de80e 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java
@@ -143,4 +143,52 @@ public class PulsarClientToolWsTest extends BrokerTestBase 
{
         Assert.assertNotNull(subscriptions);
         Assert.assertEquals(subscriptions.size(), 1);
     }
-}
\ No newline at end of file
+
+    @Test(timeOut = 30000)
+    public void testWebSocketReader() throws Exception {
+        Properties properties = new Properties();
+        properties.setProperty("serviceUrl", brokerUrl.toString());
+        properties.setProperty("useTls", "false");
+
+        final String topicName = "persistent://my-property/my-ns/test/topic-" 
+ UUID.randomUUID();
+
+        int numberOfMessages = 10;
+        {
+            @Cleanup("shutdown")
+            ExecutorService executor = Executors.newSingleThreadExecutor();
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            executor.execute(() -> {
+                try {
+                    PulsarClientTool pulsarClientToolReader = new 
PulsarClientTool(properties);
+                    String[] args = {"read", "-m", "latest", "-n", 
Integer.toString(numberOfMessages), "--hex", "-r",
+                            "30", topicName};
+                    Assert.assertEquals(pulsarClientToolReader.run(args), 0);
+                    future.complete(null);
+                } catch (Throwable t) {
+                    future.completeExceptionally(t);
+                }
+            });
+
+            // Make sure subscription has been created
+            Awaitility.await()
+                    .pollInterval(Duration.ofMillis(200))
+                    .ignoreExceptions().untilAsserted(() -> {
+                
Assert.assertEquals(admin.topics().getSubscriptions(topicName).size(), 1);
+                
Assert.assertTrue(admin.topics().getSubscriptions(topicName).get(0).startsWith("reader-"));
+            });
+
+            PulsarClientTool pulsarClientToolProducer = new 
PulsarClientTool(properties);
+
+            String[] args = {"produce", "--messages", "Have a nice day", "-n", 
Integer.toString(numberOfMessages), "-r",
+                    "20", "-p", "key1=value1", "-p", "key2=value2", "-k", 
"partition_key", topicName};
+            Assert.assertEquals(pulsarClientToolProducer.run(args), 0);
+            future.get();
+            Assert.assertFalse(future.isCompletedExceptionally());
+        }
+
+        Awaitility.await()
+                .ignoreExceptions().untilAsserted(() -> {
+            
Assert.assertEquals(admin.topics().getSubscriptions(topicName).size(), 0);
+        });
+    }
+}
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
new file mode 100644
index 00000000000..ef0ffbc2973
--- /dev/null
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.cli;
+
+import static 
org.apache.pulsar.client.internal.PulsarClientImplementationBinding.getBytes;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.HexDump;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * common part of consume command and read command of pulsar-client.
+ *
+ */
+public abstract class AbstractCmdConsume {
+
+    protected static final Logger LOG = 
LoggerFactory.getLogger(PulsarClientTool.class);
+    protected static final String MESSAGE_BOUNDARY = "----- got message -----";
+
+    protected ClientBuilder clientBuilder;
+    protected Authentication authentication;
+    protected String serviceURL;
+
+    public AbstractCmdConsume() {
+        // Do nothing
+    }
+
+    /**
+     * Set client configuration.
+     *
+     */
+    public void updateConfig(ClientBuilder clientBuilder, Authentication 
authentication, String serviceURL) {
+        this.clientBuilder = clientBuilder;
+        this.authentication = authentication;
+        this.serviceURL = serviceURL;
+    }
+
+    /**
+     * Interprets the message to create a string representation.
+     *
+     * @param message
+     *            The message to interpret
+     * @param displayHex
+     *            Whether to display BytesMessages in hexdump style, ignored 
for simple text messages
+     * @return String representation of the message
+     */
+    protected String interpretMessage(Message<?> message, boolean displayHex) 
throws IOException {
+        StringBuilder sb = new StringBuilder();
+
+        String properties = 
Arrays.toString(message.getProperties().entrySet().toArray());
+
+        String data;
+        Object value = message.getValue();
+        if (value == null) {
+            data = "null";
+        } else if (value instanceof byte[]) {
+            byte[] msgData = (byte[]) value;
+            data = interpretByteArray(displayHex, msgData);
+        } else if (value instanceof GenericObject) {
+            Map<String, Object> asMap = genericObjectToMap((GenericObject) 
value, displayHex);
+            data = asMap.toString();
+        } else if (value instanceof ByteBuffer) {
+            data = new String(getBytes((ByteBuffer) value));
+        } else {
+            data = value.toString();
+        }
+
+        String key = null;
+        if (message.hasKey()) {
+            key = message.getKey();
+        }
+
+        sb.append("key:[").append(key).append("], ");
+        if (!properties.isEmpty()) {
+            sb.append("properties:").append(properties).append(", ");
+        }
+        sb.append("content:").append(data);
+
+        return sb.toString();
+    }
+
+    protected static String interpretByteArray(boolean displayHex, byte[] 
msgData) throws IOException {
+        String data;
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        if (!displayHex) {
+            return new String(msgData);
+        } else {
+            HexDump.dump(msgData, 0, out, 0);
+            return out.toString();
+        }
+    }
+
+    protected static Map<String, Object> genericObjectToMap(GenericObject 
value, boolean displayHex)
+            throws IOException {
+        switch (value.getSchemaType()) {
+            case AVRO:
+            case JSON:
+            case PROTOBUF_NATIVE:
+                    return genericRecordToMap((GenericRecord) value, 
displayHex);
+            case KEY_VALUE:
+                    return keyValueToMap((KeyValue) value.getNativeObject(), 
displayHex);
+            default:
+                return primitiveValueToMap(value.getNativeObject(), 
displayHex);
+        }
+    }
+
+    protected static Map<String, Object> keyValueToMap(KeyValue value, boolean 
displayHex) throws IOException {
+        if (value == null) {
+            return ImmutableMap.of("value", "NULL");
+        }
+        return ImmutableMap.of("key", primitiveValueToMap(value.getKey(), 
displayHex),
+                "value", primitiveValueToMap(value.getValue(), displayHex));
+    }
+
+    protected static Map<String, Object> primitiveValueToMap(Object value, 
boolean displayHex) throws IOException {
+        if (value == null) {
+            return ImmutableMap.of("value", "NULL");
+        }
+        if (value instanceof GenericObject) {
+            return genericObjectToMap((GenericObject) value, displayHex);
+        }
+        if (value instanceof byte[]) {
+            value = interpretByteArray(displayHex, (byte[]) value);
+        }
+        return ImmutableMap.of("value", value.toString(), "type", 
value.getClass());
+    }
+
+    protected static Map<String, Object> genericRecordToMap(GenericRecord 
value, boolean displayHex)
+            throws IOException {
+        Map<String, Object> res = new HashMap<>();
+        for (Field f : value.getFields()) {
+            Object fieldValue = value.getField(f);
+            if (fieldValue instanceof GenericRecord) {
+                fieldValue = genericRecordToMap((GenericRecord) fieldValue, 
displayHex);
+            } else if (fieldValue == null) {
+                fieldValue =  "NULL";
+            } else if (fieldValue instanceof byte[]) {
+                fieldValue = interpretByteArray(displayHex, (byte[]) 
fieldValue);
+            }
+            res.put(f.getName(), fieldValue);
+        }
+        return res;
+    }
+
+    @WebSocket(maxTextMessageSize = 64 * 1024)
+    public static class ConsumerSocket {
+        private static final String X_PULSAR_MESSAGE_ID = "messageId";
+        private final CountDownLatch closeLatch;
+        private Session session;
+        private CompletableFuture<Void> connected;
+        final BlockingQueue<String> incomingMessages;
+
+        public ConsumerSocket(CompletableFuture<Void> connected) {
+            this.closeLatch = new CountDownLatch(1);
+            this.connected = connected;
+            this.incomingMessages = new GrowableArrayBlockingQueue<>();
+        }
+
+        public boolean awaitClose(int duration, TimeUnit unit) throws 
InterruptedException {
+            return this.closeLatch.await(duration, unit);
+        }
+
+        @OnWebSocketClose
+        public void onClose(int statusCode, String reason) {
+            log.info("Connection closed: {} - {}", statusCode, reason);
+            this.session = null;
+            this.closeLatch.countDown();
+        }
+
+        @OnWebSocketConnect
+        public void onConnect(Session session) throws InterruptedException {
+            log.info("Got connect: {}", session);
+            this.session = session;
+            this.connected.complete(null);
+        }
+
+        @OnWebSocketMessage
+        public synchronized void onMessage(String msg) throws Exception {
+            JsonObject message = new Gson().fromJson(msg, JsonObject.class);
+            JsonObject ack = new JsonObject();
+            String messageId = message.get(X_PULSAR_MESSAGE_ID).getAsString();
+            ack.add("messageId", new JsonPrimitive(messageId));
+            // Acking the proxy
+            this.getRemote().sendString(ack.toString());
+            this.incomingMessages.put(msg);
+        }
+
+        public String receive(long timeout, TimeUnit unit) throws Exception {
+            return incomingMessages.poll(timeout, unit);
+        }
+
+        public RemoteEndpoint getRemote() {
+            return this.session.getRemote();
+        }
+
+        public Session getSession() {
+            return this.session;
+        }
+
+        public void close() {
+            this.session.close();
+        }
+
+        private static final Logger log = 
LoggerFactory.getLogger(ConsumerSocket.class);
+    }
+
+}
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index ef3db09a883..58ab6360a17 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -19,35 +19,21 @@
 package org.apache.pulsar.client.cli;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static 
org.apache.pulsar.client.internal.PulsarClientImplementationBinding.getBytes;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.RateLimiter;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.URI;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Base64;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
-import org.apache.commons.io.HexDump;
-import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
-import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
@@ -57,33 +43,17 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.api.schema.Field;
-import org.apache.pulsar.client.api.schema.GenericObject;
-import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.schema.KeyValue;
-import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.websocket.api.RemoteEndpoint;
-import org.eclipse.jetty.websocket.api.Session;
-import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
-import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
-import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
-import org.eclipse.jetty.websocket.api.annotations.WebSocket;
 import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
 import org.eclipse.jetty.websocket.client.WebSocketClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * pulsar-client consume command implementation.
  *
  */
 @Parameters(commandDescription = "Consume messages from a specified topic")
-public class CmdConsume {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarClientTool.class);
-    private static final String MESSAGE_BOUNDARY = "----- got message -----";
+public class CmdConsume extends AbstractCmdConsume {
 
     @Parameter(description = "TopicName", required = true)
     private List<String> mainOptions = new ArrayList<String>();
@@ -134,132 +104,14 @@ public class CmdConsume {
 
     @Parameter(names = { "-st", "--schema-type"},
             description = "Set a schema type on the consumer, it can be 
'bytes' or 'auto_consume'")
-    private String schematype = "bytes";
+    private String schemaType = "bytes";
 
     @Parameter(names = { "-pm", "--pool-messages" }, description = "Use the 
pooled message", arity = 1)
     private boolean poolMessages = true;
 
-    private ClientBuilder clientBuilder;
-    private Authentication authentication;
-    private String serviceURL;
-
     public CmdConsume() {
         // Do nothing
-    }
-
-    /**
-     * Set client configuration.
-     *
-     */
-    public void updateConfig(ClientBuilder clientBuilder, Authentication 
authentication, String serviceURL) {
-        this.clientBuilder = clientBuilder;
-        this.authentication = authentication;
-        this.serviceURL = serviceURL;
-    }
-
-    /**
-     * Interprets the message to create a string representation.
-     *
-     * @param message
-     *            The message to interpret
-     * @param displayHex
-     *            Whether to display BytesMessages in hexdump style, ignored 
for simple text messages
-     * @return String representation of the message
-     */
-    private String interpretMessage(Message<?> message, boolean displayHex) 
throws IOException {
-        StringBuilder sb = new StringBuilder();
-
-        String properties = 
Arrays.toString(message.getProperties().entrySet().toArray());
-
-        String data;
-        Object value = message.getValue();
-        if (value == null) {
-            data = "null";
-        } else if (value instanceof byte[]) {
-            byte[] msgData = (byte[]) value;
-            data = interpretByteArray(displayHex, msgData);
-        } else if (value instanceof GenericObject) {
-            Map<String, Object> asMap = genericObjectToMap((GenericObject) 
value, displayHex);
-            data = asMap.toString();
-        } else if (value instanceof ByteBuffer) {
-            data = new String(getBytes((ByteBuffer) value));
-        } else {
-            data = value.toString();
-        }
-
-        String key = null;
-        if (message.hasKey()) {
-            key = message.getKey();
-        }
-
-        sb.append("key:[").append(key).append("], ");
-        if (!properties.isEmpty()) {
-            sb.append("properties:").append(properties).append(", ");
-        }
-        sb.append("content:").append(data);
-
-        return sb.toString();
-    }
-
-    private static String interpretByteArray(boolean displayHex, byte[] 
msgData) throws IOException {
-        String data;
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        if (!displayHex) {
-            return new String(msgData);
-        } else {
-            HexDump.dump(msgData, 0, out, 0);
-            return out.toString();
-        }
-    }
-
-    private static Map<String, Object> genericObjectToMap(GenericObject value, 
boolean displayHex) throws IOException {
-        switch (value.getSchemaType()) {
-            case AVRO:
-            case JSON:
-            case PROTOBUF_NATIVE:
-                    return genericRecordToMap((GenericRecord) value, 
displayHex);
-            case KEY_VALUE:
-                    return keyValueToMap((KeyValue) value.getNativeObject(), 
displayHex);
-            default:
-                return primitiveValueToMap(value.getNativeObject(), 
displayHex);
-        }
-    }
-
-    private static Map<String, Object> keyValueToMap(KeyValue value, boolean 
displayHex) throws IOException {
-        if (value == null) {
-            return ImmutableMap.of("value", "NULL");
-        }
-        return ImmutableMap.of("key", primitiveValueToMap(value.getKey(), 
displayHex),
-                "value", primitiveValueToMap(value.getValue(), displayHex));
-    }
-
-    private static Map<String, Object> primitiveValueToMap(Object value, 
boolean displayHex) throws IOException {
-        if (value == null) {
-            return ImmutableMap.of("value", "NULL");
-        }
-        if (value instanceof GenericObject) {
-            return genericObjectToMap((GenericObject) value, displayHex);
-        }
-        if (value instanceof byte[]) {
-            value = interpretByteArray(displayHex, (byte[]) value);
-        }
-        return ImmutableMap.of("value", value.toString(), "type", 
value.getClass());
-    }
-
-    private static Map<String, Object> genericRecordToMap(GenericRecord value, 
boolean displayHex) throws IOException {
-        Map<String, Object> res = new HashMap<>();
-        for (Field f : value.getFields()) {
-            Object fieldValue = value.getField(f);
-            if (fieldValue instanceof GenericRecord) {
-                fieldValue = genericRecordToMap((GenericRecord) fieldValue, 
displayHex);
-            } else if (fieldValue == null) {
-                fieldValue =  "NULL";
-            } else if (fieldValue instanceof byte[]) {
-                fieldValue = interpretByteArray(displayHex, (byte[]) 
fieldValue);
-            }
-            res.put(f.getName(), fieldValue);
-        }
-        return res;
+        super();
     }
 
     /**
@@ -294,10 +146,10 @@ public class CmdConsume {
         try (PulsarClient client = clientBuilder.build()){
             ConsumerBuilder<?> builder;
             Schema<?> schema = poolMessages ? Schema.BYTEBUFFER : Schema.BYTES;
-            if ("auto_consume".equals(schematype)) {
+            if ("auto_consume".equals(schemaType)) {
                 schema = Schema.AUTO_CONSUME();
-            } else if (!"bytes".equals(schematype)) {
-                throw new IllegalArgumentException("schema type must be 
'bytes' or 'auto_consume");
+            } else if (!"bytes".equals(schemaType)) {
+                throw new IllegalArgumentException("schema type must be 
'bytes' or 'auto_consume'");
             }
             builder = client.newConsumer(schema)
                     .subscriptionName(this.subscriptionName)
@@ -458,66 +310,4 @@ public class CmdConsume {
         return returnCode;
     }
 
-    @WebSocket(maxTextMessageSize = 64 * 1024)
-    public static class ConsumerSocket {
-        private static final String X_PULSAR_MESSAGE_ID = "messageId";
-        private final CountDownLatch closeLatch;
-        private Session session;
-        private CompletableFuture<Void> connected;
-        final BlockingQueue<String> incomingMessages;
-
-        public ConsumerSocket(CompletableFuture<Void> connected) {
-            this.closeLatch = new CountDownLatch(1);
-            this.connected = connected;
-            this.incomingMessages = new GrowableArrayBlockingQueue<>();
-        }
-
-        public boolean awaitClose(int duration, TimeUnit unit) throws 
InterruptedException {
-            return this.closeLatch.await(duration, unit);
-        }
-
-        @OnWebSocketClose
-        public void onClose(int statusCode, String reason) {
-            log.info("Connection closed: {} - {}", statusCode, reason);
-            this.session = null;
-            this.closeLatch.countDown();
-        }
-
-        @OnWebSocketConnect
-        public void onConnect(Session session) throws InterruptedException {
-            log.info("Got connect: {}", session);
-            this.session = session;
-            this.connected.complete(null);
-        }
-
-        @OnWebSocketMessage
-        public synchronized void onMessage(String msg) throws Exception {
-            JsonObject message = new Gson().fromJson(msg, JsonObject.class);
-            JsonObject ack = new JsonObject();
-            String messageId = message.get(X_PULSAR_MESSAGE_ID).getAsString();
-            ack.add("messageId", new JsonPrimitive(messageId));
-            // Acking the proxy
-            this.getRemote().sendString(ack.toString());
-            this.incomingMessages.put(msg);
-        }
-
-        public String receive(long timeout, TimeUnit unit) throws Exception {
-            return incomingMessages.poll(timeout, unit);
-        }
-
-        public RemoteEndpoint getRemote() {
-            return this.session.getRemote();
-        }
-
-        public Session getSession() {
-            return this.session;
-        }
-
-        public void close() {
-            this.session.close();
-        }
-
-        private static final Logger log = 
LoggerFactory.getLogger(ConsumerSocket.class);
-
-    }
 }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
new file mode 100644
index 00000000000..4ad8a5293f6
--- /dev/null
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.cli;
+
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.RateLimiter;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+
+/**
+ * pulsar-client read command implementation.
+ *
+ */
+@Parameters(commandDescription = "Read messages from a specified topic")
+public class CmdRead extends AbstractCmdConsume {
+
+    private static final Pattern MSG_ID_PATTERN = 
Pattern.compile("^(-?[1-9][0-9]*|0):(-?[1-9][0-9]*|0)$");
+
+    @Parameter(description = "TopicName", required = true)
+    private List<String> mainOptions = new ArrayList<String>();
+
+    @Parameter(names = { "-m", "--start-message-id" },
+            description = "Initial reader position, it can be 'latest', 
'earliest' or '<ledgerId>:<entryId>'")
+    private String startMessageId = "latest";
+
+    @Parameter(names = { "-i", "--start-message-id-inclusive" },
+            description = "Whether to include the position specified by -m 
option.")
+    private boolean startMessageIdInclusive = false;
+
+    @Parameter(names = { "-n",
+            "--num-messages" }, description = "Number of messages to read, 0 
means to read forever.")
+    private int numMessagesToRead = 1;
+
+    @Parameter(names = { "--hex" }, description = "Display binary messages in 
hex.")
+    private boolean displayHex = false;
+
+    @Parameter(names = { "--hide-content" }, description = "Do not write the 
message to console.")
+    private boolean hideContent = false;
+
+    @Parameter(names = { "-r", "--rate" }, description = "Rate (in msg/sec) at 
which to read, "
+            + "value 0 means to read messages as fast as possible.")
+    private double readRate = 0;
+
+    @Parameter(names = {"-q", "--queue-size"}, description = "Reader receiver 
queue size.")
+    private int receiverQueueSize = 0;
+
+    @Parameter(names = { "-mc", "--max_chunked_msg" }, description = "Max 
pending chunk messages")
+    private int maxPendingChunkedMessage = 0;
+
+    @Parameter(names = { "-ac",
+            "--auto_ack_chunk_q_full" }, description = "Auto ack for oldest 
message on queue is full")
+    private boolean autoAckOldestChunkedMessageOnQueueFull = false;
+
+    @Parameter(names = { "-ekv",
+            "--encryption-key-value" }, description = "The URI of private key 
to decrypt payload, for example "
+                    + "file:///path/to/private.key or 
data:application/x-pem-file;base64,*****")
+    private String encKeyValue;
+
+    @Parameter(names = { "-st", "--schema-type"},
+            description = "Set a schema type on the reader, it can be 'bytes' 
or 'auto_consume'")
+    private String schemaType = "bytes";
+
+    @Parameter(names = { "-pm", "--pool-messages" }, description = "Use the 
pooled message", arity = 1)
+    private boolean poolMessages = true;
+
+    public CmdRead() {
+        // Do nothing
+        super();
+    }
+
+    /**
+     * Run the read command.
+     *
+     * @return 0 for success, < 0 otherwise
+     */
+    public int run() throws PulsarClientException, IOException {
+        if (mainOptions.size() != 1) {
+            throw (new ParameterException("Please provide one and only one 
topic name."));
+        }
+        if (this.numMessagesToRead < 0) {
+            throw (new ParameterException("Number of messages should be zero 
or positive."));
+        }
+
+        String topic = this.mainOptions.get(0);
+
+        if (this.serviceURL.startsWith("ws")) {
+            return readFromWebSocket(topic);
+        } else {
+            return read(topic);
+        }
+    }
+
+    private int read(String topic) {
+        int numMessagesRead = 0;
+        int returnCode = 0;
+
+        try (PulsarClient client = clientBuilder.build()){
+            ReaderBuilder<?> builder;
+
+            Schema<?> schema = poolMessages ? Schema.BYTEBUFFER : Schema.BYTES;
+            if ("auto_consume".equals(schemaType)) {
+                schema = Schema.AUTO_CONSUME();
+            } else if (!"bytes".equals(schemaType)) {
+                throw new IllegalArgumentException("schema type must be 
'bytes' or 'auto_consume'");
+            }
+            builder = client.newReader(schema)
+                    .topic(topic)
+                    .startMessageId(parseMessageId(startMessageId))
+                    .poolMessages(poolMessages);
+
+            if (this.startMessageIdInclusive) {
+                builder.startMessageIdInclusive();
+            }
+            if (this.maxPendingChunkedMessage > 0) {
+                
builder.maxPendingChunkedMessage(this.maxPendingChunkedMessage);
+            }
+            if (this.receiverQueueSize > 0) {
+                builder.receiverQueueSize(this.receiverQueueSize);
+            }
+
+            
builder.autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull);
+
+            if (isNotBlank(this.encKeyValue)) {
+                builder.defaultCryptoKeyReader(this.encKeyValue);
+            }
+
+            try (Reader<?> reader = builder.create()) {
+                RateLimiter limiter = (this.readRate > 0) ? 
RateLimiter.create(this.readRate) : null;
+                while (this.numMessagesToRead == 0 || numMessagesRead < 
this.numMessagesToRead) {
+                    if (limiter != null) {
+                        limiter.acquire();
+                    }
+
+                    Message<?> msg = reader.readNext(5, TimeUnit.SECONDS);
+                    if (msg == null) {
+                        LOG.debug("No message to read after waiting for 5 
seconds.");
+                    } else {
+                        try {
+                            numMessagesRead += 1;
+                            if (!hideContent) {
+                                System.out.println(MESSAGE_BOUNDARY);
+                                String output = this.interpretMessage(msg, 
displayHex);
+                                System.out.println(output);
+                            } else if (numMessagesRead % 1000 == 0) {
+                                System.out.println("Received " + 
numMessagesRead + " messages");
+                            }
+                        } finally {
+                            msg.release();
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Error while reading messages");
+            LOG.error(e.getMessage(), e);
+            returnCode = -1;
+        } finally {
+            LOG.info("{} messages successfully read", numMessagesRead);
+        }
+
+        return returnCode;
+
+    }
+
+    @SuppressWarnings("deprecation")
+    @VisibleForTesting
+    public String getWebSocketReadUri(String topic) {
+        String serviceURLWithoutTrailingSlash = serviceURL.substring(0,
+                serviceURL.endsWith("/") ? serviceURL.length() - 1 : 
serviceURL.length());
+
+        TopicName topicName = TopicName.get(topic);
+        String wsTopic;
+        if (topicName.isV2()) {
+            wsTopic = String.format("%s/%s/%s/%s", topicName.getDomain(), 
topicName.getTenant(),
+                    topicName.getNamespacePortion(), topicName.getLocalName());
+        } else {
+            wsTopic = String.format("%s/%s/%s/%s/%s", topicName.getDomain(), 
topicName.getTenant(),
+                    topicName.getCluster(), topicName.getNamespacePortion(), 
topicName.getLocalName());
+        }
+
+        String msgIdQueryParam;
+        if ("latest".equals(startMessageId) || 
"earliest".equals(startMessageId)) {
+            msgIdQueryParam = startMessageId;
+        } else {
+            MessageId msgId = parseMessageId(startMessageId);
+            msgIdQueryParam = 
Base64.getEncoder().encodeToString(msgId.toByteArray());
+        }
+
+        String uriFormat = "%s/ws" + (topicName.isV2() ? "/v2/" : "/") + 
"reader/%s?messageId=%s";
+        return String.format(uriFormat, serviceURLWithoutTrailingSlash, 
wsTopic, msgIdQueryParam);
+    }
+
+    @SuppressWarnings("deprecation")
+    private int readFromWebSocket(String topic) {
+        int numMessagesRead = 0;
+        int returnCode = 0;
+
+        URI readerUri = URI.create(getWebSocketReadUri(topic));
+
+        WebSocketClient readClient = new WebSocketClient(new 
SslContextFactory(true));
+        ClientUpgradeRequest readRequest = new ClientUpgradeRequest();
+        try {
+            if (authentication != null) {
+                authentication.start();
+                AuthenticationDataProvider authData = 
authentication.getAuthData();
+                if (authData.hasDataForHttp()) {
+                    for (Map.Entry<String, String> kv : 
authData.getHttpHeaders()) {
+                        readRequest.setHeader(kv.getKey(), kv.getValue());
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Authentication plugin error: " + e.getMessage());
+            return -1;
+        }
+        CompletableFuture<Void> connected = new CompletableFuture<>();
+        ConsumerSocket readerSocket = new ConsumerSocket(connected);
+        try {
+            readClient.start();
+        } catch (Exception e) {
+            LOG.error("Failed to start websocket-client", e);
+            return -1;
+        }
+
+        try {
+            LOG.info("Trying to create websocket session..{}", readerUri);
+            readClient.connect(readerSocket, readerUri, readRequest);
+            connected.get();
+        } catch (Exception e) {
+            LOG.error("Failed to create web-socket session", e);
+            return -1;
+        }
+
+        try {
+            RateLimiter limiter = (this.readRate > 0) ? 
RateLimiter.create(this.readRate) : null;
+            while (this.numMessagesToRead == 0 || numMessagesRead < 
this.numMessagesToRead) {
+                if (limiter != null) {
+                    limiter.acquire();
+                }
+                String msg = readerSocket.receive(5, TimeUnit.SECONDS);
+                if (msg == null) {
+                    LOG.debug("No message to read after waiting for 5 
seconds.");
+                } else {
+                    try {
+                        String output = interpretByteArray(displayHex, 
Base64.getDecoder().decode(msg));
+                        System.out.println(output); // print decode
+                    } catch (Exception e) {
+                        System.out.println(msg);
+                    }
+                    numMessagesRead += 1;
+                }
+            }
+            readerSocket.awaitClose(2, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            LOG.error("Error while reading messages");
+            LOG.error(e.getMessage(), e);
+            returnCode = -1;
+        } finally {
+            LOG.info("{} messages successfully read", numMessagesRead);
+        }
+
+        return returnCode;
+    }
+
+    @VisibleForTesting
+    static MessageId parseMessageId(String msgIdStr) {
+        MessageId msgId;
+        if ("latest".equals(msgIdStr)) {
+            msgId = MessageId.latest;
+        } else if ("earliest".equals(msgIdStr)) {
+            msgId = MessageId.earliest;
+        } else {
+            Matcher matcher = MSG_ID_PATTERN.matcher(msgIdStr);
+            if (matcher.find()) {
+                msgId = new MessageIdImpl(Long.parseLong(matcher.group(1)), 
Long.parseLong(matcher.group(2)), -1);
+            } else {
+                throw new IllegalArgumentException("Message ID must be 
'latest', 'earliest' or '<ledgerId>:<entryId>'");
+            }
+        }
+        return msgId;
+    }
+
+}
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
index 2c3e6935b51..c64d80f380b 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
@@ -99,6 +99,7 @@ public class PulsarClientTool {
     IUsageFormatter usageFormatter;
     protected CmdProduce produceCommand;
     protected CmdConsume consumeCommand;
+    protected CmdRead readCommand;
     CmdGenerateDocumentation generateDocumentation;
 
     public PulsarClientTool(Properties properties) {
@@ -126,6 +127,7 @@ public class PulsarClientTool {
     protected void initJCommander() {
         produceCommand = new CmdProduce();
         consumeCommand = new CmdConsume();
+        readCommand = new CmdRead();
         generateDocumentation = new CmdGenerateDocumentation();
 
         this.jcommander = new JCommander();
@@ -134,6 +136,7 @@ public class PulsarClientTool {
         jcommander.addObject(rootParams);
         jcommander.addCommand("produce", produceCommand);
         jcommander.addCommand("consume", consumeCommand);
+        jcommander.addCommand("read", readCommand);
         jcommander.addCommand("generate_documentation", generateDocumentation);
     }
 
@@ -196,6 +199,7 @@ public class PulsarClientTool {
         }
         this.produceCommand.updateConfig(clientBuilder, authentication, 
this.rootParams.serviceURL);
         this.consumeCommand.updateConfig(clientBuilder, authentication, 
this.rootParams.serviceURL);
+        this.readCommand.updateConfig(clientBuilder, authentication, 
this.rootParams.serviceURL);
     }
 
     public int run(String[] args) {
@@ -231,6 +235,8 @@ public class PulsarClientTool {
                 return produceCommand.run();
             } else if ("consume".equals(chosenCommand)) {
                 return consumeCommand.run();
+            } else if ("read".equals(chosenCommand)) {
+                return readCommand.run();
             } else if ("generate_documentation".equals(chosenCommand)) {
                 return generateDocumentation.run();
             } else {
diff --git 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdRead.java
 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdRead.java
new file mode 100644
index 00000000000..ac5c562b343
--- /dev/null
+++ 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdRead.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.cli;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.lang.reflect.Field;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class TestCmdRead {
+
+    @DataProvider(name = "startMessageIds")
+    public Object[][] startMessageIds() {
+        return new Object[][] {
+            { "latest", "latest" },
+            { "earliest", "earliest" },
+            { "10:0", "CAoQADAA" },
+            { "10:1", "CAoQATAA" },
+        };
+    }
+
+    @Test(dataProvider = "startMessageIds")
+    public void testGetWebSocketReadUri(String msgId, String msgIdQueryParam) 
throws Exception {
+        CmdRead cmdRead = new CmdRead();
+        cmdRead.updateConfig(null, null, "ws://localhost:8080/");
+        Field startMessageIdField = 
CmdRead.class.getDeclaredField("startMessageId");
+        startMessageIdField.setAccessible(true);
+        startMessageIdField.set(cmdRead, msgId);
+
+        String topicNameV1 = "persistent://public/cluster/default/t1";
+        assertEquals(cmdRead.getWebSocketReadUri(topicNameV1),
+                
"ws://localhost:8080/ws/reader/persistent/public/cluster/default/t1?messageId=" 
+ msgIdQueryParam);
+
+        String topicNameV2 = "persistent://public/default/t2";
+        assertEquals(cmdRead.getWebSocketReadUri(topicNameV2),
+                
"ws://localhost:8080/ws/v2/reader/persistent/public/default/t2?messageId=" + 
msgIdQueryParam);
+    }
+
+    @Test
+    public void testParseMessageId() {
+        assertEquals(CmdRead.parseMessageId("latest"), MessageId.latest);
+        assertEquals(CmdRead.parseMessageId("earliest"), MessageId.earliest);
+        assertEquals(CmdRead.parseMessageId("20:-1"), new MessageIdImpl(20, 
-1, -1));
+        assertEquals(CmdRead.parseMessageId("30:0"), new MessageIdImpl(30, 0, 
-1));
+        try {
+            CmdRead.parseMessageId("invalid");
+            fail("Should fail to parse invalid message ID");
+        } catch (Throwable t) {
+            assertTrue(t instanceof IllegalArgumentException);
+        }
+    }
+
+}


Reply via email to