This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 01f159646b00bdf86977bed693947ec07bfc9d78 Author: Jason918 <[email protected]> AuthorDate: Tue Jul 6 11:45:52 2021 +0800 [Issue 11067][pulsar-client] Fix bin/pulsar-client produce not supporting v2 topic name through websocket (#11069) Fixes #11067 ### Motivation Fix bin/pulsar-client produce not supporting v2 topic name through websocket. ### Modifications Add "v2" in websoket url if topicName is in v2 format. (cherry picked from commit b4780ffc85759c3b8bd924632ec0c0dc3359116e) --- .../org/apache/pulsar/client/cli/CmdProduce.java | 23 +++++++++-- .../apache/pulsar/client/cli/TestCmdProduce.java | 44 ++++++++++++++++++++++ 2 files changed, 63 insertions(+), 4 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java index 40b0b3d..326e221 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java @@ -24,6 +24,7 @@ import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; import com.beust.jcommander.Parameters; import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.util.concurrent.RateLimiter; import com.google.gson.JsonParseException; @@ -265,14 +266,28 @@ public class CmdProduce { } @SuppressWarnings("deprecation") + @VisibleForTesting + public String getProduceBaseEndPoint(String topic) { + TopicName topicName = TopicName.get(topic); + String produceBaseEndPoint; + if (topicName.isV2()) { + String wsTopic = String.format("%s/%s/%s/%s", topicName.getDomain(), topicName.getTenant(), + topicName.getNamespacePortion(), topicName.getLocalName()); + produceBaseEndPoint = serviceURL + (serviceURL.endsWith("/") ? "" : "/") + "ws/v2/producer/" + wsTopic; + } else { + String wsTopic = String.format("%s/%s/%s/%s/%s", topicName.getDomain(), topicName.getTenant(), + topicName.getCluster(), topicName.getNamespacePortion(), topicName.getLocalName()); + produceBaseEndPoint = serviceURL + (serviceURL.endsWith("/") ? "" : "/") + "ws/producer/" + wsTopic; + } + return produceBaseEndPoint; + } + + @SuppressWarnings("deprecation") private int publishToWebSocket(String topic) { int numMessagesSent = 0; int returnCode = 0; - TopicName topicName = TopicName.get(topic); - String wsTopic = String.format("%s/%s/"+(StringUtils.isEmpty(topicName.getCluster()) ? "" : topicName.getCluster()+"/")+"%s/%s", topicName.getDomain(),topicName.getTenant(),topicName.getNamespacePortion(),topicName.getLocalName()); - String produceBaseEndPoint = serviceURL + (serviceURL.endsWith("/") ? "" : "/") + "ws/producer/" + wsTopic; - URI produceUri = URI.create(produceBaseEndPoint); + URI produceUri = URI.create(getProduceBaseEndPoint(topic)); WebSocketClient produceClient = new WebSocketClient(new SslContextFactory(true)); ClientUpgradeRequest produceRequest = new ClientUpgradeRequest(); diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java new file mode 100644 index 0000000..173fcfc --- /dev/null +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.cli; + +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class TestCmdProduce { + + CmdProduce cmdProduce; + + @BeforeMethod + public void setUp() { + cmdProduce = new CmdProduce(); + cmdProduce.updateConfig(null, null, "ws://localhost:8080/"); + } + + @Test + public void testGetProduceBaseEndPoint() { + String topicNameV1 = "persistent://public/cluster/default/issue-11067"; + Assert.assertEquals(cmdProduce.getProduceBaseEndPoint(topicNameV1), + "ws://localhost:8080/ws/producer/persistent/public/cluster/default/issue-11067"); + String topicNameV2 = "persistent://public/default/issue-11067"; + Assert.assertEquals(cmdProduce.getProduceBaseEndPoint(topicNameV2), + "ws://localhost:8080/ws/v2/producer/persistent/public/default/issue-11067"); + } +} \ No newline at end of file
