This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9c227d219d3ea534ac63c6c47d30e9a6c263a3c0
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Fri Nov 1 03:47:16 2024 -0700

    [fix][broker] Fix Broker migration NPE while broker tls url not configured 
(#23534)
    
    (cherry picked from commit 6bd03088bd366c036390c2b99b865d952866d8bf)
---
 .../org/apache/pulsar/broker/service/AbstractTopic.java    |  4 ++--
 .../apache/pulsar/broker/service/ClusterMigrationTest.java |  2 +-
 .../main/java/org/apache/pulsar/client/impl/ClientCnx.java |  9 ++++++---
 .../java/org/apache/pulsar/common/protocol/Commands.java   | 14 +++++++++-----
 4 files changed, 18 insertions(+), 11 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index a52d100a64a..bf692118ad1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -1357,8 +1357,8 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
                 .getClusterPoliciesAsync(pulsar.getConfig().getClusterName())
                 .thenCombine(isNamespaceMigrationEnabledAsync(pulsar, topic),
                         ((clusterData, isNamespaceMigrationEnabled) -> {
-                            Optional<ClusterUrl> url = 
((clusterData.isPresent() && clusterData.get().isMigrated())
-                                    || isNamespaceMigrationEnabled)
+                            Optional<ClusterUrl> url = 
(clusterData.isPresent() && (clusterData.get().isMigrated()
+                                    || isNamespaceMigrationEnabled))
                                             ? 
Optional.ofNullable(clusterData.get().getMigratedClusterUrl())
                                             : Optional.empty();
                             return url;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
index 20e13023cac..380cb710baf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
@@ -298,7 +298,7 @@ public class ClusterMigrationTest {
         assertFalse(topic2.getProducers().isEmpty());
 
         ClusterUrl migratedUrl = new 
ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(),
-                pulsar2.getBrokerServiceUrl(), 
pulsar2.getBrokerServiceUrlTls());
+                pulsar2.getBrokerServiceUrl(), null);
         admin1.clusters().updateClusterMigration("r1", true, migratedUrl);
         
assertEquals(admin1.clusters().getClusterMigration("r1").getMigratedClusterUrl(),
 migratedUrl);
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 6f343a2ee58..302aba28ee8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -710,9 +710,12 @@ public class ClientCnx extends PulsarHandler {
     @Override
     protected void handleTopicMigrated(CommandTopicMigrated 
commandTopicMigrated) {
         final long resourceId = commandTopicMigrated.getResourceId();
-        final String serviceUrl = commandTopicMigrated.getBrokerServiceUrl();
-        final String serviceUrlTls = 
commandTopicMigrated.getBrokerServiceUrlTls();
-
+        final String serviceUrl = commandTopicMigrated.hasBrokerServiceUrl()
+                ? commandTopicMigrated.getBrokerServiceUrl()
+                : null;
+        final String serviceUrlTls = 
commandTopicMigrated.hasBrokerServiceUrlTls()
+                ? commandTopicMigrated.getBrokerServiceUrlTls()
+                : null;
         HandlerState resource = commandTopicMigrated.getResourceType() == 
ResourceType.Producer
                 ? producers.get(resourceId)
                 : consumers.get(resourceId);
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 4822dba023d..febf07adcb4 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -88,6 +88,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse;
+import org.apache.pulsar.common.api.proto.CommandTopicMigrated;
 import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
 import org.apache.pulsar.common.api.proto.FeatureFlags;
 import org.apache.pulsar.common.api.proto.IntRange;
@@ -768,11 +769,14 @@ public class Commands {
 
     public static ByteBuf newTopicMigrated(ResourceType type, long resourceId, 
String brokerUrl, String brokerUrlTls) {
         BaseCommand cmd = localCmd(Type.TOPIC_MIGRATED);
-        cmd.setTopicMigrated()
-            .setResourceType(type)
-            .setResourceId(resourceId)
-            .setBrokerServiceUrl(brokerUrl)
-            .setBrokerServiceUrlTls(brokerUrlTls);
+        CommandTopicMigrated migratedCmd = cmd.setTopicMigrated();
+        migratedCmd.setResourceType(type).setResourceId(resourceId);
+        if (StringUtils.isNotBlank(brokerUrl)) {
+            migratedCmd.setBrokerServiceUrl(brokerUrl);
+        }
+        if (StringUtils.isNotBlank(brokerUrlTls)) {
+            migratedCmd.setBrokerServiceUrlTls(brokerUrlTls);
+        }
         return serializeWithSize(cmd);
     }
 

Reply via email to