This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch support/1.15
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.15 by this push:
new 75c5866 GEODE-10097: Avoid Thread.sleep for reauthentication in
MessageDispatcher (#7416)
75c5866 is described below
commit 75c586677c946c47241c06771b53ed18313f9ffc
Author: Jinmei Liao <[email protected]>
AuthorDate: Tue Mar 15 13:32:50 2022 -0700
GEODE-10097: Avoid Thread.sleep for reauthentication in MessageDispatcher
(#7416)
(cherry picked from commit 2554f42b925f2b9b8ca7eee14c7a887436b1d9db)
---
.../geode/security/AuthExpirationDUnitTest.java | 9 +++-
.../cache/tier/sockets/CacheClientProxy.java | 7 +++
.../cache/tier/sockets/MessageDispatcher.java | 57 ++++++++++++++--------
.../cache/tier/sockets/ServerConnection.java | 1 +
.../cache/tier/sockets/MessageDispatcherTest.java | 8 +--
.../geode/security/ExpirableSecurityManager.java | 8 ++-
.../sanctioned-geode-junit-serializables.txt | 2 +-
7 files changed, 64 insertions(+), 28 deletions(-)
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDUnitTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDUnitTest.java
index bf38db0..b4f8f9c 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDUnitTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDUnitTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.junit.After;
+import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
@@ -66,6 +67,12 @@ public class AuthExpirationDUnitTest {
private ClientVM clientVM;
+ @Before
+ public void before() throws Exception {
+ // this is enabled to show how many times authorize call is made with each
permission key
+ getSecurityManager().setAllowDuplicate(true);
+ }
+
@After
public void after() {
if (clientVM != null) {
@@ -110,7 +117,7 @@ public class AuthExpirationDUnitTest {
Map<String, List<String>> authorizedOps =
getSecurityManager().getAuthorizedOps();
assertThat(authorizedOps.keySet().size()).isEqualTo(2);
assertThat(authorizedOps.get("user1")).asList().containsExactly("DATA:READ:region",
- "DATA:READ:region:1");
+ "DATA:READ:region", "DATA:READ:region:1");
assertThat(authorizedOps.get("user2")).asList().containsExactly("DATA:READ:region:2");
Map<String, List<String>> unAuthorizedOps =
getSecurityManager().getUnAuthorizedOps();
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 43be442..89c14a8 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -723,6 +723,13 @@ public class CacheClientProxy implements ClientSession {
return _messageDispatcher.isWaitingForReAuthentication();
}
+ public void notifyReAuthentication() {
+ if (_messageDispatcher == null) {
+ return;
+ }
+ _messageDispatcher.notifyReAuthentication();
+ }
+
/**
* Returns whether the proxy is paused. It is paused if its message
dispatcher is paused. This
* only applies to durable clients.
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
index 19920cf..e464673 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
@@ -113,6 +113,7 @@ public class MessageDispatcher extends LoggingThread {
private volatile boolean _isStopped = true;
private volatile long waitForReAuthenticationStartTime = -1;
+ private final Object reAuthenticationLock = new Object();
/**
* A lock object used to control pausing this dispatcher
@@ -193,6 +194,15 @@ public class MessageDispatcher extends LoggingThread {
return waitForReAuthenticationStartTime > 0;
}
+ private volatile boolean subjectUpdated = false;
+
+ public void notifyReAuthentication() {
+ synchronized (reAuthenticationLock) {
+ subjectUpdated = true;
+ reAuthenticationLock.notifyAll();
+ }
+ }
+
private CacheClientProxy getProxy() {
return _proxy;
}
@@ -432,34 +442,39 @@ public class MessageDispatcher extends LoggingThread {
_messageQueue.remove();
clientMessage = null;
} catch (AuthenticationExpiredException expired) {
- if (waitForReAuthenticationStartTime == -1) {
+ // only send the message to clients who can handle the message
+ if
(getProxy().getVersion().isNewerThanOrEqualTo(RE_AUTHENTICATION_START_VERSION))
{
+ EventID eventId = createEventId();
+ sendMessageDirectly(new ClientReAuthenticateMessage(eventId));
+ }
+
+ // We wait for all versions of clients to re-authenticate. For older
clients we still
+ // wait, just in case client will perform some operations to
+ // trigger credential refresh on its own.
+ synchronized (reAuthenticationLock) {
waitForReAuthenticationStartTime = System.currentTimeMillis();
- // only send the message to clients who can handle the message
- if
(getProxy().getVersion().isNewerThanOrEqualTo(RE_AUTHENTICATION_START_VERSION))
{
- EventID eventId = createEventId();
- sendMessageDirectly(new ClientReAuthenticateMessage(eventId));
+ long waitFinishTime = waitForReAuthenticationStartTime +
reAuthenticateWaitTime;
+ subjectUpdated = false;
+ long remainingWaitTime = waitFinishTime -
System.currentTimeMillis();
+ while (!subjectUpdated && remainingWaitTime > 0) {
+ reAuthenticationLock.wait(remainingWaitTime);
+ remainingWaitTime = waitFinishTime - System.currentTimeMillis();
}
- // We wait for all versions of clients to re-authenticate. For
older clients we still
- // wait, just in case client will perform some operations to
- // trigger credential refresh on its own.
- Thread.sleep(200);
- } else {
+ }
+ // the above wait timed out
+ if (!subjectUpdated) {
long elapsedTime = System.currentTimeMillis() -
waitForReAuthenticationStartTime;
- if (elapsedTime > reAuthenticateWaitTime) {
- // reset the timer here since we are no longer waiting for
re-auth to happen anymore
- waitForReAuthenticationStartTime = -1;
- synchronized (_stopDispatchingLock) {
- logger.warn("Client did not re-authenticate back successfully
in " + elapsedTime
- + "ms. Unregister this client proxy.");
- pauseOrUnregisterProxy(expired);
- }
+ // reset the timer here since we are no longer waiting for re-auth
to happen anymore
+ waitForReAuthenticationStartTime = -1;
+ synchronized (_stopDispatchingLock) {
+ logger.warn(
+ "Client did not re-authenticate back successfully in {} ms.
Unregister this client proxy.",
+ elapsedTime);
+ pauseOrUnregisterProxy(expired);
exceptionOccurred = true;
- } else {
- Thread.sleep(200);
}
}
}
-
} catch (MessageTooLargeException e) {
logger.warn("Message too large to send to client: {}, {}",
clientMessage, e.getMessage());
} catch (IOException e) {
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 689325d..7bd8508 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -1231,6 +1231,7 @@ public class ServerConnection implements Runnable {
secureLogger.debug("update subject on client proxy {} with uniqueId {}",
clientProxy,
uniqueId);
clientProxy.setSubject(subject);
+ clientProxy.notifyReAuthentication();
}
return uniqueId;
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcherTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcherTest.java
index 555e3f3..84791b8 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcherTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcherTest.java
@@ -137,7 +137,7 @@ public class MessageDispatcherTest {
@Test
public void newClientWillGetClientReAuthenticateMessage() throws Exception {
- doReturn(false, false, false, false, false,
true).when(dispatcher).isStopped();
+ doReturn(false, false, false, true).when(dispatcher).isStopped();
doThrow(AuthenticationExpiredException.class).when(dispatcher).dispatchMessage(any());
when(messageQueue.peek()).thenReturn(message);
when(proxy.getVersion()).thenReturn(KnownVersion.GEODE_1_15_0);
@@ -145,7 +145,7 @@ public class MessageDispatcherTest {
doNothing().when(dispatcher).sendMessageDirectly(any());
// make sure wait time is short
-
doReturn(-1L).when(dispatcher).getSystemProperty(eq(RE_AUTHENTICATE_WAIT_TIME),
anyLong());
+
doReturn(1L).when(dispatcher).getSystemProperty(eq(RE_AUTHENTICATE_WAIT_TIME),
anyLong());
dispatcher.runDispatcher();
// verify a ReAuthenticate message will be send to the user
@@ -159,9 +159,9 @@ public class MessageDispatcherTest {
@Test
public void oldClientWillNotGetClientReAuthenticateMessage() throws
Exception {
- doReturn(false, false, false, false, false,
true).when(dispatcher).isStopped();
+ doReturn(false, false, false, true).when(dispatcher).isStopped();
// make sure wait time is short
-
doReturn(-1L).when(dispatcher).getSystemProperty(eq(RE_AUTHENTICATE_WAIT_TIME),
anyLong());
+
doReturn(1L).when(dispatcher).getSystemProperty(eq(RE_AUTHENTICATE_WAIT_TIME),
anyLong());
doThrow(AuthenticationExpiredException.class).when(dispatcher).dispatchMessage(any());
when(messageQueue.peek()).thenReturn(message);
diff --git
a/geode-junit/src/main/java/org/apache/geode/security/ExpirableSecurityManager.java
b/geode-junit/src/main/java/org/apache/geode/security/ExpirableSecurityManager.java
index 06b2845..56c5157 100644
---
a/geode-junit/src/main/java/org/apache/geode/security/ExpirableSecurityManager.java
+++
b/geode-junit/src/main/java/org/apache/geode/security/ExpirableSecurityManager.java
@@ -39,6 +39,7 @@ public class ExpirableSecurityManager extends
SimpleSecurityManager implements S
new ConcurrentHashMap<>();
private final Map<String, List<String>> unauthorizedOps =
new ConcurrentHashMap<>();
+ private boolean allowDuplicate = false;
@Override
public Object authenticate(final Properties credentials) throws
AuthenticationFailedException {
@@ -65,6 +66,10 @@ public class ExpirableSecurityManager extends
SimpleSecurityManager implements S
expired_users.add(user);
}
+ public void setAllowDuplicate(boolean allowDuplicate) {
+ this.allowDuplicate = allowDuplicate;
+ }
+
public Set<String> getExpiredUsers() {
return expired_users;
}
@@ -83,7 +88,7 @@ public class ExpirableSecurityManager extends
SimpleSecurityManager implements S
if (list == null) {
list = new ArrayList<>();
}
- if (!list.contains(permission.toString())) {
+ if (allowDuplicate || !list.contains(permission.toString())) {
list.add(permission.toString());
}
maps.put(user.toString(), list);
@@ -93,6 +98,7 @@ public class ExpirableSecurityManager extends
SimpleSecurityManager implements S
expired_users.clear();
authorizedOps.clear();
unauthorizedOps.clear();
+ allowDuplicate = false;
}
}
diff --git
a/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt
b/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt
index 6249fc6..97e1d24 100644
---
a/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt
+++
b/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt
@@ -69,7 +69,7 @@ org/apache/geode/pdx/Day,false
org/apache/geode/pdx/DomainObjectPdxAuto$Day,false
org/apache/geode/pdx/DomainObjectPdxAutoNoDefaultConstructor$Day,false
org/apache/geode/pdx/SimpleClass$SimpleEnum,false
-org/apache/geode/security/ExpirableSecurityManager,false,authorizedOps:java/util/Map,expired_users:java/util/Set,unauthorizedOps:java/util/Map
+org/apache/geode/security/ExpirableSecurityManager,false,allowDuplicate:boolean,authorizedOps:java/util/Map,expired_users:java/util/Set,unauthorizedOps:java/util/Map
org/apache/geode/security/query/data/PdxQueryTestObject,false,age:int,id:int,name:java/lang/String,shouldThrowException:boolean
org/apache/geode/security/query/data/PdxTrade,false,cusip:java/lang/String,id:java/lang/String,price:int,shares:int
org/apache/geode/security/query/data/QueryTestObject,false,dateField:java/util/Date,id:int,mapField:java/util/Map,name:java/lang/String