Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2433#discussion_r152900918
  
    --- Diff: 
storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java 
---
    @@ -0,0 +1,213 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.nimbus;
    +
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Sets;
    +import org.apache.commons.io.IOUtils;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.recipes.leader.LeaderLatch;
    +import org.apache.storm.Config;
    +import org.apache.storm.blobstore.BlobStore;
    +import org.apache.storm.blobstore.InputStreamWithMeta;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.daemon.nimbus.TopoCache;
    +import org.apache.storm.generated.AuthorizationException;
    +import org.apache.storm.generated.KeyNotFoundException;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.security.auth.ReqContext;
    +import org.apache.storm.utils.ConfigUtils;
    +import org.apache.storm.utils.Utils;
    +import org.apache.storm.zookeeper.ClientZookeeper;
    +import org.apache.zookeeper.CreateMode;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.security.auth.Subject;
    +import java.io.IOException;
    +import java.util.*;
    +
    +/**
    + * A callback function when nimbus gains leadership
    + */
    +public class LeaderListenerCallback {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(LeaderListenerCallback.class);
    +
    +    private final BlobStore blobStore;
    +    private final TopoCache tc;
    +    private final IStormClusterState clusterState;
    +
    +    private final CuratorFramework zk;
    +    private final LeaderLatch leaderLatch;
    +
    +    private final Map conf;
    +    private final List<ACL> acls;
    +
    +    private static final String STORM_JAR_SUFFIX = "-stormjar.jar";
    +    private static final String STORM_CODE_SUFFIX = "-stormcode.ser";
    +    private static final String STORM_CONF_SUFFIX = "-stormconf.ser";
    +
    +    public LeaderListenerCallback(Map conf, CuratorFramework zk, 
LeaderLatch leaderLatch, BlobStore blobStore, TopoCache tc, IStormClusterState 
clusterState, List<ACL> acls) {
    +        this.blobStore = blobStore;
    +        this.tc = tc;
    +        this.clusterState = clusterState;
    +        this.zk = zk;
    +        this.leaderLatch = leaderLatch;
    +        this.conf = conf;
    +        this.acls = acls;
    +    }
    +
    +    /**
    +     * Invoke when gains leadership.
    +     */
    +    public void leaderCallBack() {
    +        //in local mode, only one leader exist
    +        if (ConfigUtils.isLocalMode(conf)) {
    +            return;
    +        }
    +        //set up nimbus-info to zk
    +        setUpNimbusInfo(acls);
    +        //sync zk assignments/id-info to local
    +        LOG.info("Sync remote assignments and id-info to local");
    +        clusterState.syncRemoteAssignments(null);
    +        clusterState.syncRemoteIds(null);
    +
    +        Set<String> activeTopologyIds = new 
TreeSet<>(ClientZookeeper.getChildren(zk,
    +                conf.get(Config.STORM_ZOOKEEPER_ROOT) + 
ClusterUtils.STORMS_SUBTREE, false));
    +
    +        Set<String> activeTopologyBlobKeys = 
populateTopologyBlobKeys(activeTopologyIds);
    +        Set<String> activeTopologyCodeKeys = 
filterTopologyCodeKeys(activeTopologyBlobKeys);
    +        Set<String> allLocalBlobKeys = 
Sets.newHashSet(blobStore.listKeys());
    +        Set<String> allLocalTopologyBlobKeys = 
filterTopologyBlobKeys(allLocalBlobKeys);
    +
    +        // this finds all active topologies blob keys from all local 
topology blob keys
    +        Sets.SetView<String> diffTopology = 
Sets.difference(activeTopologyBlobKeys, allLocalTopologyBlobKeys);
    +        LOG.info("active-topology-blobs [{}] local-topology-blobs [{}] 
diff-topology-blobs [{}]",
    +                generateJoinedString(activeTopologyIds), 
generateJoinedString(allLocalTopologyBlobKeys),
    +                generateJoinedString(diffTopology));
    +
    +        if (diffTopology.isEmpty()) {
    +            Set<String> activeTopologyDependencies = 
getTopologyDependencyKeys(activeTopologyCodeKeys);
    +
    +            // this finds all dependency blob keys from active topologies 
from all local blob keys
    +            Sets.SetView<String> diffDependencies = 
Sets.difference(activeTopologyDependencies, allLocalBlobKeys);
    +            LOG.info("active-topology-dependencies [{}] local-blobs [{}] 
diff-topology-dependencies [{}]",
    +                    generateJoinedString(activeTopologyDependencies), 
generateJoinedString(allLocalBlobKeys),
    +                    generateJoinedString(diffDependencies));
    +
    +            if (diffDependencies.isEmpty()) {
    +                LOG.info("Accepting leadership, all active topologies and 
corresponding dependencies found locally.");
    +                tc.clear();
    +            } else {
    +                LOG.info("Code for all active topologies is available 
locally, but some dependencies are not found locally, "
    +                        + "giving up leadership.");
    +                closeLatch();
    +            }
    +        } else {
    +            LOG.info("code for all active topologies not available 
locally, giving up leadership.");
    +            closeLatch();
    +        }
    +    }
    +
    +    /**
    +     * Invoke when lost leadership.
    +     */
    +    public void notLeaderCallback() {
    --- End diff --
    
    Please add `LOG.info("{} lost leadership.", hostName);` given that it helps 
on investigating the issue on Nimbus.


---

Reply via email to