heesung-sn commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1113774325
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -207,14 +243,23 @@ public synchronized void start() throws
PulsarServerException {
throw new IllegalStateException("Invalid channel state:" +
channelState.name());
}
+ boolean debug = debug();
try {
+ this.brokerRegistry = getBrokerRegistry();
+ this.brokerRegistry.addListener((broker, type) -> {
+ handleBrokerRegistrationEvent(broker, type);
+ });
Review Comment:
updated.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -522,12 +590,22 @@ private void handleAssignEvent(String serviceUnit,
ServiceUnitStateData data) {
}
private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData
data) {
- if (isTargetBroker(data.sourceBroker())) {
- ServiceUnitStateData next = new ServiceUnitStateData(Owned,
data.broker(), data.sourceBroker());
- // TODO: when close, pass message to clients to connect to the new
broker
- closeServiceUnit(serviceUnit)
- .thenCompose(__ -> pubAsync(serviceUnit, next))
- .whenComplete((__, e) -> log(e, serviceUnit, data, next));
+
+ if (isTransferCommand(data)) {
+ if (isTargetBroker(data.sourceBroker())) {
+ ServiceUnitStateData next = new ServiceUnitStateData(Owned,
data.broker(), data.sourceBroker());
+ // TODO: when close, pass message to clients to connect to the
new broker
+ closeServiceUnit(serviceUnit)
+ .thenCompose(__ -> pubAsync(serviceUnit, next))
+ .whenComplete((__, e) -> log(e, serviceUnit, data,
next));
+ }
+ } else {
+ if (isTargetBroker(data.broker())) {
+ ServiceUnitStateData next = new ServiceUnitStateData(Free,
data.broker());
+ closeServiceUnit(serviceUnit)
Review Comment:
We updated the monitor logic(from tombstone to rollback), and now
`handleInitEvent` does not call `closeServiceUnit`.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -522,12 +590,22 @@ private void handleAssignEvent(String serviceUnit,
ServiceUnitStateData data) {
}
private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData
data) {
- if (isTargetBroker(data.sourceBroker())) {
- ServiceUnitStateData next = new ServiceUnitStateData(Owned,
data.broker(), data.sourceBroker());
- // TODO: when close, pass message to clients to connect to the new
broker
- closeServiceUnit(serviceUnit)
- .thenCompose(__ -> pubAsync(serviceUnit, next))
- .whenComplete((__, e) -> log(e, serviceUnit, data, next));
+
+ if (isTransferCommand(data)) {
+ if (isTargetBroker(data.sourceBroker())) {
Review Comment:
no. `assign` is handled by the target broker.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]