This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6bd03088bd3 [fix][broker] Fix Broker migration NPE while broker tls url not configured (#23534)
6bd03088bd3 is described below
commit 6bd03088bd366c036390c2b99b865d952866d8bf
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Fri Nov 1 03:47:16 2024 -0700
[fix][broker] Fix Broker migration NPE while broker tls url not configured (#23534)
---
.../org/apache/pulsar/broker/service/AbstractTopic.java | 4 ++--
.../apache/pulsar/broker/service/ClusterMigrationTest.java | 2 +-
.../main/java/org/apache/pulsar/client/impl/ClientCnx.java | 9 ++++++---
.../java/org/apache/pulsar/common/protocol/Commands.java | 14 +++++++++-----
4 files changed, 18 insertions(+), 11 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 11f00fb28e3..96ea2004be8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -1363,8 +1363,8 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener {
.getClusterPoliciesAsync(pulsar.getConfig().getClusterName())
.thenCombine(isNamespaceMigrationEnabledAsync(pulsar, topic),
((clusterData, isNamespaceMigrationEnabled) -> {
- Optional<ClusterUrl> url =
((clusterData.isPresent() && clusterData.get().isMigrated())
- || isNamespaceMigrationEnabled)
+ Optional<ClusterUrl> url =
(clusterData.isPresent() && (clusterData.get().isMigrated()
+ || isNamespaceMigrationEnabled))
?
Optional.ofNullable(clusterData.get().getMigratedClusterUrl())
: Optional.empty();
return url;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
index e56a3495600..e6a7d049366 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
@@ -297,7 +297,7 @@ public class ClusterMigrationTest {
assertFalse(topic2.getProducers().isEmpty());
ClusterUrl migratedUrl = new
ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(),
- pulsar2.getBrokerServiceUrl(),
pulsar2.getBrokerServiceUrlTls());
+ pulsar2.getBrokerServiceUrl(), null);
admin1.clusters().updateClusterMigration("r1", true, migratedUrl);
assertEquals(admin1.clusters().getClusterMigration("r1").getMigratedClusterUrl(),
migratedUrl);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 24163c631ff..35c41455e89 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -710,9 +710,12 @@ public class ClientCnx extends PulsarHandler {
@Override
protected void handleTopicMigrated(CommandTopicMigrated
commandTopicMigrated) {
final long resourceId = commandTopicMigrated.getResourceId();
- final String serviceUrl = commandTopicMigrated.getBrokerServiceUrl();
- final String serviceUrlTls =
commandTopicMigrated.getBrokerServiceUrlTls();
-
+ final String serviceUrl = commandTopicMigrated.hasBrokerServiceUrl()
+ ? commandTopicMigrated.getBrokerServiceUrl()
+ : null;
+ final String serviceUrlTls =
commandTopicMigrated.hasBrokerServiceUrlTls()
+ ? commandTopicMigrated.getBrokerServiceUrlTls()
+ : null;
HandlerState resource = commandTopicMigrated.getResourceType() ==
ResourceType.Producer
? producers.get(resourceId)
: consumers.get(resourceId);
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 8635368f00f..19aa9907549 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -88,6 +88,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse;
+import org.apache.pulsar.common.api.proto.CommandTopicMigrated;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
import org.apache.pulsar.common.api.proto.FeatureFlags;
import org.apache.pulsar.common.api.proto.IntRange;
@@ -768,11 +769,14 @@ public class Commands {
public static ByteBuf newTopicMigrated(ResourceType type, long resourceId,
String brokerUrl, String brokerUrlTls) {
BaseCommand cmd = localCmd(Type.TOPIC_MIGRATED);
- cmd.setTopicMigrated()
- .setResourceType(type)
- .setResourceId(resourceId)
- .setBrokerServiceUrl(brokerUrl)
- .setBrokerServiceUrlTls(brokerUrlTls);
+ CommandTopicMigrated migratedCmd = cmd.setTopicMigrated();
+ migratedCmd.setResourceType(type).setResourceId(resourceId);
+ if (StringUtils.isNotBlank(brokerUrl)) {
+ migratedCmd.setBrokerServiceUrl(brokerUrl);
+ }
+ if (StringUtils.isNotBlank(brokerUrlTls)) {
+ migratedCmd.setBrokerServiceUrlTls(brokerUrlTls);
+ }
return serializeWithSize(cmd);
}