This is an automated email from the ASF dual-hosted git repository.
smolnar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git
The following commit(s) were added to refs/heads/master by this push:
new ca90996 KNOX-2551 - Token state management improvements (#414)
ca90996 is described below
commit ca909964cf0c61a205ce6dee2978ff19b4f13839
Author: Sandor Molnar <[email protected]>
AuthorDate: Mon Mar 15 02:26:38 2021 +0100
KNOX-2551 - Token state management improvements (#414)
* KNOX-2551 - AliasBasedTokenStateService is the default token state
service implementation
* KNOX-2551 - Fixed parameter index in various token related log messages
* KNOX-2551 - Creating sub-nodes in ZK in case Knox Tokens are stored under
/knox/security/topology/__gateway
* KNOX-2551 - To address the side effects of optimistic replication in HA
mode, the ZK token state service retries fetching a token from ZK until it is
found or the configured persistence interval is exceeded
* KNOX-2551 - Avoid removing --max aliases from the unpersisted in-memory
collection
* KNOX-2551 - ZK token state service performance improvements
Major changes:
- ZK token state service configures ZKRemoteAliasService to not use local
keystore
- ZK token state service implements loadTokensFromPersistenceStore to avoid
keystore lookup from parent; it actually does nothing as ZK entry change
listeners populate in-memory collections in DefaultTokenStateService
- token eviction runs independently of loadTokensFromPersistenceStore (not
like in AliasBasedTokenStateService as we no longer need to consider the global
keystore locking in DefaultKeystoreService)
* KNOX-2551 - Fixed addAlias in ZKRemoteAliasService to support saving
updated data for already existing aliases
* KNOX-2551 - Monitoring the token persister thread and re-initiating it in
case an error occurred during task execution
---
.../provider/federation/jwt/JWTMessages.java | 6 +-
.../org/apache/knox/gateway/GatewayMessages.java | 3 +
.../services/factory/AbstractServiceFactory.java | 4 +-
.../services/factory/TokenStateServiceFactory.java | 6 +-
.../security/impl/ZookeeperRemoteAliasService.java | 140 +++++++++++++++++----
.../token/RemoteTokenStateChangeListener.java | 26 ++++
.../token/impl/AliasBasedTokenStateService.java | 129 ++++++++++++++-----
.../token/impl/DefaultTokenStateService.java | 12 +-
.../token/impl/JournalBasedTokenStateService.java | 2 +-
.../impl/TokenStatePeristerMonitorListener.java | 26 ++++
.../token/impl/TokenStatePersisterMonitor.java | 69 ++++++++++
.../token/impl/TokenStateServiceMessages.java | 41 +++++-
.../token/impl/ZookeeperTokenStateService.java | 94 +++++++++++++-
.../factory/TokenStateServiceFactoryTest.java | 10 +-
.../impl/AliasBasedTokenStateServiceTest.java | 51 ++++++--
.../token/impl/ZookeeperTokenStateServiceTest.java | 68 +++++++---
.../knox/gateway/util/ExecutorServiceUtils.java | 43 +++++++
17 files changed, 624 insertions(+), 106 deletions(-)
diff --git
a/gateway-provider-security-jwt/src/main/java/org/apache/knox/gateway/provider/federation/jwt/JWTMessages.java
b/gateway-provider-security-jwt/src/main/java/org/apache/knox/gateway/provider/federation/jwt/JWTMessages.java
index 8508af3..3ea7021 100644
---
a/gateway-provider-security-jwt/src/main/java/org/apache/knox/gateway/provider/federation/jwt/JWTMessages.java
+++
b/gateway-provider-security-jwt/src/main/java/org/apache/knox/gateway/provider/federation/jwt/JWTMessages.java
@@ -23,13 +23,13 @@ import org.apache.knox.gateway.i18n.messages.StackTrace;
@Messages(logger="org.apache.knox.gateway.provider.federation.jwt")
public interface JWTMessages {
- @Message( level = MessageLevel.WARN, text = "Failed to validate the audience
attribute for token {1} ({2})" )
+ @Message( level = MessageLevel.WARN, text = "Failed to validate the audience
attribute for token {0} ({1})" )
void failedToValidateAudience(String tokenDisplayText, String tokenId);
- @Message( level = MessageLevel.WARN, text = "Failed to verify the token
signature of {1} ({2})" )
+ @Message( level = MessageLevel.WARN, text = "Failed to verify the token
signature of {0} ({1})" )
void failedToVerifyTokenSignature(String tokenDisplayText, String tokenId);
- @Message( level = MessageLevel.INFO, text = "Access token {1} ({2}) has
expired; a new one must be acquired." )
+ @Message( level = MessageLevel.INFO, text = "Access token {0} ({1}) has
expired; a new one must be acquired." )
void tokenHasExpired(String tokenDisplayText, String tokenId);
@Message( level = MessageLevel.INFO, text = "The NotBefore check failed." )
diff --git
a/gateway-server/src/main/java/org/apache/knox/gateway/GatewayMessages.java
b/gateway-server/src/main/java/org/apache/knox/gateway/GatewayMessages.java
index 0373e8b..4e72d92 100644
--- a/gateway-server/src/main/java/org/apache/knox/gateway/GatewayMessages.java
+++ b/gateway-server/src/main/java/org/apache/knox/gateway/GatewayMessages.java
@@ -626,6 +626,9 @@ public interface GatewayMessages {
text = "Error adding alias {1} for cluster {0} locally (local
keystore), cause: {2} ")
void errorAddingAliasLocally(String cluster, String alias, String cause);
+ @Message(level = MessageLevel.ERROR, text = "Error adding remote alias entry
listener for cluster {0} and alias {1}, cause: {2} ")
+ void errorAddingRemoteAliasEntryListener(String cluster, String alias,
String cause);
+
@Message(level = MessageLevel.INFO,
text = "Remove alias {1} for cluster {0} locally (local keystore) ")
void removeAliasLocally(String cluster, String alias);
diff --git
a/gateway-server/src/main/java/org/apache/knox/gateway/services/factory/AbstractServiceFactory.java
b/gateway-server/src/main/java/org/apache/knox/gateway/services/factory/AbstractServiceFactory.java
index f855c76..4ead5d6 100644
---
a/gateway-server/src/main/java/org/apache/knox/gateway/services/factory/AbstractServiceFactory.java
+++
b/gateway-server/src/main/java/org/apache/knox/gateway/services/factory/AbstractServiceFactory.java
@@ -79,7 +79,7 @@ public abstract class AbstractServiceFactory implements
ServiceFactory {
return match;
}
- private boolean isEmptyDefaultImplementation(String implementation) {
+ protected boolean isEmptyDefaultImplementation(String implementation) {
return EMPTY_DEFAULT_IMPLEMENTATION.equals(implementation);
}
@@ -100,7 +100,7 @@ public abstract class AbstractServiceFactory implements
ServiceFactory {
}
protected void logServiceUsage(String implementation, ServiceType
serviceType) {
- LOG.usingServiceImplementation("".equals(implementation) ? "default" :
implementation, serviceType.getServiceTypeName());
+
LOG.usingServiceImplementation(isEmptyDefaultImplementation(implementation) ?
"default" : implementation, serviceType.getServiceTypeName());
}
// abstract methods
diff --git
a/gateway-server/src/main/java/org/apache/knox/gateway/services/factory/TokenStateServiceFactory.java
b/gateway-server/src/main/java/org/apache/knox/gateway/services/factory/TokenStateServiceFactory.java
index affdd64..7d8be07 100644
---
a/gateway-server/src/main/java/org/apache/knox/gateway/services/factory/TokenStateServiceFactory.java
+++
b/gateway-server/src/main/java/org/apache/knox/gateway/services/factory/TokenStateServiceFactory.java
@@ -40,9 +40,9 @@ public class TokenStateServiceFactory extends
AbstractServiceFactory {
throws ServiceLifecycleException {
Service service = null;
if (shouldCreateService(implementation)) {
- if (matchesImplementation(implementation,
DefaultTokenStateService.class, true)) {
+ if (matchesImplementation(implementation,
DefaultTokenStateService.class)) {
service = new DefaultTokenStateService();
- } else if (matchesImplementation(implementation,
AliasBasedTokenStateService.class)) {
+ } else if (matchesImplementation(implementation,
AliasBasedTokenStateService.class, true)) {
service = new AliasBasedTokenStateService();
((AliasBasedTokenStateService)
service).setAliasService(getAliasService(gatewayServices));
} else if (matchesImplementation(implementation,
JournalBasedTokenStateService.class)) {
@@ -51,7 +51,7 @@ public class TokenStateServiceFactory extends
AbstractServiceFactory {
service = new ZookeeperTokenStateService(gatewayServices);
}
- logServiceUsage(implementation, serviceType);
+ logServiceUsage(isEmptyDefaultImplementation(implementation) ?
AliasBasedTokenStateService.class.getName() : implementation, serviceType);
}
return service;
diff --git
a/gateway-server/src/main/java/org/apache/knox/gateway/services/security/impl/ZookeeperRemoteAliasService.java
b/gateway-server/src/main/java/org/apache/knox/gateway/services/security/impl/ZookeeperRemoteAliasService.java
index c781695..d4f325c 100644
---
a/gateway-server/src/main/java/org/apache/knox/gateway/services/security/impl/ZookeeperRemoteAliasService.java
+++
b/gateway-server/src/main/java/org/apache/knox/gateway/services/security/impl/ZookeeperRemoteAliasService.java
@@ -31,13 +31,16 @@ import
org.apache.knox.gateway.services.security.AliasService;
import org.apache.knox.gateway.services.security.AliasServiceException;
import org.apache.knox.gateway.services.security.EncryptionResult;
import org.apache.knox.gateway.services.security.MasterService;
+import org.apache.knox.gateway.services.token.RemoteTokenStateChangeListener;
import org.apache.knox.gateway.util.PasswordUtils;
import org.apache.zookeeper.ZooDefs;
import java.nio.charset.StandardCharsets;
import java.security.cert.Certificate;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -52,6 +55,12 @@ public class ZookeeperRemoteAliasService extends
AbstractAliasService {
public static final String PATH_KNOX_SECURITY = PATH_KNOX + "/security";
public static final String PATH_KNOX_ALIAS_STORE_TOPOLOGY =
PATH_KNOX_SECURITY + "/topology";
public static final String PATH_SEPARATOR = "/";
+ private static final String BASE_SUB_NODE = PATH_KNOX_ALIAS_STORE_TOPOLOGY
+ PATH_SEPARATOR;
+ private static final String GATEWAY_SUB_NODE = BASE_SUB_NODE +
NO_CLUSTER_NAME;
+ public static final String OPTION_NAME_SHOULD_CREATE_TOKENS_SUB_NODE =
"zkShouldCreateTokenSubnodes";
+ public static final String OPTION_NAME_SHOULD_USE_LOCAL_ALIAS =
"zkShouldUseLocalAlias";
+ public static final String TOKENS_SUB_NODE_NAME = "tokens";
+ public static final String TOKENS_SUB_NODE_PATH = PATH_SEPARATOR +
TOKENS_SUB_NODE_NAME;
private static final GatewayMessages LOG =
MessagesFactory.get(GatewayMessages.class);
// N.B. This is ZooKeeper-specific, and should be abstracted when another
registry is supported
@@ -114,10 +123,13 @@ public class ZookeeperRemoteAliasService extends
AbstractAliasService {
private final AliasService localAliasService;
private final MasterService ms;
private final RemoteConfigurationRegistryClientService
remoteConfigurationRegistryClientService;
+ private final Collection<RemoteTokenStateChangeListener>
remoteTokenStateChangeListeners = new HashSet<>();
private RemoteConfigurationRegistryClient remoteClient;
private ConfigurableEncryptor encryptor;
private GatewayConfig config;
+ private boolean shouldCreateTokensSubNode;
+ private boolean shouldUseLocalAliasService;
ZookeeperRemoteAliasService(AliasService localAliasService, MasterService
ms, RemoteConfigurationRegistryClientService
remoteConfigurationRegistryClientService) {
this.localAliasService = localAliasService;
@@ -125,19 +137,34 @@ public class ZookeeperRemoteAliasService extends
AbstractAliasService {
this.remoteConfigurationRegistryClientService =
remoteConfigurationRegistryClientService;
}
+ public void
registerRemoteTokenStateChangeListener(RemoteTokenStateChangeListener
changeListener) {
+ this.remoteTokenStateChangeListeners.add(changeListener);
+ }
+
/**
* Build an entry path for the given cluster and alias
*/
- private static String buildAliasEntryName(final String clusterName, final
String alias) {
- // Convert all alias names to lower case (JDK-4891485)
- return buildClusterEntryName(clusterName) + PATH_SEPARATOR +
alias.toLowerCase(Locale.ROOT);
+ private String buildAliasEntryName(final String clusterName, final String
alias) {
+ final StringBuilder aliasEntryNameBuilder = new
StringBuilder(buildClusterEntryName(clusterName));
+ // Convert all alias names to lower case (JDK-4891485)
+ final String lowercaseAlias = alias.toLowerCase(Locale.ROOT);
+ if (shouldCreateTokensSubNode) {
+ aliasEntryNameBuilder.append(TOKENS_SUB_NODE_PATH);
+ ensureEntry(aliasEntryNameBuilder.toString(), remoteClient); // the
'tokens' sub-node has to be created in ZK
+ // the new sub-node name is the first 2 characters (if any) of the
provided alias name
+ final String newSubnodeName = lowercaseAlias.length() < 2 ?
lowercaseAlias : lowercaseAlias.substring(0, 2);
+ aliasEntryNameBuilder.append(PATH_SEPARATOR).append(newSubnodeName);
+ ensureEntry(aliasEntryNameBuilder.toString(), remoteClient); // the
new sub-node has to be created in ZK
+ }
+
+ return
aliasEntryNameBuilder.append(PATH_SEPARATOR).append(lowercaseAlias).toString();
}
/**
* Build an entry path for the given cluster
*/
private static String buildClusterEntryName(final String clusterName) {
- return PATH_KNOX_ALIAS_STORE_TOPOLOGY + PATH_SEPARATOR + clusterName;
+ return BASE_SUB_NODE + clusterName;
}
/**
@@ -183,7 +210,7 @@ public class ZookeeperRemoteAliasService extends
AbstractAliasService {
ensureEntry(PATH_KNOX, remoteClient);
ensureEntry(PATH_KNOX_SECURITY, remoteClient);
ensureEntry(PATH_KNOX_ALIAS_STORE_TOPOLOGY, remoteClient);
- ensureEntry(PATH_KNOX_ALIAS_STORE_TOPOLOGY + PATH_SEPARATOR +
NO_CLUSTER_NAME, remoteClient);
+ ensureEntry(GATEWAY_SUB_NODE, remoteClient);
}
/**
@@ -195,10 +222,14 @@ public class ZookeeperRemoteAliasService extends
AbstractAliasService {
*/
@Override
public List<String> getAliasesForCluster(final String clusterName) throws
AliasServiceException {
- final List<String> localAliases =
localAliasService.getAliasesForCluster(clusterName);
+ final List<String> localAliases = shouldUseLocalAliasService ?
localAliasService.getAliasesForCluster(clusterName) : null;
if (localAliases == null || localAliases.isEmpty()) {
if (remoteClient != null) {
- final List<String> remoteAliases =
remoteClient.listChildEntries(buildClusterEntryName(clusterName));
+ List<String> remoteAliases = null;
+ String entryName = buildClusterEntryName(clusterName);
+ if (remoteClient.entryExists(entryName)) {
+ remoteAliases = remoteClient.listChildEntries(entryName);
+ }
return remoteAliases == null ? new ArrayList<>() : remoteAliases;
} else {
return new ArrayList<>();
@@ -217,7 +248,11 @@ public class ZookeeperRemoteAliasService extends
AbstractAliasService {
checkPathsExist(remoteClient);
ensureEntry(buildClusterEntryName(clusterName), remoteClient);
try {
+ if (remoteClient.entryExists(aliasEntryPath)) {
+ remoteClient.setEntryData(aliasEntryPath, encrypt(value));
+ } else {
remoteClient.createEntry(aliasEntryPath, encrypt(value));
+ }
} catch (Exception e) {
throw new AliasServiceException(e);
}
@@ -265,7 +300,7 @@ public class ZookeeperRemoteAliasService extends
AbstractAliasService {
@Override
public char[] getPasswordFromAliasForCluster(String clusterName, String
alias, boolean generate) throws AliasServiceException {
- char[] password =
localAliasService.getPasswordFromAliasForCluster(clusterName, alias);
+ char[] password = shouldUseLocalAliasService ?
localAliasService.getPasswordFromAliasForCluster(clusterName, alias, generate)
: null;
/* try to get it from remote registry */
if (password == null && remoteClient != null) {
@@ -354,6 +389,8 @@ public class ZookeeperRemoteAliasService extends
AbstractAliasService {
throw new IllegalStateException("Unable to access remote path:
" + PATH_KNOX_ALIAS_STORE_TOPOLOGY);
}
+ this.shouldUseLocalAliasService =
Boolean.parseBoolean(options.getOrDefault(OPTION_NAME_SHOULD_USE_LOCAL_ALIAS,
"true"));
+
/* Register a listener for aliases entry additions/removals */
try {
remoteClient.addChildEntryListener(PATH_KNOX_ALIAS_STORE_TOPOLOGY, new
RemoteAliasChildListener(this));
@@ -363,6 +400,8 @@ public class ZookeeperRemoteAliasService extends
AbstractAliasService {
encryptor = new ConfigurableEncryptor(new
String(ms.getMasterSecret()));
encryptor.init(config);
+
+ this.shouldCreateTokensSubNode =
Boolean.parseBoolean(options.getOrDefault(OPTION_NAME_SHOULD_CREATE_TOKENS_SUB_NODE,
"false"));
} else {
LOG.missingClientConfigurationForRemoteMonitoring();
}
@@ -425,7 +464,7 @@ public class ZookeeperRemoteAliasService extends
AbstractAliasService {
ensureEntry(PATH_KNOX, remoteClient);
ensureEntry(PATH_KNOX_SECURITY, remoteClient);
ensureEntry(PATH_KNOX_ALIAS_STORE_TOPOLOGY, remoteClient);
- ensureEntry(PATH_KNOX_ALIAS_STORE_TOPOLOGY + PATH_SEPARATOR +
NO_CLUSTER_NAME, remoteClient);
+ ensureEntry(GATEWAY_SUB_NODE, remoteClient);
}
/**
@@ -441,32 +480,51 @@ public class ZookeeperRemoteAliasService extends
AbstractAliasService {
@Override
public void childEvent(final RemoteConfigurationRegistryClient client,
final Type type, final String path) {
- final String subPath = StringUtils.substringAfter(path,
PATH_KNOX_ALIAS_STORE_TOPOLOGY + PATH_SEPARATOR);
- final String[] paths = StringUtils.split(subPath, '/');
+ final String subPath = StringUtils.substringAfter(path,
BASE_SUB_NODE);
+ final String[] subPathParts = StringUtils.split(subPath, '/');
+
+ // Possible values are:
+ // - /knox/security/topology/cluster
+ // - /knox/security/topology/cluster/alias
+ // - /knox/security/topology/cluster/tokens/tokenSubNode/alias
+ final String cluster = subPathParts.length > 1 ? subPathParts[0] :
"";
+ final boolean tokenSubNode =
subPath.contains(TOKENS_SUB_NODE_NAME);
+ String alias = "";
+ if (tokenSubNode) {
+ alias = subPathParts.length == 4 ? subPathParts[3] : "";
+ } else {
+ alias = subPathParts.length == 2 ? subPathParts[1] : "";
+ }
switch (type) {
case REMOVED:
try {
/* remove listener */
client.removeEntryListener(path);
- if (paths.length > 1) {
- localAliasService.removeAliasForCluster(paths[0],
paths[1]);
+ if (!alias.isEmpty()) {
+ for (RemoteTokenStateChangeListener changeListener :
remoteTokenStateChangeListeners) {
+ changeListener.onRemoved(alias);
+ }
+
+ if (shouldUseLocalAliasService) {
+ LOG.removeAliasLocally(cluster, alias);
+ localAliasService.removeAliasForCluster(cluster,
alias);
+ }
}
} catch (final Exception e) {
- LOG.errorRemovingAliasLocally(paths[0], paths[1],
e.toString());
+ LOG.errorRemovingAliasLocally(cluster, alias,
e.toString());
}
break;
case ADDED:
/* do not set listeners on cluster name but on respective
aliases */
- if (paths.length > 1) {
- LOG.addAliasLocally(paths[0], paths[1]);
+ if (!alias.isEmpty()) {
try {
- client.addEntryListener(path, new
RemoteAliasEntryListener(paths[0], paths[1], localAliasService));
+ client.addEntryListener(path, new
RemoteAliasEntryListener(cluster, alias, localAliasService));
} catch (final Exception e) {
- LOG.errorRemovingAliasLocally(paths[0], paths[1],
e.toString());
+ LOG.errorAddingRemoteAliasEntryListener(cluster,
alias, e.toString());
}
- } else if (subPath != null) {
+ } else if (!BASE_SUB_NODE.equals(path)) {
/* Add a child listener for the cluster */
LOG.addRemoteListener(path);
try {
@@ -498,11 +556,47 @@ public class ZookeeperRemoteAliasService extends
AbstractAliasService {
@Override
public void entryChanged(final RemoteConfigurationRegistryClient
client, final String path, final byte[] data) {
+ if (!TOKENS_SUB_NODE_NAME.equals(alias) && isAliasPath(path)) {
+ String decryptedData = null;
try {
- localAliasService.addAliasForCluster(cluster, alias, decrypt(new
String(data, StandardCharsets.UTF_8)));
- } catch (final Exception e) {
- /* log and move on */
- LOG.errorAddingAliasLocally(cluster, alias, e.toString());
+ decryptedData = decrypt(new String(data,
StandardCharsets.UTF_8));
+ } catch (Exception e) {
+ throw new IllegalArgumentException("An error occurred while
trying to decrypt data for alias " + alias, e);
+ }
+
+ //if this is a token related alias, notify listeners
+ if (path.contains(TOKENS_SUB_NODE_PATH)) {
+ for (RemoteTokenStateChangeListener changeListener :
remoteTokenStateChangeListeners) {
+ changeListener.onChanged(alias, decryptedData);
+ }
+ }
+
+ if (shouldUseLocalAliasService) {
+ try {
+ LOG.addAliasLocally(cluster, alias);
+ localAliasService.addAliasForCluster(cluster, alias,
decryptedData);
+ } catch (final Exception e) {
+ /* log and move on */
+ LOG.errorAddingAliasLocally(cluster, alias, e.toString());
+ }
+ }
+ }
+ }
+
+ private boolean isAliasPath(String path) {
+ final String subPath = StringUtils.substringAfter(path,
BASE_SUB_NODE);
+ final String[] subPathParts = StringUtils.split(subPath, '/');
+
+ // Possible subPath values are:
+ // - /cluster
+ // - /cluster/alias
+ // - /cluster/tokens
+ // - /cluster/tokens/tokenSubNode
+ // - /cluster/tokens/tokenSubNode/alias
+ if (subPath.contains(TOKENS_SUB_NODE_NAME)) {
+ return subPathParts.length == 4;
+ } else {
+ return subPathParts.length == 2;
}
}
}
diff --git
a/gateway-server/src/main/java/org/apache/knox/gateway/services/token/RemoteTokenStateChangeListener.java
b/gateway-server/src/main/java/org/apache/knox/gateway/services/token/RemoteTokenStateChangeListener.java
new file mode 100644
index 0000000..fb8fe8e
--- /dev/null
+++
b/gateway-server/src/main/java/org/apache/knox/gateway/services/token/RemoteTokenStateChangeListener.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.services.token;
+
+public interface RemoteTokenStateChangeListener {
+
+ void onChanged(String alias, String updatedState);
+
+ void onRemoved(String alias);
+
+}
diff --git
a/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/AliasBasedTokenStateService.java
b/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/AliasBasedTokenStateService.java
index 2581d94..545fa96 100644
---
a/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/AliasBasedTokenStateService.java
+++
b/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/AliasBasedTokenStateService.java
@@ -26,11 +26,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -47,21 +47,22 @@ import
org.apache.knox.gateway.services.token.TokenStateServiceStatistics;
import
org.apache.knox.gateway.services.token.impl.state.TokenStateJournalFactory;
import org.apache.knox.gateway.services.token.state.JournalEntry;
import org.apache.knox.gateway.services.token.state.TokenStateJournal;
+import org.apache.knox.gateway.util.ExecutorServiceUtils;
/**
* A TokenStateService implementation based on the AliasService.
*/
-public class AliasBasedTokenStateService extends DefaultTokenStateService {
+public class AliasBasedTokenStateService extends DefaultTokenStateService
implements TokenStatePeristerMonitorListener {
static final String TOKEN_MAX_LIFETIME_POSTFIX = "--max";
- private AliasService aliasService;
+ protected AliasService aliasService;
- private long statePersistenceInterval = TimeUnit.SECONDS.toSeconds(15);
+ protected long statePersistenceInterval = TimeUnit.SECONDS.toSeconds(15);
private ScheduledExecutorService statePersistenceScheduler;
- private final List<TokenState> unpersistedState = new ArrayList<>();
+ private final Set<TokenState> unpersistedState = new HashSet<>();
private final AtomicBoolean readyForEviction = new AtomicBoolean(false);
@@ -110,9 +111,6 @@ public class AliasBasedTokenStateService extends
DefaultTokenStateService {
}
statePersistenceInterval =
config.getKnoxTokenStateAliasPersistenceInterval();
- if (statePersistenceInterval > 0) {
- statePersistenceScheduler = Executors.newScheduledThreadPool(1);
- }
if (tokenStateServiceStatistics != null) {
this.gatewayCredentialsFilePath =
Paths.get(config.getGatewayKeystoreDir()).resolve(AliasService.NO_CLUSTER_NAME
+ DefaultKeystoreService.CREDENTIALS_SUFFIX +
config.getCredentialStoreType().toLowerCase(Locale.ROOT));
@@ -123,24 +121,21 @@ public class AliasBasedTokenStateService extends
DefaultTokenStateService {
@Override
public void start() throws ServiceLifecycleException {
super.start();
- if (statePersistenceScheduler != null) {
- // Run token persistence task at configured interval
- statePersistenceScheduler.scheduleAtFixedRate(this::persistTokenState,
- statePersistenceInterval,
- statePersistenceInterval,
- TimeUnit.SECONDS);
+ if (statePersistenceInterval > 0) {
+ //first schedule event; only happen if the feature is not disabled via
persistence interval settings
+ scheduleTokenStatePersistence();
}
// Loading ALL entries from __gateway-credentials.jceks could be VERY
time-consuming (it took a bit more than 19 minutes to load 12k aliases
// during my tests).
// Therefore, it's safer to do it in a background thread than just make
the service start hang until it's finished
- final ExecutorService gatewayCredentialsLoader =
Executors.newSingleThreadExecutor(new
BasicThreadFactory.Builder().namingPattern("GatewayCredentialsLoader").build());
- gatewayCredentialsLoader.execute(this::loadGatewayCredentialsOnStartup);
+ final ExecutorService gatewayCredentialsLoader =
Executors.newSingleThreadExecutor(new
BasicThreadFactory.Builder().namingPattern("PersistenceStoreLoader").build());
+
gatewayCredentialsLoader.execute(this::loadTokenAliasesFromPersistenceStore);
}
- private void loadGatewayCredentialsOnStartup() {
+ protected void loadTokenAliasesFromPersistenceStore() {
try {
- log.loadingGatewayCredentialsOnStartup();
+ log.loadingTokenAliasesFromPersistenceStore();
final long start = System.currentTimeMillis();
final Map<String, char[]> passwordAliasMap =
aliasService.getPasswordsForGateway();
String alias, tokenId;
@@ -161,12 +156,17 @@ public class AliasBasedTokenStateService extends
DefaultTokenStateService {
maxLifeTime =
convertCharArrayToLong(passwordAliasMapEntry.getValue());
super.updateExpiration(tokenId, expiration);
super.setMaxLifetime(tokenId, maxLifeTime);
- count++;
+ count+=2;
+ }
+
+ // log some progress (it's very useful in case a huge amount of token
related aliases in __gateway-credentials.jceks)
+ if (count % 100 == 0) {
+ log.loadedTokenAliasesFromPersistenceStore(count,
System.currentTimeMillis() - start);
}
}
- log.loadedGatewayCredentialsOnStartup(count * 2,
System.currentTimeMillis() - start); //count is multiplied by two: tokenId +
tokenId--max
+ log.loadedTokenAliasesFromPersistenceStore(count * 2,
System.currentTimeMillis() - start); //count is multiplied by two: tokenId +
tokenId--max
} catch (AliasServiceException e) {
- log.errorWhileLoadingGatewayCredentialsOnStartup(e.getMessage(), e);
+ log.errorWhileLoadingTokenAliasesFromPersistenceStore(e.getMessage(), e);
} finally {
readyForEviction.set(true);
}
@@ -188,6 +188,22 @@ public class AliasBasedTokenStateService extends
DefaultTokenStateService {
persistTokenState();
}
+ private void scheduleTokenStatePersistence() {
+ if (statePersistenceScheduler != null) {
+
ExecutorServiceUtils.shutdownAndAwaitTermination(statePersistenceScheduler, 10,
TimeUnit.SECONDS);
+ }
+ statePersistenceScheduler = Executors.newSingleThreadScheduledExecutor(new
BasicThreadFactory.Builder().namingPattern("TokenStatePerister-%d").build());
+ final ScheduledFuture<?> persistTokenStateTask =
statePersistenceScheduler.scheduleAtFixedRate(this::persistTokenState,
statePersistenceInterval, statePersistenceInterval, TimeUnit.SECONDS);
+ log.runningTokenStateAliasePersisterTask(statePersistenceInterval,
TimeUnit.SECONDS.toString());
+ final TokenStatePersisterMonitor taskMonitor = new
TokenStatePersisterMonitor(persistTokenStateTask, this);
+ taskMonitor.startMonitor();
+ }
+
+ @Override
+ public void onTokenStatePeristerTaskError(Throwable error) {
+ scheduleTokenStatePersistence();
+ }
+
protected void persistTokenState() {
Set<String> tokenIds = new HashSet<>(); // Collect the tokenIds for logging
@@ -281,15 +297,15 @@ public class AliasBasedTokenStateService extends
DefaultTokenStateService {
return result;
}
- private char[] getPasswordUsingAliasService(String tokenId) throws
AliasServiceException {
- char[] password =
aliasService.getPasswordFromAliasForCluster(AliasService.NO_CLUSTER_NAME,
tokenId);
+ protected char[] getPasswordUsingAliasService(String alias) throws
AliasServiceException {
+ char[] password =
aliasService.getPasswordFromAliasForCluster(AliasService.NO_CLUSTER_NAME,
alias);
if (tokenStateServiceStatistics != null) {
tokenStateServiceStatistics.interactKeystore(TokenStateServiceStatistics.KeystoreInteraction.GET_PASSWORD);
}
return password;
}
- private long convertCharArrayToLong(char[] charArray) {
+ protected long convertCharArrayToLong(char[] charArray) {
return Long.parseLong(new String(charArray));
}
@@ -348,7 +364,7 @@ public class AliasBasedTokenStateService extends
DefaultTokenStateService {
}
@Override
- protected void removeTokens(Set<String> tokenIds) throws
UnknownTokenException {
+ protected void removeTokens(Set<String> tokenIds) {
// If any of the token IDs is represented among the unpersisted state,
remove the associated state
synchronized (unpersistedState) {
@@ -381,28 +397,43 @@ public class AliasBasedTokenStateService extends
DefaultTokenStateService {
}
}
+ removeTokensFromMemory(tokenIds);
+ }
+
+ protected void removeTokensFromMemory(Set<String> tokenIds) {
super.removeTokens(tokenIds);
}
@Override
protected void updateExpiration(final String tokenId, long expiration) {
//Update in-memory
- super.updateExpiration(tokenId, expiration);
+ updateExpirationInMemory(tokenId, expiration);
//Update the in-memory representation of unpersisted states that will be
processed by the state persistence thread
synchronized (unpersistedState) {
- final Optional<TokenState> tokenStateToRemove =
unpersistedState.stream().filter(tokenState ->
tokenState.getTokenId().equals(tokenId)).findFirst();
- if (tokenStateToRemove.isPresent()) {
- unpersistedState.remove(tokenStateToRemove.get());
- }
unpersistedState.add(new TokenExpiration(tokenId, expiration));
}
}
+ protected void updateExpirationInMemory(final String tokenId, long
expiration) {
+ super.updateExpiration(tokenId, expiration);
+ }
+
+ enum TokenStateType {
+ EXP(1), MAX(2);
+
+ private final int id;
+
+ TokenStateType(int id) {
+ this.id = id;
+ }
+ }
+
interface TokenState {
String getTokenId();
String getAlias();
String getAliasValue();
+ TokenStateType getType();
}
private static final class TokenMaxLifetime implements TokenState {
@@ -432,13 +463,28 @@ public class AliasBasedTokenStateService extends
DefaultTokenStateService {
}
@Override
+ public TokenStateType getType() {
+ return TokenStateType.MAX;
+ }
+
+ @Override
public int hashCode() {
- return HashCodeBuilder.reflectionHashCode(this);
+ return new
HashCodeBuilder().append(tokenId).append(getType().id).toHashCode();
}
@Override
public boolean equals(Object obj) {
- return EqualsBuilder.reflectionEquals(this, obj);
+ if (obj == null) {
+ return false;
+ }
+ if (obj == this) {
+ return true;
+ }
+ if (obj.getClass() != getClass()) {
+ return false;
+ }
+ final TokenMaxLifetime rhs = (TokenMaxLifetime) obj;
+ return new EqualsBuilder().append(this.tokenId,
rhs.tokenId).append(this.getType().id, rhs.getType().id).isEquals();
}
}
@@ -467,13 +513,28 @@ public class AliasBasedTokenStateService extends
DefaultTokenStateService {
}
@Override
+ public TokenStateType getType() {
+ return TokenStateType.EXP;
+ }
+
+ @Override
public int hashCode() {
- return HashCodeBuilder.reflectionHashCode(this);
+ return new
HashCodeBuilder().append(tokenId).append(getType().id).toHashCode();
}
@Override
public boolean equals(Object obj) {
- return EqualsBuilder.reflectionEquals(this, obj);
+ if (obj == null) {
+ return false;
+ }
+ if (obj == this) {
+ return true;
+ }
+ if (obj.getClass() != getClass()) {
+ return false;
+ }
+ final TokenExpiration rhs = (TokenExpiration) obj;
+ return new EqualsBuilder().append(this.tokenId,
rhs.tokenId).append(this.getType().id, rhs.getType().id).isEquals();
}
}
diff --git
a/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/DefaultTokenStateService.java
b/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/DefaultTokenStateService.java
index 042e167..93d8560 100644
---
a/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/DefaultTokenStateService.java
+++
b/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/DefaultTokenStateService.java
@@ -274,10 +274,8 @@ public class DefaultTokenStateService implements
TokenStateService {
* Bulk removal of the specified tokens.
*
* @param tokenIds The unique identifiers of the tokens whose state should
be removed.
- *
- * @throws UnknownTokenException if the specified token in valid, but not
known to the service.
*/
- protected void removeTokens(final Set<String> tokenIds) throws
UnknownTokenException {
+ protected void removeTokens(final Set<String> tokenIds) {
removeTokenState(tokenIds);
}
@@ -343,11 +341,7 @@ public class DefaultTokenStateService implements
TokenStateService {
}
if (!tokensToEvict.isEmpty()) {
- try {
- removeTokens(tokensToEvict);
- } catch (UnknownTokenException e) {
- log.failedExpiredTokenEviction(e);
- }
+ removeTokens(tokensToEvict);
}
} else {
log.skipEviction();
@@ -377,7 +371,7 @@ public class DefaultTokenStateService implements
TokenStateService {
*
* @return
*/
- private List<String> getTokenIds() {
+ protected List<String> getTokenIds() {
return tokenExpirations.keySet().stream().collect(Collectors.toList());
}
diff --git
a/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/JournalBasedTokenStateService.java
b/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/JournalBasedTokenStateService.java
index 949a29a..abee0e5 100644
---
a/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/JournalBasedTokenStateService.java
+++
b/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/JournalBasedTokenStateService.java
@@ -130,7 +130,7 @@ public class JournalBasedTokenStateService extends
DefaultTokenStateService {
}
@Override
- protected void removeTokens(final Set<String> tokenIds) throws
UnknownTokenException {
+ protected void removeTokens(final Set<String> tokenIds) {
super.removeTokens(tokenIds);
try {
journal.remove(tokenIds);
diff --git
a/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/TokenStatePeristerMonitorListener.java
b/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/TokenStatePeristerMonitorListener.java
new file mode 100644
index 0000000..fc1bdcc
--- /dev/null
+++
b/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/TokenStatePeristerMonitorListener.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.knox.gateway.services.token.impl;
+
+/**
+ * Signals relevant events while monitoring the token state persister thread.
+ * Implementations are notified by TokenStatePersisterMonitor.
+ */
+public interface TokenStatePeristerMonitorListener {
+
+ /**
+ * Invoked when waiting on the monitored persister task ended with anything
+ * other than a cancellation.
+ *
+ * @param error the throwable caught by the monitor while waiting on the task
+ */
+ void onTokenStatePeristerTaskError(Throwable error);
+
+}
diff --git
a/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/TokenStatePersisterMonitor.java
b/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/TokenStatePersisterMonitor.java
new file mode 100644
index 0000000..1d203d8
--- /dev/null
+++
b/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/TokenStatePersisterMonitor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.knox.gateway.services.token.impl;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+import org.apache.knox.gateway.util.ExecutorServiceUtils;
+
+/**
+ * Watches a scheduled persister task on a dedicated single-thread executor and
+ * notifies the registered listeners when waiting on that task ends with an
+ * error. A cancellation of the task is only logged.
+ */
+class TokenStatePersisterMonitor {
+ private static final TokenStateServiceMessages log =
MessagesFactory.get(TokenStateServiceMessages.class);
+ // single-thread executor that runs the (blocking) monitoring job
+ private final ExecutorService monitor;
+ // the scheduled task whose completion/failure is being watched
+ private final ScheduledFuture<?> taskToMonitor;
+ // listeners to notify when the task ends with an error
+ private final Set<TokenStatePeristerMonitorListener> listeners;
+
+ /**
+ * Convenience constructor for a single listener.
+ *
+ * @param taskToMonitor the scheduled task to watch
+ * @param listener      the only listener to notify on task errors
+ */
+ TokenStatePersisterMonitor(ScheduledFuture<?> taskToMonitor,
TokenStatePeristerMonitorListener listener) {
+ this(taskToMonitor, Collections.singleton(listener));
+ }
+
+ /**
+ * Creates the monitor and its executor; monitoring does not start until
+ * startMonitor() is called.
+ *
+ * @param taskToMonitor the scheduled task to watch
+ * @param listeners     the listeners to notify on task errors (stored as-is, not copied)
+ */
+ TokenStatePersisterMonitor(ScheduledFuture<?> taskToMonitor,
Set<TokenStatePeristerMonitorListener> listeners) {
+ this.monitor = Executors.newSingleThreadExecutor(new
BasicThreadFactory.Builder().namingPattern("TokenStatePeristerMonitor-%d").build());
+ this.taskToMonitor = taskToMonitor;
+ this.listeners = listeners;
+ }
+
+ /**
+ * Submits the monitoring job to the monitor executor.
+ */
+ void startMonitor() {
+ monitor.submit(this::monitorPersisterTask);
+ }
+
+ /**
+ * Blocks on the monitored task, then logs a cancellation or logs the error
+ * and notifies every listener; finally shuts the monitor executor down.
+ */
+ private void monitorPersisterTask() {
+ try {
+ /*
+ * This call does not return after each scheduled invocation. It only
+ * returns (by throwing)
+ * - either when the task is cancelled via ScheduledFuture.cancel()
+ * - or when an exception is thrown in the task.
+ */
+ taskToMonitor.get();
+ } catch (CancellationException e) {
+ log.cancelingTokenStateAliasePersisterTask();
+ } catch (Throwable e) {
+ // every non-cancellation outcome is logged and handed to the listeners as caught
+ log.errorRunningTokenStateAliasePersisterTask(e);
+ listeners.forEach(listener -> listener.onTokenStatePeristerTaskError(e));
+ } finally {
+ // NOTE(review): this runs on the monitor executor's own single worker
+ // thread, so the await inside shutdownAndAwaitTermination cannot see the
+ // executor terminate before this method returns - it looks like this
+ // waits out the 10 second timeout. Confirm, and consider a plain
+ // monitor.shutdown() here.
+ ExecutorServiceUtils.shutdownAndAwaitTermination(monitor, 10,
TimeUnit.SECONDS);
+ }
+ }
+
+}
diff --git
a/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/TokenStateServiceMessages.java
b/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/TokenStateServiceMessages.java
index a83f638..ceb11a2 100644
---
a/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/TokenStateServiceMessages.java
+++
b/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/TokenStateServiceMessages.java
@@ -82,6 +82,15 @@ public interface TokenStateServiceMessages {
@Message(level = MessageLevel.ERROR, text = "Error occurred evicting token
{0}")
void errorEvictingTokens(@StackTrace(level = MessageLevel.DEBUG) Exception
e);
+ @Message(level = MessageLevel.INFO, text = "Running token state alias
persister task in every {0} {1}")
+ void runningTokenStateAliasePersisterTask(long interval, String timeUnit);
+
+ @Message(level = MessageLevel.INFO, text = "Canceling token state alias
persister task")
+ void cancelingTokenStateAliasePersisterTask();
+
+ @Message(level = MessageLevel.ERROR, text = "An error occurred while running
token state alias persister task: {0}")
+ void errorRunningTokenStateAliasePersisterTask(@StackTrace(level =
MessageLevel.DEBUG) Throwable e);
+
@Message(level = MessageLevel.INFO, text = "Creating token state aliases")
void creatingTokenStateAliases();
@@ -139,13 +148,33 @@ public interface TokenStateServiceMessages {
@Message(level = MessageLevel.ERROR, text = "Failed to remove the token
state journal entries : {0}")
void failedToRemoveJournalEntries(@StackTrace(level = MessageLevel.DEBUG)
Exception e);
- @Message(level = MessageLevel.INFO, text = "Loading Gateway credentials on
startup...")
- void loadingGatewayCredentialsOnStartup();
+ @Message(level = MessageLevel.INFO, text = "Loading token aliases from
persistence store on startup...")
+ void loadingTokenAliasesFromPersistenceStore();
+
+ @Message(level = MessageLevel.INFO, text = "Processing {0} aliases from
persistence store on startup...")
+ void processingAliases(int count);
+
+ @Message(level = MessageLevel.INFO, text = "Loaded {0} token aliases from
persistence store in {1} milliseonds")
+ void loadedTokenAliasesFromPersistenceStore(int count, long duration);
+
+ @Message(level = MessageLevel.ERROR, text = "Error while loading token
aliases from persistence store on startup: {0}")
+ void errorWhileLoadingTokenAliasesFromPersistenceStore(String errorMessage,
@StackTrace(level = MessageLevel.DEBUG) Throwable e);
+
+ @Message(level = MessageLevel.ERROR, text = "Error while processing token
alias {0} : {1}")
+ void errorWhileProcessingTokenAlias(String alias, String errorMessage,
@StackTrace(level = MessageLevel.DEBUG) Throwable e);
+
+ @Message(level = MessageLevel.DEBUG, text = "Invalid alias value for {0}; it
has very likely been evicted in the meantime")
+ void invalidAliasValue(String alias);
+
+ @Message(level = MessageLevel.INFO, text = "Trying to fetch value for {0}
from Zookeeper...")
+ void retryZkFetchAlias(String alias);
- @Message(level = MessageLevel.INFO, text = "Loaded {0} Gateway credentials
in {1} milliseonds")
- void loadedGatewayCredentialsOnStartup(int count, long duration);
+ @Message(level = MessageLevel.ERROR, text = "Error while fetching value for
{0} from Zookeeper: {1}")
+ void failedRetryZkFetchAlias(String alias, String errorMessage,
@StackTrace(level = MessageLevel.DEBUG) Exception e);
- @Message(level = MessageLevel.ERROR, text = "Error while loading Gateway
credentials on startup: {0}")
- void errorWhileLoadingGatewayCredentialsOnStartup(String errorMessage,
@StackTrace(level = MessageLevel.DEBUG) Exception e);
+ @Message(level = MessageLevel.INFO, text = "Processed alias {0} on receiving
signal from Zookeeper ")
+ void onRemoteTokenStateChanged(String alias);
+ @Message(level = MessageLevel.INFO, text = "Removed related token alias {0}
on receiving signal from Zookeeper ")
+ void onRemoteTokenStateRemoval(String alias);
}
diff --git
a/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/ZookeeperTokenStateService.java
b/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/ZookeeperTokenStateService.java
index 31d343a..161bfc3 100644
---
a/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/ZookeeperTokenStateService.java
+++
b/gateway-server/src/main/java/org/apache/knox/gateway/services/token/impl/ZookeeperTokenStateService.java
@@ -19,20 +19,25 @@ package org.apache.knox.gateway.services.token.impl;
import static org.apache.knox.gateway.services.ServiceType.ALIAS_SERVICE;
+import java.time.Instant;
+import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.knox.gateway.config.GatewayConfig;
import org.apache.knox.gateway.services.GatewayServices;
import org.apache.knox.gateway.services.ServiceLifecycleException;
import org.apache.knox.gateway.services.factory.AliasServiceFactory;
+import org.apache.knox.gateway.services.security.AliasServiceException;
import
org.apache.knox.gateway.services.security.impl.ZookeeperRemoteAliasService;
+import org.apache.knox.gateway.services.token.RemoteTokenStateChangeListener;
/**
* A Zookeeper Token State Service is actually an Alias based TSS where the
'alias service' happens to be the 'zookeeper' implementation.
* This means the only important thing that should be overridden here is the
init method where the underlying alias service is configured
* properly.
*/
-public class ZookeeperTokenStateService extends AliasBasedTokenStateService {
+public class ZookeeperTokenStateService extends AliasBasedTokenStateService
implements RemoteTokenStateChangeListener {
private final GatewayServices gatewayServices;
private final AliasServiceFactory aliasServiceFactory;
@@ -50,8 +55,95 @@ public class ZookeeperTokenStateService extends
AliasBasedTokenStateService {
public void init(GatewayConfig config, Map<String, String> options) throws
ServiceLifecycleException {
final ZookeeperRemoteAliasService zookeeperAliasService =
(ZookeeperRemoteAliasService) aliasServiceFactory.create(gatewayServices,
ALIAS_SERVICE, config, options,
ZookeeperRemoteAliasService.class.getName());
+
options.put(ZookeeperRemoteAliasService.OPTION_NAME_SHOULD_CREATE_TOKENS_SUB_NODE,
"true");
+
options.put(ZookeeperRemoteAliasService.OPTION_NAME_SHOULD_USE_LOCAL_ALIAS,
"false");
+ zookeeperAliasService.registerRemoteTokenStateChangeListener(this);
zookeeperAliasService.init(config, options);
super.setAliasService(zookeeperAliasService);
super.init(config, options);
+
options.remove(ZookeeperRemoteAliasService.OPTION_NAME_SHOULD_CREATE_TOKENS_SUB_NODE);
+
options.remove(ZookeeperRemoteAliasService.OPTION_NAME_SHOULD_USE_LOCAL_ALIAS);
+ }
+
+ /**
+ * Intentionally does nothing: token aliases are not loaded here for the
+ * Zookeeper based service (see the comment in the body for the reason).
+ */
+ @Override
+ protected void loadTokenAliasesFromPersistenceStore() {
+ // NOP : registering the 'knox/security/topology' child entry listener in
+ // ZKRemoteAliasService ends up reading the existing ZK nodes, and the
+ // resulting RemoteTokenStateChangeListener notifications populate the
+ // in-memory collections, so nothing has to be loaded here directly
+ }
+
+ /**
+ * Always reports that token eviction may run; for this service eviction does
+ * not wait for loadTokenAliasesFromPersistenceStore (which is a no-op here).
+ *
+ * @return always true
+ */
+ @Override
+ protected boolean readyForEviction() {
+ return true;
+ }
+
+ /**
+ * Looks up the value of the given alias through the parent implementation
+ * and, when that yields null, falls back to retry(alias), which keeps polling
+ * for a limited time.
+ *
+ * @param alias the alias to look up
+ * @return the alias value, or null if it is still not available after retrying
+ * @throws AliasServiceException if the underlying lookup fails
+ */
+ @Override
+ protected char[] getPasswordUsingAliasService(String alias) throws
AliasServiceException {
+ char[] password = super.getPasswordUsingAliasService(alias);
+
+ if (password == null) {
+ // not found at first attempt: it may not have been persisted in ZK yet
+ password = retry(alias);
+ }
+ return password;
+ }
+
+ /*
+ * In HA scenarios, it might happen, that node1 generated a token but the
+ * state persister thread saves that token in ZK a bit later. If there is a
+ * subsequent call to this token on another node - e.g. node2 - before it's
+ * persisted in ZK the token would be considered unknown. (see CDPD-22225)
+ *
+ * To avoid this issue, the ZK token state service retries to fetch the
+ * token from ZK every second until the token is found or the configured
+ * persistence interval (plus a grace period of 1 second) has elapsed.
+ *
+ * If the calling thread is interrupted while waiting, the interruption is
+ * logged, the thread's interrupt status is restored and polling stops, so
+ * the value found so far (null) is returned instead of blocking any longer.
+ *
+ * @param alias the alias to fetch
+ * @return the alias value, or null if it did not show up in time
+ * @throws AliasServiceException if the underlying lookup fails
+ */
+ private char[] retry(String alias) throws AliasServiceException {
+   char[] password = null;
+   // an addition of 1 second as grace period
+   final Instant timeLimit =
+       Instant.now().plusSeconds(statePersistenceInterval).plusSeconds(1);
+
+   while (password == null && timeLimit.isAfter(Instant.now())) {
+     try {
+       TimeUnit.SECONDS.sleep(1);
+       log.retryZkFetchAlias(alias);
+       password = super.getPasswordUsingAliasService(alias);
+     } catch (InterruptedException e) {
+       log.failedRetryZkFetchAlias(alias, e.getMessage(), e);
+       // restore the interrupt status for the caller and stop polling;
+       // continuing the loop would silently discard the interruption
+       Thread.currentThread().interrupt();
+       break;
+     }
+   }
+   return password;
+ }
+
+ /**
+ * Listener callback for a changed alias: applies the new value via
+ * processAlias and then logs the event.
+ *
+ * @param alias        the alias that changed
+ * @param updatedState the new value of the alias, as a string
+ */
+ @Override
+ public void onChanged(String alias, String updatedState) {
+ processAlias(alias, updatedState);
+ log.onRemoteTokenStateChanged(alias);
+ }
+
+ /**
+ * Listener callback for a removed alias: derives the token ID from the alias,
+ * removes that token via removeTokensFromMemory and logs the event.
+ *
+ * @param alias the alias that was removed
+ */
+ @Override
+ public void onRemoved(String alias) {
+ // any "--" suffix is stripped, so the removal targets the token itself
+ final String tokenId = getTokenIdFromAlias(alias);
+ removeTokensFromMemory(Collections.singleton(tokenId));
+ log.onRemoteTokenStateRemoval(alias);
+ }
+
+ /**
+ * Applies an alias/value pair to this service's state: aliases ending with
+ * TOKEN_MAX_LIFETIME_POSTFIX are passed to setMaxLifetime, every other alias
+ * is treated as an expiration and passed to updateExpirationInMemory. The
+ * alias equal to TOKENS_SUB_NODE_NAME is ignored. Any failure (e.g. a value
+ * that cannot be parsed as a long) is logged, not propagated.
+ *
+ * @param alias the alias to process
+ * @param value the alias value; expected to be the string form of a long
+ */
+ private void processAlias(String alias, String value) {
+ if (!ZookeeperRemoteAliasService.TOKENS_SUB_NODE_NAME.equals(alias)) {
+ try {
+ final String tokenId = getTokenIdFromAlias(alias);
+ if (alias.endsWith(TOKEN_MAX_LIFETIME_POSTFIX)) {
+ final long maxLifeTime = Long.parseLong(value);
+ setMaxLifetime(tokenId, maxLifeTime);
+ } else {
+ final long expiration = Long.parseLong(value);
+ updateExpirationInMemory(tokenId, expiration);
+ }
+ } catch (Throwable e) {
+ log.errorWhileProcessingTokenAlias(alias, e.getMessage(), e);
+ }
+ }
+ }
+
+ /**
+ * Extracts the token ID from an alias: everything before the first "--", or
+ * the whole alias when it contains no "--".
+ *
+ * @param alias the alias to parse
+ * @return the token ID part of the alias
+ */
+ private String getTokenIdFromAlias(String alias) {
+ return alias.indexOf("--") == -1 ? alias : alias.substring(0,
alias.indexOf("--")); // both --max and --unused starts with '--';
}
}
diff --git
a/gateway-server/src/test/java/org/apache/knox/gateway/services/factory/TokenStateServiceFactoryTest.java
b/gateway-server/src/test/java/org/apache/knox/gateway/services/factory/TokenStateServiceFactoryTest.java
index 272593e..5a3675e 100644
---
a/gateway-server/src/test/java/org/apache/knox/gateway/services/factory/TokenStateServiceFactoryTest.java
+++
b/gateway-server/src/test/java/org/apache/knox/gateway/services/factory/TokenStateServiceFactoryTest.java
@@ -43,13 +43,17 @@ public class TokenStateServiceFactoryTest extends
ServiceFactoryTest {
}
@Test
- public void shouldReturnDefaultTopkenStateService() throws Exception {
+ public void shouldReturnDefaultTokenStateService() throws Exception {
TokenStateService tokenStateService = (TokenStateService)
serviceFactory.create(gatewayServices, ServiceType.TOKEN_STATE_SERVICE,
gatewayConfig, options,
DefaultTokenStateService.class.getName());
assertTrue(tokenStateService instanceof DefaultTokenStateService);
+ }
- tokenStateService = (TokenStateService)
serviceFactory.create(gatewayServices, ServiceType.TOKEN_STATE_SERVICE,
gatewayConfig, options, "");
- assertTrue(tokenStateService instanceof DefaultTokenStateService);
+ @Test
+ public void shouldReturnAliasBasedTokenStateServiceByDefault() throws
Exception {
+ TokenStateService tokenStateService = (TokenStateService)
serviceFactory.create(gatewayServices, ServiceType.TOKEN_STATE_SERVICE,
gatewayConfig, options, "");
+ assertTrue(tokenStateService instanceof AliasBasedTokenStateService);
+ assertTrue(isAliasServiceSet(tokenStateService));
}
@Test
diff --git
a/gateway-server/src/test/java/org/apache/knox/gateway/services/token/impl/AliasBasedTokenStateServiceTest.java
b/gateway-server/src/test/java/org/apache/knox/gateway/services/token/impl/AliasBasedTokenStateServiceTest.java
index bca6088..5da2f5d 100644
---
a/gateway-server/src/test/java/org/apache/knox/gateway/services/token/impl/AliasBasedTokenStateServiceTest.java
+++
b/gateway-server/src/test/java/org/apache/knox/gateway/services/token/impl/AliasBasedTokenStateServiceTest.java
@@ -523,7 +523,7 @@ public class AliasBasedTokenStateServiceTest extends
DefaultTokenStateServiceTes
Map<String, Long> tokenExpirations = getTokenExpirationsField(tss);
Map<String, Long> maxTokenLifetimes = getMaxTokenLifetimesField(tss);
- List<AliasBasedTokenStateService.TokenState> unpersistedState =
getUnpersistedStateField(tss);
+ Set<AliasBasedTokenStateService.TokenState> unpersistedState =
getUnpersistedStateField(tss);
assertEquals("Expected the tokens expirations to have been added in the
base class cache.",
TOKEN_COUNT,
@@ -599,7 +599,7 @@ public class AliasBasedTokenStateServiceTest extends
DefaultTokenStateServiceTes
Map<String, Long> tokenExpirations = getTokenExpirationsField(tss);
Map<String, Long> maxTokenLifetimes = getMaxTokenLifetimesField(tss);
- List<AliasBasedTokenStateService.TokenState> unpersistedState =
getUnpersistedStateField(tss);
+ Set<AliasBasedTokenStateService.TokenState> unpersistedState =
getUnpersistedStateField(tss);
assertEquals("Expected the tokens expirations to have been added in the
base class cache.",
TOKEN_COUNT,
@@ -617,6 +617,41 @@ public class AliasBasedTokenStateServiceTest extends
DefaultTokenStateServiceTes
EasyMock.verify(aliasService);
}
+ /**
+ * Adds and renews 1000 mock tokens and verifies that the unpersisted state
+ * holds exactly two entries per token: the token ID alias and the alias with
+ * TOKEN_MAX_LIFETIME_POSTFIX appended.
+ */
+ @Test
+ public void ensureAliases() throws Exception {
+ final int tokenCount = 1000;
+ final Set<JWTToken> testTokens = new HashSet<>();
+ for (int i = 0; i < tokenCount ; i++) {
+ JWTToken token = createMockToken(System.currentTimeMillis() -
TimeUnit.SECONDS.toMillis(60));
+ testTokens.add(token);
+ }
+
+ final AliasBasedTokenStateService tss = (AliasBasedTokenStateService)
createTokenStateService();
+ final long issueTime = System.currentTimeMillis();
+ for (JWTToken token : testTokens) {
+ tss.addToken(token, issueTime);
+ tss.renewToken(token);
+ }
+
+ // 'false': read the field from the service's own class, not its superclass
+ final List<AliasBasedTokenStateService.TokenState> unpersistedTokenStates
= new ArrayList<>(getUnpersistedStateField(tss, false));
+ final int expectedAliasCount = 2 * tokenCount; //expiration + max for each
token
+ assertEquals(expectedAliasCount, unpersistedTokenStates.size());
+ for (JWTToken token : testTokens) {
+ String tokenId = token.getClaim(JWTToken.KNOX_ID_CLAIM);
+ assertTrue(containsAlias(unpersistedTokenStates, tokenId));
+ assertTrue(containsAlias(unpersistedTokenStates, tokenId +
AliasBasedTokenStateService.TOKEN_MAX_LIFETIME_POSTFIX));
+ }
+ }
+
+ /**
+ * Tells whether any of the given token states reports the expected alias.
+ *
+ * @param unpersistedTokenStates the token states to search
+ * @param expectedAlias          the alias to look for
+ * @return true if at least one state's alias equals expectedAlias
+ */
+ private boolean containsAlias(List<AliasBasedTokenStateService.TokenState>
unpersistedTokenStates, String expectedAlias) {
+ for(AliasBasedTokenStateService.TokenState tokenState :
unpersistedTokenStates) {
+ if (tokenState.getAlias().equals(expectedAlias)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
@Override
protected TokenStateService createTokenStateService() throws Exception {
AliasBasedTokenStateService tss = new AliasBasedTokenStateService();
@@ -793,12 +828,14 @@ public class AliasBasedTokenStateServiceTest extends
DefaultTokenStateServiceTes
return (Map<String, Long>) maxTokenLifetimesField.get(tss);
}
- private static List<AliasBasedTokenStateService.TokenState>
getUnpersistedStateField(TokenStateService tss)
- throws Exception {
- Field unpersistedStateField =
tss.getClass().getSuperclass().getDeclaredField("unpersistedState");
- unpersistedStateField.setAccessible(true);
- return (List<AliasBasedTokenStateService.TokenState>)
unpersistedStateField.get(tss);
+ private static Set<AliasBasedTokenStateService.TokenState>
getUnpersistedStateField(TokenStateService tss) throws Exception {
+ return getUnpersistedStateField(tss, true);
+ }
+ private static Set<AliasBasedTokenStateService.TokenState>
getUnpersistedStateField(TokenStateService tss, boolean fromParent) throws
Exception {
+ Field unpersistedStateField = fromParent ?
tss.getClass().getSuperclass().getDeclaredField("unpersistedState") :
tss.getClass().getDeclaredField("unpersistedState");
+ unpersistedStateField.setAccessible(true);
+ return (Set<AliasBasedTokenStateService.TokenState>)
unpersistedStateField.get(tss);
}
private static class TestJournalEntry implements JournalEntry {
diff --git
a/gateway-server/src/test/java/org/apache/knox/gateway/services/token/impl/ZookeeperTokenStateServiceTest.java
b/gateway-server/src/test/java/org/apache/knox/gateway/services/token/impl/ZookeeperTokenStateServiceTest.java
index f12689c..443d409 100644
---
a/gateway-server/src/test/java/org/apache/knox/gateway/services/token/impl/ZookeeperTokenStateServiceTest.java
+++
b/gateway-server/src/test/java/org/apache/knox/gateway/services/token/impl/ZookeeperTokenStateServiceTest.java
@@ -21,9 +21,11 @@ import static
org.apache.knox.gateway.config.GatewayConfig.REMOTE_CONFIG_REGISTR
import static
org.apache.knox.gateway.config.GatewayConfig.REMOTE_CONFIG_REGISTRY_TYPE;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -39,10 +41,12 @@ import org.apache.knox.gateway.config.GatewayConfig;
import org.apache.knox.gateway.service.config.remote.zk.ZooKeeperClientService;
import
org.apache.knox.gateway.service.config.remote.zk.ZooKeeperClientServiceProvider;
import org.apache.knox.gateway.services.GatewayServices;
+import org.apache.knox.gateway.services.ServiceLifecycleException;
import org.apache.knox.gateway.services.ServiceType;
import
org.apache.knox.gateway.services.config.client.RemoteConfigurationRegistryClientService;
import org.apache.knox.gateway.services.security.AliasService;
import org.apache.knox.gateway.services.security.KeystoreService;
+import org.apache.knox.gateway.services.security.KeystoreServiceException;
import org.apache.knox.gateway.services.security.MasterService;
import org.easymock.EasyMock;
import org.junit.AfterClass;
@@ -57,7 +61,8 @@ public class ZookeeperTokenStateServiceTest {
public static final TemporaryFolder testFolder = new TemporaryFolder();
private static final String CONFIG_MONITOR_NAME =
"remoteConfigMonitorClient";
- private static final long TOKEN_STATE_ALIAS_PERSISTENCE_INTERVAL = 2L;
+ private static final long SHORT_TOKEN_STATE_ALIAS_PERSISTENCE_INTERVAL = 2L;
+ private static final long LONG_TOKEN_STATE_ALIAS_PERSISTENCE_INTERVAL = 5L;
private static TestingCluster zkNodes;
@BeforeClass
@@ -88,6 +93,50 @@ public class ZookeeperTokenStateServiceTest {
+ /**
+ * Verifies that adding a token results in both of its aliases (the token ID
+ * and the "--max" one) being created as ZK nodes under
+ * /knox/security/topology/__gateway/tokens/a0 once the persistence interval
+ * has passed, and that neither node exists beforehand.
+ */
@Test
public void testStoringTokenAliasesInZookeeper() throws Exception {
+ final ZookeeperTokenStateService zktokenStateService =
setupZkTokenStateService(SHORT_TOKEN_STATE_ALIAS_PERSISTENCE_INTERVAL);
+
+
assertFalse(zkNodeExists("/knox/security/topology/__gateway/tokens/a0/a0-token1"));
+
assertFalse(zkNodeExists("/knox/security/topology/__gateway/tokens/a0/a0-token1--max"));
+
+ zktokenStateService.addToken("a0-token1", 1L, 2L);
+
+ // give some time for the token state service to persist the token aliases
in ZK (doubled the persistence interval)
+ Thread.sleep(2 * SHORT_TOKEN_STATE_ALIAS_PERSISTENCE_INTERVAL * 1000);
+
+
assertTrue(zkNodeExists("/knox/security/topology/__gateway/tokens/a0/a0-token1"));
+
assertTrue(zkNodeExists("/knox/security/topology/__gateway/tokens/a0/a0-token1--max"));
+ }
+
+ /**
+ * Adds a token on one service instance and immediately asks a second
+ * instance for its expiration, i.e. before the (long) persistence interval
+ * has elapsed; the second instance is expected to return the expiration
+ * (2000) that was set on the first one.
+ */
+ @Test
+ public void testRetry() throws Exception {
+ final ZookeeperTokenStateService zktokenStateServiceNode1 =
setupZkTokenStateService(LONG_TOKEN_STATE_ALIAS_PERSISTENCE_INTERVAL);
+ final ZookeeperTokenStateService zktokenStateServiceNode2 =
setupZkTokenStateService(LONG_TOKEN_STATE_ALIAS_PERSISTENCE_INTERVAL);
+ zktokenStateServiceNode1.addToken("node1Token", 10L, 2000L);
+ final long expiration =
zktokenStateServiceNode2.getTokenExpiration("node1Token");
+ // NOTE(review): the value is already fetched at this point; presumably this
+ // sleep only lets background persistence settle before the test ends - confirm
+ Thread.sleep(LONG_TOKEN_STATE_ALIAS_PERSISTENCE_INTERVAL * 1000);
+ assertEquals(2000L, expiration);
+ }
+
+ /**
+ * Verifies that two service instances report the same expiration for a token
+ * that was added, and later renewed, on the first instance only. After each
+ * change the test waits 1.5 times the (short) persistence interval before
+ * comparing.
+ */
+ @Test
+ public void testRenewal() throws Exception {
+ final ZookeeperTokenStateService zktokenStateServiceNode1 =
setupZkTokenStateService(SHORT_TOKEN_STATE_ALIAS_PERSISTENCE_INTERVAL);
+ final ZookeeperTokenStateService zktokenStateServiceNode2 =
setupZkTokenStateService(SHORT_TOKEN_STATE_ALIAS_PERSISTENCE_INTERVAL);
+ final String tokenId = "a1-token";
+ final long issueTime = System.currentTimeMillis();
+ final long tokenTTL = 1000L;
+ final long renewInterval = 2000L;
+
+ zktokenStateServiceNode1.addToken(tokenId, issueTime, issueTime +
tokenTTL);
+ // interval is in seconds, so "* 1500" yields 1.5 x the interval in milliseconds
+ Thread.sleep(SHORT_TOKEN_STATE_ALIAS_PERSISTENCE_INTERVAL * 1500);
+ assertEquals(zktokenStateServiceNode1.getTokenExpiration(tokenId),
zktokenStateServiceNode2.getTokenExpiration(tokenId));
+
+ //now renew token on node 1 and check if renewal is reflected on node2
+ zktokenStateServiceNode1.renewToken(tokenId, renewInterval);
+ Thread.sleep(SHORT_TOKEN_STATE_ALIAS_PERSISTENCE_INTERVAL * 1500);
+ assertEquals(zktokenStateServiceNode1.getTokenExpiration(tokenId),
zktokenStateServiceNode2.getTokenExpiration(tokenId));
+ }
+
+ private ZookeeperTokenStateService setupZkTokenStateService(long
persistenceInterval) throws IOException, KeystoreServiceException,
ServiceLifecycleException {
// mocking GatewayConfig
final GatewayConfig gc = EasyMock.createNiceMock(GatewayConfig.class);
expect(gc.getRemoteRegistryConfigurationNames()).andReturn(Collections.singletonList(CONFIG_MONITOR_NAME)).anyTimes();
@@ -96,7 +145,7 @@ public class ZookeeperTokenStateServiceTest {
expect(gc.getRemoteConfigurationMonitorClientName()).andReturn(CONFIG_MONITOR_NAME).anyTimes();
expect(gc.getAlgorithm()).andReturn("AES").anyTimes();
expect(gc.isRemoteAliasServiceEnabled()).andReturn(true).anyTimes();
-
expect(gc.getKnoxTokenStateAliasPersistenceInterval()).andReturn(TOKEN_STATE_ALIAS_PERSISTENCE_INTERVAL).anyTimes();
+
expect(gc.getKnoxTokenStateAliasPersistenceInterval()).andReturn(persistenceInterval).anyTimes();
final Path baseFolder =
Paths.get(testFolder.newFolder().getAbsolutePath());
expect(gc.getGatewayDataDir()).andReturn(Paths.get(baseFolder.toString(),
"data").toString()).anyTimes();
expect(gc.getGatewayKeystoreDir()).andReturn(Paths.get(baseFolder.toString(),
"data", "keystores").toString()).anyTimes();
@@ -109,6 +158,7 @@ public class ZookeeperTokenStateServiceTest {
expect(masterService.getMasterSecret()).andReturn(masterSecret).anyTimes();
expect(gatewayServices.getService(ServiceType.MASTER_SERVICE)).andReturn(masterService).anyTimes();
final KeystoreService keystoreservice =
EasyMock.createNiceMock(KeystoreService.class);
+
expect(keystoreservice.getCredentialStoreForCluster(AliasService.NO_CLUSTER_NAME)).andReturn(null).anyTimes();
expect(gatewayServices.getService(ServiceType.KEYSTORE_SERVICE)).andReturn(keystoreservice).anyTimes();
final AliasService aliasService =
EasyMock.createNiceMock(AliasService.class);
expect(gatewayServices.getService(ServiceType.ALIAS_SERVICE)).andReturn(aliasService).anyTimes();
@@ -116,22 +166,12 @@ public class ZookeeperTokenStateServiceTest {
clientService.setAliasService(aliasService);
clientService.init(gc, Collections.emptyMap());
expect(gatewayServices.getService(ServiceType.REMOTE_REGISTRY_CLIENT_SERVICE)).andReturn(clientService).anyTimes();
- replay(gatewayServices, masterService);
+ replay(gatewayServices, masterService, keystoreservice);
final ZookeeperTokenStateService zktokenStateService = new
ZookeeperTokenStateService(gatewayServices);
zktokenStateService.init(gc, new HashMap<>());
zktokenStateService.start();
-
- assertFalse(zkNodeExists("/knox/security/topology/__gateway/token1"));
- assertFalse(zkNodeExists("/knox/security/topology/__gateway/token1--max"));
-
- zktokenStateService.addToken("token1", 1L, 2L);
-
- // give some time for the token state service to persist the token aliases
in ZK (doubled the persistence interval)
- Thread.sleep(2 * TOKEN_STATE_ALIAS_PERSISTENCE_INTERVAL * 1000);
-
- assertTrue(zkNodeExists("/knox/security/topology/__gateway/token1"));
- assertTrue(zkNodeExists("/knox/security/topology/__gateway/token1--max"));
+ return zktokenStateService;
}
private boolean zkNodeExists(String nodeName) {
diff --git a/gateway-util-common/src/main/java/org/apache/knox/gateway/util/ExecutorServiceUtils.java b/gateway-util-common/src/main/java/org/apache/knox/gateway/util/ExecutorServiceUtils.java
new file mode 100644
index 0000000..7cf8546
--- /dev/null
+++ b/gateway-util-common/src/main/java/org/apache/knox/gateway/util/ExecutorServiceUtils.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.knox.gateway.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Static helpers for working with {@link ExecutorService} instances.
+ */
+public final class ExecutorServiceUtils {
+
+  private ExecutorServiceUtils() {
+    // static utility class; not meant to be instantiated
+  }
+
+  /**
+   * Shuts down the given executor service in two phases: first an orderly
+   * {@link ExecutorService#shutdown()}, then, if the tasks have not finished
+   * within the timeout, a forced {@link ExecutorService#shutdownNow()}.
+   * <p>
+   * The timeout is applied to each phase separately, so this call may block
+   * for up to twice the given timeout. If the calling thread is interrupted
+   * while waiting, the executor is cancelled via {@code shutdownNow()} and
+   * the thread's interrupt status is restored.
+   *
+   * @param executorService the executor service to shut down
+   * @param timeout the maximum time to wait in each phase
+   * @param timeUnit the time unit of {@code timeout}
+   */
+  public static void shutdownAndAwaitTermination(ExecutorService executorService, long timeout, TimeUnit timeUnit) {
+    executorService.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!executorService.awaitTermination(timeout, timeUnit)) {
+        executorService.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!executorService.awaitTermination(timeout, timeUnit)) {
+          System.err.println("Pool did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      executorService.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
+  }
+
+}