This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9c227d219d3ea534ac63c6c47d30e9a6c263a3c0 Author: Rajan Dhabalia <[email protected]> AuthorDate: Fri Nov 1 03:47:16 2024 -0700 [fix][broker] Fix Broker migration NPE while broker tls url not configured (#23534) (cherry picked from commit 6bd03088bd366c036390c2b99b865d952866d8bf) --- .../org/apache/pulsar/broker/service/AbstractTopic.java | 4 ++-- .../apache/pulsar/broker/service/ClusterMigrationTest.java | 2 +- .../main/java/org/apache/pulsar/client/impl/ClientCnx.java | 9 ++++++--- .../java/org/apache/pulsar/common/protocol/Commands.java | 14 +++++++++----- 4 files changed, 18 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index a52d100a64a..bf692118ad1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -1357,8 +1357,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP .getClusterPoliciesAsync(pulsar.getConfig().getClusterName()) .thenCombine(isNamespaceMigrationEnabledAsync(pulsar, topic), ((clusterData, isNamespaceMigrationEnabled) -> { - Optional<ClusterUrl> url = ((clusterData.isPresent() && clusterData.get().isMigrated()) - || isNamespaceMigrationEnabled) + Optional<ClusterUrl> url = (clusterData.isPresent() && (clusterData.get().isMigrated() + || isNamespaceMigrationEnabled)) ? Optional.ofNullable(clusterData.get().getMigratedClusterUrl()) : Optional.empty(); return url; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java index 20e13023cac..380cb710baf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java @@ -298,7 +298,7 @@ public class ClusterMigrationTest { assertFalse(topic2.getProducers().isEmpty()); ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(), - pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls()); + pulsar2.getBrokerServiceUrl(), null); admin1.clusters().updateClusterMigration("r1", true, migratedUrl); assertEquals(admin1.clusters().getClusterMigration("r1").getMigratedClusterUrl(), migratedUrl); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 6f343a2ee58..302aba28ee8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -710,9 +710,12 @@ public class ClientCnx extends PulsarHandler { @Override protected void handleTopicMigrated(CommandTopicMigrated commandTopicMigrated) { final long resourceId = commandTopicMigrated.getResourceId(); - final String serviceUrl = commandTopicMigrated.getBrokerServiceUrl(); - final String serviceUrlTls = commandTopicMigrated.getBrokerServiceUrlTls(); - + final String serviceUrl = commandTopicMigrated.hasBrokerServiceUrl() + ? commandTopicMigrated.getBrokerServiceUrl() + : null; + final String serviceUrlTls = commandTopicMigrated.hasBrokerServiceUrlTls() + ? commandTopicMigrated.getBrokerServiceUrlTls() + : null; HandlerState resource = commandTopicMigrated.getResourceType() == ResourceType.Producer ? producers.get(resourceId) : consumers.get(resourceId); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 4822dba023d..febf07adcb4 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -88,6 +88,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse; +import org.apache.pulsar.common.api.proto.CommandTopicMigrated; import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType; import org.apache.pulsar.common.api.proto.FeatureFlags; import org.apache.pulsar.common.api.proto.IntRange; @@ -768,11 +769,14 @@ public class Commands { public static ByteBuf newTopicMigrated(ResourceType type, long resourceId, String brokerUrl, String brokerUrlTls) { BaseCommand cmd = localCmd(Type.TOPIC_MIGRATED); - cmd.setTopicMigrated() - .setResourceType(type) - .setResourceId(resourceId) - .setBrokerServiceUrl(brokerUrl) - .setBrokerServiceUrlTls(brokerUrlTls); + CommandTopicMigrated migratedCmd = cmd.setTopicMigrated(); + migratedCmd.setResourceType(type).setResourceId(resourceId); + if (StringUtils.isNotBlank(brokerUrl)) { + migratedCmd.setBrokerServiceUrl(brokerUrl); + } + if (StringUtils.isNotBlank(brokerUrlTls)) { + migratedCmd.setBrokerServiceUrlTls(brokerUrlTls); + } return serializeWithSize(cmd); }
