This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch support/1.15
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.15 by this push:
     new 75c5866  GEODE-10097: Avoid Thread.sleep for reauthentication in 
MessageDispatcher (#7416)
75c5866 is described below

commit 75c586677c946c47241c06771b53ed18313f9ffc
Author: Jinmei Liao <[email protected]>
AuthorDate: Tue Mar 15 13:32:50 2022 -0700

    GEODE-10097: Avoid Thread.sleep for reauthentication in MessageDispatcher 
(#7416)
    
    
    (cherry picked from commit 2554f42b925f2b9b8ca7eee14c7a887436b1d9db)
---
 .../geode/security/AuthExpirationDUnitTest.java    |  9 +++-
 .../cache/tier/sockets/CacheClientProxy.java       |  7 +++
 .../cache/tier/sockets/MessageDispatcher.java      | 57 ++++++++++++++--------
 .../cache/tier/sockets/ServerConnection.java       |  1 +
 .../cache/tier/sockets/MessageDispatcherTest.java  |  8 +--
 .../geode/security/ExpirableSecurityManager.java   |  8 ++-
 .../sanctioned-geode-junit-serializables.txt       |  2 +-
 7 files changed, 64 insertions(+), 28 deletions(-)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDUnitTest.java
index bf38db0..b4f8f9c 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDUnitTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDUnitTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -66,6 +67,12 @@ public class AuthExpirationDUnitTest {
 
   private ClientVM clientVM;
 
+  @Before
+  public void before() throws Exception {
+    // this is enabled to show how many times authorize call is made with each 
permission key
+    getSecurityManager().setAllowDuplicate(true);
+  }
+
   @After
   public void after() {
     if (clientVM != null) {
@@ -110,7 +117,7 @@ public class AuthExpirationDUnitTest {
     Map<String, List<String>> authorizedOps = 
getSecurityManager().getAuthorizedOps();
     assertThat(authorizedOps.keySet().size()).isEqualTo(2);
     
assertThat(authorizedOps.get("user1")).asList().containsExactly("DATA:READ:region",
-        "DATA:READ:region:1");
+        "DATA:READ:region", "DATA:READ:region:1");
     
assertThat(authorizedOps.get("user2")).asList().containsExactly("DATA:READ:region:2");
 
     Map<String, List<String>> unAuthorizedOps = 
getSecurityManager().getUnAuthorizedOps();
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 43be442..89c14a8 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -723,6 +723,13 @@ public class CacheClientProxy implements ClientSession {
     return _messageDispatcher.isWaitingForReAuthentication();
   }
 
+  public void notifyReAuthentication() {
+    if (_messageDispatcher == null) {
+      return;
+    }
+    _messageDispatcher.notifyReAuthentication();
+  }
+
   /**
    * Returns whether the proxy is paused. It is paused if its message 
dispatcher is paused. This
    * only applies to durable clients.
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
index 19920cf..e464673 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
@@ -113,6 +113,7 @@ public class MessageDispatcher extends LoggingThread {
   private volatile boolean _isStopped = true;
 
   private volatile long waitForReAuthenticationStartTime = -1;
+  private final Object reAuthenticationLock = new Object();
 
   /**
    * A lock object used to control pausing this dispatcher
@@ -193,6 +194,15 @@ public class MessageDispatcher extends LoggingThread {
     return waitForReAuthenticationStartTime > 0;
   }
 
+  private volatile boolean subjectUpdated = false;
+
+  public void notifyReAuthentication() {
+    synchronized (reAuthenticationLock) {
+      subjectUpdated = true;
+      reAuthenticationLock.notifyAll();
+    }
+  }
+
   private CacheClientProxy getProxy() {
     return _proxy;
   }
@@ -432,34 +442,39 @@ public class MessageDispatcher extends LoggingThread {
           _messageQueue.remove();
           clientMessage = null;
         } catch (AuthenticationExpiredException expired) {
-          if (waitForReAuthenticationStartTime == -1) {
+          // only send the message to clients who can handle the message
+          if 
(getProxy().getVersion().isNewerThanOrEqualTo(RE_AUTHENTICATION_START_VERSION)) 
{
+            EventID eventId = createEventId();
+            sendMessageDirectly(new ClientReAuthenticateMessage(eventId));
+          }
+
+          // We wait for all versions of clients to re-authenticate. For older 
clients we still
+          // wait, just in case client will perform some operations to
+          // trigger credential refresh on its own.
+          synchronized (reAuthenticationLock) {
             waitForReAuthenticationStartTime = System.currentTimeMillis();
-            // only send the message to clients who can handle the message
-            if 
(getProxy().getVersion().isNewerThanOrEqualTo(RE_AUTHENTICATION_START_VERSION)) 
{
-              EventID eventId = createEventId();
-              sendMessageDirectly(new ClientReAuthenticateMessage(eventId));
+            long waitFinishTime = waitForReAuthenticationStartTime + 
reAuthenticateWaitTime;
+            subjectUpdated = false;
+            long remainingWaitTime = waitFinishTime - 
System.currentTimeMillis();
+            while (!subjectUpdated && remainingWaitTime > 0) {
+              reAuthenticationLock.wait(remainingWaitTime);
+              remainingWaitTime = waitFinishTime - System.currentTimeMillis();
             }
-            // We wait for all versions of clients to re-authenticate. For 
older clients we still
-            // wait, just in case client will perform some operations to
-            // trigger credential refresh on its own.
-            Thread.sleep(200);
-          } else {
+          }
+          // the above wait timed out
+          if (!subjectUpdated) {
             long elapsedTime = System.currentTimeMillis() - 
waitForReAuthenticationStartTime;
-            if (elapsedTime > reAuthenticateWaitTime) {
-              // reset the timer here since we are no longer waiting for 
re-auth to happen anymore
-              waitForReAuthenticationStartTime = -1;
-              synchronized (_stopDispatchingLock) {
-                logger.warn("Client did not re-authenticate back successfully 
in " + elapsedTime
-                    + "ms. Unregister this client proxy.");
-                pauseOrUnregisterProxy(expired);
-              }
+            // reset the timer here since we are no longer waiting for re-auth 
to happen anymore
+            waitForReAuthenticationStartTime = -1;
+            synchronized (_stopDispatchingLock) {
+              logger.warn(
+                  "Client did not re-authenticate back successfully in {} ms. 
Unregister this client proxy.",
+                  elapsedTime);
+              pauseOrUnregisterProxy(expired);
               exceptionOccurred = true;
-            } else {
-              Thread.sleep(200);
             }
           }
         }
-
       } catch (MessageTooLargeException e) {
         logger.warn("Message too large to send to client: {}, {}", 
clientMessage, e.getMessage());
       } catch (IOException e) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 689325d..7bd8508 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -1231,6 +1231,7 @@ public class ServerConnection implements Runnable {
       secureLogger.debug("update subject on client proxy {} with uniqueId {}", 
clientProxy,
           uniqueId);
       clientProxy.setSubject(subject);
+      clientProxy.notifyReAuthentication();
     }
     return uniqueId;
   }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcherTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcherTest.java
index 555e3f3..84791b8 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcherTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcherTest.java
@@ -137,7 +137,7 @@ public class MessageDispatcherTest {
 
   @Test
   public void newClientWillGetClientReAuthenticateMessage() throws Exception {
-    doReturn(false, false, false, false, false, 
true).when(dispatcher).isStopped();
+    doReturn(false, false, false, true).when(dispatcher).isStopped();
     
doThrow(AuthenticationExpiredException.class).when(dispatcher).dispatchMessage(any());
     when(messageQueue.peek()).thenReturn(message);
     when(proxy.getVersion()).thenReturn(KnownVersion.GEODE_1_15_0);
@@ -145,7 +145,7 @@ public class MessageDispatcherTest {
     doNothing().when(dispatcher).sendMessageDirectly(any());
 
     // make sure wait time is short
-    
doReturn(-1L).when(dispatcher).getSystemProperty(eq(RE_AUTHENTICATE_WAIT_TIME), 
anyLong());
+    
doReturn(1L).when(dispatcher).getSystemProperty(eq(RE_AUTHENTICATE_WAIT_TIME), 
anyLong());
     dispatcher.runDispatcher();
 
     // verify a ReAuthenticate message will be send to the user
@@ -159,9 +159,9 @@ public class MessageDispatcherTest {
 
   @Test
   public void oldClientWillNotGetClientReAuthenticateMessage() throws 
Exception {
-    doReturn(false, false, false, false, false, 
true).when(dispatcher).isStopped();
+    doReturn(false, false, false, true).when(dispatcher).isStopped();
     // make sure wait time is short
-    
doReturn(-1L).when(dispatcher).getSystemProperty(eq(RE_AUTHENTICATE_WAIT_TIME), 
anyLong());
+    
doReturn(1L).when(dispatcher).getSystemProperty(eq(RE_AUTHENTICATE_WAIT_TIME), 
anyLong());
 
     
doThrow(AuthenticationExpiredException.class).when(dispatcher).dispatchMessage(any());
     when(messageQueue.peek()).thenReturn(message);
diff --git 
a/geode-junit/src/main/java/org/apache/geode/security/ExpirableSecurityManager.java
 
b/geode-junit/src/main/java/org/apache/geode/security/ExpirableSecurityManager.java
index 06b2845..56c5157 100644
--- 
a/geode-junit/src/main/java/org/apache/geode/security/ExpirableSecurityManager.java
+++ 
b/geode-junit/src/main/java/org/apache/geode/security/ExpirableSecurityManager.java
@@ -39,6 +39,7 @@ public class ExpirableSecurityManager extends 
SimpleSecurityManager implements S
       new ConcurrentHashMap<>();
   private final Map<String, List<String>> unauthorizedOps =
       new ConcurrentHashMap<>();
+  private boolean allowDuplicate = false;
 
   @Override
   public Object authenticate(final Properties credentials) throws 
AuthenticationFailedException {
@@ -65,6 +66,10 @@ public class ExpirableSecurityManager extends 
SimpleSecurityManager implements S
     expired_users.add(user);
   }
 
+  public void setAllowDuplicate(boolean allowDuplicate) {
+    this.allowDuplicate = allowDuplicate;
+  }
+
   public Set<String> getExpiredUsers() {
     return expired_users;
   }
@@ -83,7 +88,7 @@ public class ExpirableSecurityManager extends 
SimpleSecurityManager implements S
     if (list == null) {
       list = new ArrayList<>();
     }
-    if (!list.contains(permission.toString())) {
+    if (allowDuplicate || !list.contains(permission.toString())) {
       list.add(permission.toString());
     }
     maps.put(user.toString(), list);
@@ -93,6 +98,7 @@ public class ExpirableSecurityManager extends 
SimpleSecurityManager implements S
     expired_users.clear();
     authorizedOps.clear();
     unauthorizedOps.clear();
+    allowDuplicate = false;
   }
 
 }
diff --git 
a/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt
 
b/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt
index 6249fc6..97e1d24 100644
--- 
a/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt
+++ 
b/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt
@@ -69,7 +69,7 @@ org/apache/geode/pdx/Day,false
 org/apache/geode/pdx/DomainObjectPdxAuto$Day,false
 org/apache/geode/pdx/DomainObjectPdxAutoNoDefaultConstructor$Day,false
 org/apache/geode/pdx/SimpleClass$SimpleEnum,false
-org/apache/geode/security/ExpirableSecurityManager,false,authorizedOps:java/util/Map,expired_users:java/util/Set,unauthorizedOps:java/util/Map
+org/apache/geode/security/ExpirableSecurityManager,false,allowDuplicate:boolean,authorizedOps:java/util/Map,expired_users:java/util/Set,unauthorizedOps:java/util/Map
 
org/apache/geode/security/query/data/PdxQueryTestObject,false,age:int,id:int,name:java/lang/String,shouldThrowException:boolean
 
org/apache/geode/security/query/data/PdxTrade,false,cusip:java/lang/String,id:java/lang/String,price:int,shares:int
 
org/apache/geode/security/query/data/QueryTestObject,false,dateField:java/util/Date,id:int,mapField:java/util/Map,name:java/lang/String

Reply via email to