This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 01f159646b00bdf86977bed693947ec07bfc9d78
Author: Jason918 <[email protected]>
AuthorDate: Tue Jul 6 11:45:52 2021 +0800

     [Issue 11067][pulsar-client] Fix bin/pulsar-client produce not supporting 
v2 topic name through websocket (#11069)
    
    Fixes #11067
    
    
    ### Motivation
    
    Fix bin/pulsar-client produce not supporting v2 topic name through 
websocket.
    
    ### Modifications
    
    Add "v2" in websoket url if topicName is in v2 format.
    
    (cherry picked from commit b4780ffc85759c3b8bd924632ec0c0dc3359116e)
---
 .../org/apache/pulsar/client/cli/CmdProduce.java   | 23 +++++++++--
 .../apache/pulsar/client/cli/TestCmdProduce.java   | 44 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
index 40b0b3d..326e221 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
@@ -24,6 +24,7 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.gson.JsonParseException;
@@ -265,14 +266,28 @@ public class CmdProduce {
     }
 
     @SuppressWarnings("deprecation")
+    @VisibleForTesting
+    public String getProduceBaseEndPoint(String topic) {
+        TopicName topicName = TopicName.get(topic);
+        String produceBaseEndPoint;
+        if (topicName.isV2()) {
+            String wsTopic = String.format("%s/%s/%s/%s", 
topicName.getDomain(), topicName.getTenant(),
+                    topicName.getNamespacePortion(), topicName.getLocalName());
+            produceBaseEndPoint = serviceURL + (serviceURL.endsWith("/") ? "" 
: "/") + "ws/v2/producer/" + wsTopic;
+        } else {
+            String wsTopic = String.format("%s/%s/%s/%s/%s", 
topicName.getDomain(), topicName.getTenant(),
+                    topicName.getCluster(), topicName.getNamespacePortion(), 
topicName.getLocalName());
+            produceBaseEndPoint = serviceURL + (serviceURL.endsWith("/") ? "" 
: "/") + "ws/producer/" + wsTopic;
+        }
+        return produceBaseEndPoint;
+    }
+
+    @SuppressWarnings("deprecation")
     private int publishToWebSocket(String topic) {
         int numMessagesSent = 0;
         int returnCode = 0;
 
-        TopicName topicName = TopicName.get(topic);
-        String wsTopic = 
String.format("%s/%s/"+(StringUtils.isEmpty(topicName.getCluster()) ? "" : 
topicName.getCluster()+"/")+"%s/%s", 
topicName.getDomain(),topicName.getTenant(),topicName.getNamespacePortion(),topicName.getLocalName());
 
-        String produceBaseEndPoint = serviceURL + (serviceURL.endsWith("/") ? 
"" : "/") + "ws/producer/" + wsTopic;
-        URI produceUri = URI.create(produceBaseEndPoint);
+        URI produceUri = URI.create(getProduceBaseEndPoint(topic));
 
         WebSocketClient produceClient = new WebSocketClient(new 
SslContextFactory(true));
         ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
diff --git 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java
 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java
new file mode 100644
index 0000000..173fcfc
--- /dev/null
+++ 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.cli;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestCmdProduce {
+
+    CmdProduce cmdProduce;
+
+    @BeforeMethod
+    public void setUp() {
+        cmdProduce = new CmdProduce();
+        cmdProduce.updateConfig(null, null, "ws://localhost:8080/");
+    }
+
+    @Test
+    public void testGetProduceBaseEndPoint() {
+        String topicNameV1 = "persistent://public/cluster/default/issue-11067";
+        Assert.assertEquals(cmdProduce.getProduceBaseEndPoint(topicNameV1),
+                
"ws://localhost:8080/ws/producer/persistent/public/cluster/default/issue-11067");
+        String topicNameV2 = "persistent://public/default/issue-11067";
+        Assert.assertEquals(cmdProduce.getProduceBaseEndPoint(topicNameV2),
+                
"ws://localhost:8080/ws/v2/producer/persistent/public/default/issue-11067");
+    }
+}
\ No newline at end of file

Reply via email to