Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2531#discussion_r163574245
--- Diff:
storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java ---
@@ -744,19 +767,111 @@ public Credentials credentials(String stormId,
Runnable callback) {
@Override
public void disconnect() {
stateStorage.unregister(stateId);
- if (solo)
+ if (solo) {
stateStorage.close();
+ }
+ }
+
+ @Override
+ public PrivateWorkerKey getPrivateWorkerKey(WorkerTokenServiceType
type, String topologyId, long keyVersion) {
+ String path = ClusterUtils.secretKeysPath(type, topologyId,
keyVersion);
+ return ClusterUtils.maybeDeserialize(stateStorage.get_data(path,
false), PrivateWorkerKey.class);
+ }
+
+ @Override
+ public void addPrivateWorkerKey(WorkerTokenServiceType type, String
topologyId, long keyVersion, PrivateWorkerKey key) {
+ assert context.getDaemonType() == DaemonType.NIMBUS;
+ List<ACL> secretAcls = context.getZkSecretAcls(type);
+ String path = ClusterUtils.secretKeysPath(type, topologyId,
keyVersion);
+ LOG.debug("Storing private key for {} connecting to a {} at {}
with ACL {}\n\n", topologyId, type, path, secretAcls);
+ stateStorage.set_data(path, Utils.serialize(key), secretAcls);
+ }
+
+ @Override
+ public long getNextWorkerKeyVersion(WorkerTokenServiceType type,
String topologyId) {
+ String path = ClusterUtils.secretKeysPath(type, topologyId);
+ try {
+ List<String> versions = stateStorage.get_children(path, false);
+ return
versions.stream().mapToLong(Long::valueOf).max().orElse(0);
+ } catch (RuntimeException e) {
+ if
(Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
+ //If the node does not exist, then the version must be 0
+ return 0;
+ }
+ throw e;
+ }
}
+ @Override
+ public void removeExpiredWorkerKeys(String topologyId) {
+ for (WorkerTokenServiceType type :
WorkerTokenServiceType.values()) {
+ String basePath = ClusterUtils.secretKeysPath(type,
topologyId);
+ try {
+ for (String version : stateStorage.get_children(basePath,
false)) {
+ String fullPath = basePath + ClusterUtils.ZK_SEPERATOR
+ version;
+ try {
+ PrivateWorkerKey key =
ClusterUtils.maybeDeserialize(stateStorage.get_data(fullPath, false),
PrivateWorkerKey.class);
+ if (Time.currentTimeMillis() >
key.get_expirationTimeMillis()) {
+ stateStorage.delete_node(fullPath);
+ }
+ } catch (RuntimeException e) {
+ //This should never happen because only the
primary nimbus is active, but just in case
+ // declare the race safe, even if we lose it.
+ if
(!Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
+ throw e;
+ }
+ }
+ }
+ } catch (RuntimeException e) {
+ //No node for basePath is OK, noting to remove
--- End diff --
Spelling. s/noting/nothing/
---