Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r163574245
  
    --- Diff: storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java ---
    @@ -744,19 +767,111 @@ public Credentials credentials(String stormId, Runnable callback) {
         @Override
         public void disconnect() {
             stateStorage.unregister(stateId);
    -        if (solo)
    +        if (solo) {
                 stateStorage.close();
    +        }
    +    }
    +
    +    @Override
    +    public PrivateWorkerKey getPrivateWorkerKey(WorkerTokenServiceType 
type, String topologyId, long keyVersion) {
    +        String path = ClusterUtils.secretKeysPath(type, topologyId, 
keyVersion);
    +        return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, 
false), PrivateWorkerKey.class);
    +    }
    +
    +    @Override
    +    public void addPrivateWorkerKey(WorkerTokenServiceType type, String 
topologyId, long keyVersion, PrivateWorkerKey key) {
    +        assert context.getDaemonType() == DaemonType.NIMBUS;
    +        List<ACL> secretAcls = context.getZkSecretAcls(type);
    +        String path = ClusterUtils.secretKeysPath(type, topologyId, 
keyVersion);
    +        LOG.debug("Storing private key for {} connecting to a {} at {} 
with ACL {}\n\n", topologyId, type, path, secretAcls);
    +        stateStorage.set_data(path, Utils.serialize(key), secretAcls);
    +    }
    +
    +    @Override
    +    public long getNextWorkerKeyVersion(WorkerTokenServiceType type, 
String topologyId) {
    +        String path = ClusterUtils.secretKeysPath(type, topologyId);
    +        try {
    +            List<String> versions = stateStorage.get_children(path, false);
    +            return 
versions.stream().mapToLong(Long::valueOf).max().orElse(0);
    +        } catch (RuntimeException e) {
    +            if 
(Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
    +                //If the node does not exist, then the version must be 0
    +                return 0;
    +            }
    +            throw e;
    +        }
         }
     
    +    @Override
    +    public void removeExpiredWorkerKeys(String topologyId) {
    +        for (WorkerTokenServiceType type : 
WorkerTokenServiceType.values()) {
    +            String basePath = ClusterUtils.secretKeysPath(type, 
topologyId);
    +            try {
    +                for (String version : stateStorage.get_children(basePath, 
false)) {
    +                    String fullPath = basePath + ClusterUtils.ZK_SEPERATOR 
+ version;
    +                    try {
    +                        PrivateWorkerKey key = 
ClusterUtils.maybeDeserialize(stateStorage.get_data(fullPath, false), 
PrivateWorkerKey.class);
    +                        if (Time.currentTimeMillis() > 
key.get_expirationTimeMillis()) {
    +                            stateStorage.delete_node(fullPath);
    +                        }
    +                    } catch (RuntimeException e) {
    +                        //This should never happen because only the 
primary nimbus is active, but just in case
    +                        // declare the race safe, even if we lose it.
    +                        if 
(!Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
    +                            throw e;
    +                        }
    +                    }
    +                }
    +            } catch (RuntimeException e) {
    +                //No node for basePath is OK, noting to remove
    --- End diff --
    
    Spelling: s/noting/nothing/ in the code comment on the last diff line, so that it reads "nothing to remove".


---

Reply via email to