merlimat closed pull request #2487: Ensure standalone service comes back quickly after ungraceful restarts. URL: https://github.com/apache/incubator-pulsar/pull/2487
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/conf/standalone.conf b/conf/standalone.conf index 09d369c6ff..f9aad4c76f 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -320,6 +320,8 @@ autoSkipNonRecoverableData=false ### --- Load balancer --- ### +loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager + # Enable load balancer loadBalancerEnabled=false diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java new file mode 100644 index 0000000000..5773c61ee1 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.loadbalance; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.loadbalance.impl.PulsarResourceDescription; +import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit; +import org.apache.pulsar.common.naming.ServiceUnitId; +import org.apache.pulsar.common.stats.Metrics; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport; +import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; +import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData; +import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; + +public class NoopLoadManager implements LoadManager { + + private String lookupServiceAddress; + private ResourceUnit localResourceUnit; + private ZooKeeper zkClient; + + LocalBrokerData localData; + + private static final Deserializer<LocalBrokerData> loadReportDeserializer = (key, content) -> ObjectMapperFactory + .getThreadLocal() + .readValue(content, LocalBrokerData.class); + + @Override + public void initialize(PulsarService pulsar) { + lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getWebServicePort(); + localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress), + new PulsarResourceDescription()); + zkClient = pulsar.getZkClient(); + + localData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), + pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls()); + } + + @Override + public 
void start() throws PulsarServerException { + String brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress; + + try { + // When running in standalone, this error can happen when killing the "standalone" process + // ungracefully since the ZK session will not be closed and it will take some time for ZK server + // to prune the expired sessions after startup. + // Since there's a single broker instance running, it's safe, in this mode, to remove the old lock + + // Delete and recreate z-node + try { + if (zkClient.exists(brokerZnodePath, null) != null) { + zkClient.delete(brokerZnodePath, -1); + } + } catch (NoNodeException nne) { + // Ignore if z-node was just expired + } + + ZkUtils.createFullPathOptimistic(zkClient, brokerZnodePath, localData.getJsonBytes(), + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + + } catch (Exception e) { + throw new PulsarServerException(e); + } + } + + @Override + public boolean isCentralized() { + return false; + } + + @Override + public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception { + return Optional.of(localResourceUnit); + } + + @Override + public LoadManagerReport generateLoadReport() throws Exception { + return null; + } + + @Override + public Deserializer<? 
extends ServiceLookupData> getLoadReportDeserializer() { + return loadReportDeserializer; + } + + @Override + public void setLoadReportForceUpdateFlag() { + // do nothing + } + + @Override + public void writeLoadReportOnZookeeper() throws Exception { + // do nothing + } + + @Override + public void writeResourceQuotasToZooKeeper() throws Exception { + // do nothing + } + + @Override + public List<Metrics> getLoadBalancingMetrics() { + return Collections.emptyList(); + } + + @Override + public void doLoadShedding() { + // do nothing + } + + @Override + public void doNamespaceBundleSplit() throws Exception { + // do nothing + } + + @Override + public void disableBroker() throws Exception { + // do nothing + } + + @Override + public Set<String> getAvailableBrokers() throws Exception { + return Collections.singleton(lookupServiceAddress); + } + + @Override + public void stop() throws PulsarServerException { + // do nothing + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 1c369924a3..a5d1be39da 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -793,7 +793,8 @@ public void start() throws PulsarServerException { if (ownerZkSessionId != 0 && ownerZkSessionId != zkClient.getSessionId()) { log.error("Broker znode - [{}] is own by different zookeeper-ssession {} ", brokerZnodePath, ownerZkSessionId); - throw new PulsarServerException("Broker-znode owned by different zk-session " + ownerZkSessionId); + throw new PulsarServerException( + "Broker-znode owned by different zk-session " + ownerZkSessionId); } // Node may already be created by another load manager: in this case update the data. 
zkClient.setData(brokerZnodePath, localData.getJsonBytes(), -1); diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java index 94cd32582d..78b68b07e4 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java @@ -45,7 +45,6 @@ import org.apache.bookkeeper.clients.StorageClientBuilder; import org.apache.bookkeeper.clients.admin.StorageAdminClient; import org.apache.bookkeeper.clients.config.StorageClientSettings; -import org.apache.bookkeeper.clients.exceptions.ClientException; import org.apache.bookkeeper.clients.exceptions.NamespaceExistsException; import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException; import org.apache.bookkeeper.common.concurrent.FutureUtils; @@ -62,9 +61,9 @@ import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer; import org.apache.bookkeeper.util.MathUtils; import org.apache.commons.configuration.CompositeConfiguration; -import org.apache.pulsar.common.util.FutureUtil; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; @@ -222,9 +221,21 @@ private void runBookies(ServerConfiguration baseConf) throws Exception { cleanDirectory(bkDataDir); } + int bookiePort = initialPort + i; + + // Ensure registration Z-nodes are cleared when standalone service is restarted ungracefully + String registrationZnode = String.format("/ledgers/available/%s:%d", baseConf.getAdvertisedAddress(), bookiePort); + if (zkc.exists(registrationZnode, null) != null) { + try { + zkc.delete(registrationZnode, -1); + } 
catch (NoNodeException nne) { + // Ignore if z-node was just expired + } + } + + bsConfs[i] = new ServerConfiguration(baseConf); // override settings - bsConfs[i].setBookiePort(initialPort + i); + bsConfs[i].setBookiePort(bookiePort); bsConfs[i].setZkServers("127.0.0.1:" + ZooKeeperDefaultPort); bsConfs[i].setJournalDirName(bkDataDir.getPath()); bsConfs[i].setLedgerDirNames(new String[] { bkDataDir.getPath() }); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services