heesung-sn commented on code in PR #21408:
URL: https://github.com/apache/pulsar/pull/21408#discussion_r1382240906
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##########
@@ -484,14 +485,22 @@ private CompletableFuture<Void> delete(boolean
failIfHasSubscriptions, boolean c
return deleteFuture;
}
+
+ @Override
+ public CompletableFuture<Void> close(boolean
closeWithoutWaitingClientDisconnect) {
+ return close(false, closeWithoutWaitingClientDisconnect);
+ }
+
/**
* Close this topic - close all producers and subscriptions associated
with this topic.
*
+ * @param closeWithoutDisconnectingClients don't disconnect clients
Review Comment:
Updated.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java:
##########
@@ -156,6 +167,10 @@ void reconnectLater(Throwable exception) {
}
public void connectionClosed(ClientCnx cnx) {
+ connectionClosed(cnx, null, Optional.empty());
+ }
+
+ public void connectionClosed(ClientCnx cnx, Long initialConnectionDelayMs,
Optional<URI> hostUrl) {
Review Comment:
I agree. I will change this parameter to `Optional<Long> initialConnectionDelayMs`.
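A minimal sketch of what that signature change could look like, assuming the one-argument overload keeps delegating with empty optionals. The class name, the `String` stand-in for `ClientCnx`, the `lastCall` field, and the 100 ms fallback are all hypothetical and only serve the illustration; they are not taken from the PR.

```java
import java.net.URI;
import java.util.Optional;

// Hypothetical stand-in for ConnectionHandler; a String replaces ClientCnx.
class ConnectionClosedSketch {
    // Records what the full overload received, for inspection only.
    static String lastCall;

    // Convenience overload: no explicit delay and no target host.
    static void connectionClosed(String cnx) {
        connectionClosed(cnx, Optional.empty(), Optional.empty());
    }

    // Optional<Long> replaces the nullable Long, so callers never pass null.
    static void connectionClosed(String cnx,
                                 Optional<Long> initialConnectionDelayMs,
                                 Optional<URI> hostUrl) {
        // The 100 ms fallback is an assumption made for this sketch.
        long delayMs = initialConnectionDelayMs.orElse(100L);
        lastCall = cnx + ":" + delayMs + ":"
                + hostUrl.map(URI::toString).orElse("none");
    }

    public static void main(String[] args) {
        connectionClosed("cnx-1");
        System.out.println(lastCall);
    }
}
```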
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -1760,6 +1762,20 @@ protected void handleSend(CommandSend send, ByteBuf
headersAndPayload) {
printSendCommandDebug(send, headersAndPayload);
}
+
+ ServiceConfiguration conf =
getBrokerService().pulsar().getConfiguration();
+ if (producer.getTopic().isFenced()
+ &&
ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(conf)) {
Review Comment:
This check runs only when the topic is fenced. Also, `loadManagerClassName` is a
dynamic config, so its value could change at runtime. Instead, I added a faster
check function.
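To illustrate only the first point (the `&&` short-circuits, so the config check is skipped unless the topic is fenced), here is a small sketch. `isExtensionEnabled` is a hypothetical stand-in that counts its calls; it is not the faster check function mentioned above, whose implementation is not shown in this thread.

```java
import java.util.concurrent.atomic.AtomicInteger;

class FencedCheckSketch {
    // Counts how often the (hypothetical) config check is evaluated.
    static final AtomicInteger configChecks = new AtomicInteger();

    // Hypothetical stand-in for the load-manager config check.
    static boolean isExtensionEnabled() {
        configChecks.incrementAndGet();
        return true;
    }

    // The right-hand side runs only when topicFenced is true.
    static boolean shouldCheckAssignment(boolean topicFenced) {
        return topicFenced && isExtensionEnabled();
    }

    public static void main(String[] args) {
        shouldCheckAssignment(false); // config check skipped
        shouldCheckAssignment(true);  // config check evaluated once
        System.out.println("config checks: " + configChecks.get());
    }
}
```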
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -732,15 +770,20 @@ private void handleOwnEvent(String serviceUnit,
ServiceUnitStateData data) {
if (getOwnerRequest != null) {
getOwnerRequest.complete(data.dstBroker());
}
- stateChangeListeners.notify(serviceUnit, data, null);
+ CompletableFuture<Integer> ownFuture = null;
if (isTargetBroker(data.dstBroker())) {
- log(null, serviceUnit, data, null);
pulsar.getNamespaceService()
.onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(pulsar,
serviceUnit));
lastOwnEventHandledAt = System.currentTimeMillis();
- } else if (data.force() && isTargetBroker(data.sourceBroker())) {
- closeServiceUnit(serviceUnit);
+ ownFuture = CompletableFuture.completedFuture(null);
+ } else if ((data.force() || isTransferCommand(data)) &&
isTargetBroker(data.sourceBroker())) {
+ ownFuture = closeServiceUnit(serviceUnit, false);
+ } else {
+ ownFuture = CompletableFuture.completedFuture(null);
}
+
+ stateChangeListeners.notifyOnCompletion(ownFuture, serviceUnit, data)
+ .whenComplete((__, e) -> log(e, serviceUnit, data, null));
Review Comment:
Good point.
I think we need to log the first and second `if` cases only.
Let me update this.
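One way the proposed change could be shaped, as a simplified sketch: each branch decides whether the completion should be logged, and the callback honours that flag. The class, the boolean parameters, the `logLines` list, and the placeholder result `0` are hypothetical; the listener notification from the diff is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class OwnEventLoggingSketch {
    // Hypothetical stand-in for the real logger.
    static final List<String> logLines = new ArrayList<>();

    static CompletableFuture<Integer> handleOwnEvent(String serviceUnit,
                                                     boolean isDestination,
                                                     boolean isSourceToClose) {
        final CompletableFuture<Integer> ownFuture;
        final boolean shouldLog;
        if (isDestination) {
            // First case: this broker now owns the service unit.
            ownFuture = CompletableFuture.completedFuture(null);
            shouldLog = true;
        } else if (isSourceToClose) {
            // Second case: this broker is the source and closes the unit.
            // The value 0 is a placeholder for the close result.
            ownFuture = CompletableFuture.completedFuture(0);
            shouldLog = true;
        } else {
            // Unrelated broker: nothing to do and nothing to log.
            ownFuture = CompletableFuture.completedFuture(null);
            shouldLog = false;
        }
        return ownFuture.whenComplete((result, error) -> {
            if (shouldLog) {
                logLines.add(serviceUnit
                        + (error == null ? " handled" : " failed: " + error));
            }
        });
    }
}
```

Declaring the future and the flag as `final` and assigning each exactly once per branch also avoids the initial `null` assignment.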
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -732,15 +770,20 @@ private void handleOwnEvent(String serviceUnit,
ServiceUnitStateData data) {
if (getOwnerRequest != null) {
getOwnerRequest.complete(data.dstBroker());
}
- stateChangeListeners.notify(serviceUnit, data, null);
+ CompletableFuture<Integer> ownFuture = null;
Review Comment:
Sure. I will update this.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -538,6 +538,44 @@ public CompletableFuture<Optional<String>>
getOwnerAsync(String serviceUnit) {
}
}
+ @Override
+ public Optional<String> getAssigned(String serviceUnit) {
+ if (!validateChannelState(Started, true)) {
+ return Optional.empty();
+ }
+
+ ServiceUnitStateData data = tableview.get(serviceUnit);
+ if (data == null) {
+ return Optional.empty();
+ }
+ ServiceUnitState state = state(data);
+ switch (state) {
+ case Owned, Assigning -> {
+ return Optional.of(data.dstBroker());
+ }
+ case Releasing -> {
+ if (data.dstBroker() != null) {
+ return Optional.of(data.dstBroker());
+ }
+ return Optional.empty();
Review Comment:
Sure. I will update this.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -286,6 +300,37 @@ public static void createSystemTopic(PulsarService pulsar,
String topic) throws
}
}
+ /**
+ * Gets the assigned broker for the given topic.
+ * @param pulsar PulsarService instance
+ * @param topic Topic Name
+ * @return the assigned broker's BrokerLookupData instance. Empty, if not
assigned by Extensible LoadManager.
+ */
+ public static CompletableFuture<Optional<BrokerLookupData>>
getAssignedBrokerLookupData(PulsarService pulsar,
+
String topic) {
+ if
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar.getConfig())) {
+ var topicName = TopicName.get(topic);
+ try {
+ return pulsar.getNamespaceService().getBundleAsync(topicName)
+ .thenCompose(bundle -> {
+ var loadManager =
ExtensibleLoadManagerImpl.get(pulsar);
+ var assigned =
loadManager.getServiceUnitStateChannel()
+ .getAssigned(bundle.toString());
+ if (assigned.isPresent()) {
+ return
loadManager.getBrokerRegistry().lookupAsync(assigned.get());
+ } else {
+ return
CompletableFuture.completedFuture(Optional.empty());
+ }
+ }
+ );
+ } catch (Throwable e) {
+ log.error("Failed to DestinationBrokerLookupData for
topic:{}", topic, e);
Review Comment:
Sure. I will update this.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java:
##########
@@ -66,6 +68,10 @@ protected ConnectionHandler(HandlerState state, Backoff
backoff, Connection conn
}
protected void grabCnx() {
+ grabCnx(Optional.empty());
+ }
+
+ protected void grabCnx(Optional<URI> hostURI) {
Review Comment:
I think our current practice is not to null-check `Optional` arguments
(callers must make sure `Optional` arguments are never null).
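A short sketch of that convention, assuming callers that hold a nullable value wrap it with `Optional.ofNullable` before calling. The class name, the `reconnect` helper, and the returned strings are hypothetical.

```java
import java.net.URI;
import java.util.Optional;

class OptionalArgSketch {
    // Convenience overload: no specific host requested.
    static String grabCnx() {
        return grabCnx(Optional.empty());
    }

    // By convention there is no null check on hostURI itself;
    // passing null here is a caller bug.
    static String grabCnx(Optional<URI> hostURI) {
        return hostURI.map(uri -> "direct:" + uri).orElse("lookup");
    }

    // A caller holding a nullable value wraps it at the boundary.
    static String reconnect(URI nullableHost) {
        return grabCnx(Optional.ofNullable(nullableHost));
    }
}
```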