This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4fbc35c29e GEODE-10097: Avoid Thread.sleep for re-auth in MessageDispatcher (#7556)
4fbc35c29e is described below

commit 4fbc35c29ef131f8d8f1f82391d224f3c2bbc346
Author: Jinmei Liao <jil...@pivotal.io>
AuthorDate: Mon Apr 11 08:48:34 2022 -0700

    GEODE-10097: Avoid Thread.sleep for re-auth in MessageDispatcher (#7556)
---
 .../security/AuthExpirationDistributedTest.java    |  9 ++-
 .../cache/tier/sockets/CacheClientProxy.java       |  7 ++
 .../cache/tier/sockets/MessageDispatcher.java      | 83 ++++++++++++++--------
 .../cache/tier/sockets/ServerConnection.java       |  1 +
 .../cache/tier/sockets/MessageDispatcherTest.java  | 41 +++++++++--
 .../geode/security/ExpirableSecurityManager.java   |  8 ++-
 .../sanctioned-geode-junit-serializables.txt       |  2 +-
 7 files changed, 111 insertions(+), 40 deletions(-)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDistributedTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDistributedTest.java
index 8a3a1de11b..d1bb73e2c9 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDistributedTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDistributedTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.contrib.java.lang.system.RestoreSystemProperties;
@@ -65,6 +66,12 @@ public class AuthExpirationDistributedTest {
 
   private ClientVM clientVM;
 
+  @Before
+  public void before() throws Exception {
+    // this is enabled to show how many times authorize call is made with each permission key
+    getSecurityManager().setAllowDuplicate(true);
+  }
+
   @After
   public void after() {
     if (clientVM != null) {
@@ -109,7 +116,7 @@ public class AuthExpirationDistributedTest {
     Map<String, List<String>> authorizedOps = 
getSecurityManager().getAuthorizedOps();
     assertThat(authorizedOps.keySet().size()).isEqualTo(2);
     
assertThat(authorizedOps.get("user1")).asList().containsExactly("DATA:READ:region",
-        "DATA:READ:region:1");
+        "DATA:READ:region", "DATA:READ:region:1");
     
assertThat(authorizedOps.get("user2")).asList().containsExactly("DATA:READ:region:2");
 
     Map<String, List<String>> unAuthorizedOps = 
getSecurityManager().getUnAuthorizedOps();
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 43be4421e7..89c14a8be3 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -723,6 +723,13 @@ public class CacheClientProxy implements ClientSession {
     return _messageDispatcher.isWaitingForReAuthentication();
   }
 
+  public void notifyReAuthentication() {
+    if (_messageDispatcher == null) {
+      return;
+    }
+    _messageDispatcher.notifyReAuthentication();
+  }
+
   /**
    * Returns whether the proxy is paused. It is paused if its message 
dispatcher is paused. This
    * only applies to durable clients.
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
index 4c88ee20d8..a03ab4bf81 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
@@ -113,6 +113,7 @@ public class MessageDispatcher extends LoggingThread {
   private volatile boolean _isStopped = true;
 
   private volatile long waitForReAuthenticationStartTime = -1;
+  private final Object reAuthenticationLock = new Object();
 
   /**
    * A lock object used to control pausing this dispatcher
@@ -193,6 +194,15 @@ public class MessageDispatcher extends LoggingThread {
     return waitForReAuthenticationStartTime > 0;
   }
 
+  private boolean subjectUpdated = false;
+
+  public void notifyReAuthentication() {
+    synchronized (reAuthenticationLock) {
+      subjectUpdated = true;
+      reAuthenticationLock.notifyAll();
+    }
+  }
+
   private CacheClientProxy getProxy() {
     return _proxy;
   }
@@ -361,9 +371,6 @@ public class MessageDispatcher extends LoggingThread {
       logger.debug("{}: Beginning to process events", this);
     }
 
-    long reAuthenticateWaitTime =
-        getSystemProperty(RE_AUTHENTICATE_WAIT_TIME, 
DEFAULT_RE_AUTHENTICATE_WAIT_TIME);
-
     ClientMessage clientMessage = null;
 
     while (!isStopped()) {
@@ -432,34 +439,8 @@ public class MessageDispatcher extends LoggingThread {
           _messageQueue.remove();
           clientMessage = null;
         } catch (AuthenticationExpiredException expired) {
-          if (waitForReAuthenticationStartTime == -1) {
-            waitForReAuthenticationStartTime = System.currentTimeMillis();
-            // only send the message to clients who can handle the message
-            if 
(getProxy().getVersion().isNewerThanOrEqualTo(RE_AUTHENTICATION_START_VERSION)) 
{
-              EventID eventId = createEventId();
-              sendMessageDirectly(new ClientReAuthenticateMessage(eventId));
-            }
-            // We wait for all versions of clients to re-authenticate. For 
older clients we still
-            // wait, just in case client will perform some operations to
-            // trigger credential refresh on its own.
-            Thread.sleep(200);
-          } else {
-            long elapsedTime = System.currentTimeMillis() - 
waitForReAuthenticationStartTime;
-            if (elapsedTime > reAuthenticateWaitTime) {
-              // reset the timer here since we are no longer waiting for 
re-auth to happen anymore
-              waitForReAuthenticationStartTime = -1;
-              synchronized (_stopDispatchingLock) {
-                logger.warn("Client did not re-authenticate back successfully 
in " + elapsedTime
-                    + "ms. Unregister this client proxy.");
-                pauseOrUnregisterProxy(expired);
-              }
-              exceptionOccurred = true;
-            } else {
-              Thread.sleep(200);
-            }
-          }
+          exceptionOccurred = handleAuthenticationExpiredException(expired);
         }
-
       } catch (MessageTooLargeException e) {
         logger.warn("Message too large to send to client: {}, {}", 
clientMessage, e.getMessage());
       } catch (IOException e) {
@@ -548,6 +529,48 @@ public class MessageDispatcher extends LoggingThread {
     }
   }
 
+  private boolean 
handleAuthenticationExpiredException(AuthenticationExpiredException expired)
+      throws InterruptedException {
+    long reAuthenticateWaitTime =
+        getSystemProperty(RE_AUTHENTICATE_WAIT_TIME, 
DEFAULT_RE_AUTHENTICATE_WAIT_TIME);
+    synchronized (reAuthenticationLock) {
+      // Turn on the "isWaitingForReAuthentication" flag before we send the re-auth message.
+      // If we did it the other way around, the re-auth might be finished before the flag is
+      // turned on for the notification to happen.
+      waitForReAuthenticationStartTime = System.currentTimeMillis();
+      subjectUpdated = false;
+      // only send the message to clients who can handle the message
+      if 
(getProxy().getVersion().isNewerThanOrEqualTo(RE_AUTHENTICATION_START_VERSION)) 
{
+        EventID eventId = createEventId();
+        sendMessageDirectly(new ClientReAuthenticateMessage(eventId));
+      }
+
+      // We wait for all versions of clients to re-authenticate. For older 
clients we still
+      // wait, just in case client will perform some operations to
+      // trigger credential refresh on its own.
+      long waitFinishTime = waitForReAuthenticationStartTime + 
reAuthenticateWaitTime;
+      long remainingWaitTime = waitFinishTime - System.currentTimeMillis();
+      while (!subjectUpdated && remainingWaitTime > 0) {
+        reAuthenticationLock.wait(remainingWaitTime);
+        remainingWaitTime = waitFinishTime - System.currentTimeMillis();
+      }
+    }
+    // the above wait timed out
+    if (!subjectUpdated) {
+      long elapsedTime = System.currentTimeMillis() - 
waitForReAuthenticationStartTime;
+      // reset the timer here since we are no longer waiting for re-auth to 
happen anymore
+      waitForReAuthenticationStartTime = -1;
+      synchronized (_stopDispatchingLock) {
+        logger.warn(
+            "Client did not re-authenticate back successfully in {} ms. 
Unregister this client proxy.",
+            elapsedTime);
+        pauseOrUnregisterProxy(expired);
+        return true;
+      }
+    }
+    return false;
+  }
+
   @VisibleForTesting
   void dispatchResidualMessages() {
     List<ClientMessage> list = new ArrayList<>();
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index f39ad719a0..004ebd4730 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -1231,6 +1231,7 @@ public class ServerConnection implements Runnable {
       secureLogger.debug("update subject on client proxy {} with uniqueId {}", 
clientProxy,
           uniqueId);
       clientProxy.setSubject(subject);
+      clientProxy.notifyReAuthentication();
     }
     return uniqueId;
   }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcherTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcherTest.java
index 555e3f3556..a01bc7bf1c 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcherTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcherTest.java
@@ -18,6 +18,7 @@
 package org.apache.geode.internal.cache.tier.sockets;
 
 import static 
org.apache.geode.internal.lang.SystemPropertyHelper.RE_AUTHENTICATE_WAIT_TIME;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -35,8 +36,8 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 
 import org.apache.shiro.subject.Subject;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.cache.RegionDestroyedException;
@@ -65,7 +66,7 @@ public class MessageDispatcherTest {
   private CacheClientProxyStats proxyStats;
   private EventID eventID;
 
-  @Before
+  @BeforeEach
   public void before() throws Exception {
     proxy = mock(CacheClientProxy.class);
     message = mock(ClientUpdateMessageImpl.class);
@@ -137,7 +138,7 @@ public class MessageDispatcherTest {
 
   @Test
   public void newClientWillGetClientReAuthenticateMessage() throws Exception {
-    doReturn(false, false, false, false, false, 
true).when(dispatcher).isStopped();
+    doReturn(false, false, false, true).when(dispatcher).isStopped();
     
doThrow(AuthenticationExpiredException.class).when(dispatcher).dispatchMessage(any());
     when(messageQueue.peek()).thenReturn(message);
     when(proxy.getVersion()).thenReturn(KnownVersion.GEODE_1_15_0);
@@ -145,7 +146,7 @@ public class MessageDispatcherTest {
     doNothing().when(dispatcher).sendMessageDirectly(any());
 
     // make sure wait time is short
-    
doReturn(-1L).when(dispatcher).getSystemProperty(eq(RE_AUTHENTICATE_WAIT_TIME), 
anyLong());
+    
doReturn(1L).when(dispatcher).getSystemProperty(eq(RE_AUTHENTICATE_WAIT_TIME), 
anyLong());
     dispatcher.runDispatcher();
 
     // verify a ReAuthenticate message will be send to the user
@@ -159,9 +160,9 @@ public class MessageDispatcherTest {
 
   @Test
   public void oldClientWillNotGetClientReAuthenticateMessage() throws 
Exception {
-    doReturn(false, false, false, false, false, 
true).when(dispatcher).isStopped();
+    doReturn(false, false, true).when(dispatcher).isStopped();
     // make sure wait time is short
-    
doReturn(-1L).when(dispatcher).getSystemProperty(eq(RE_AUTHENTICATE_WAIT_TIME), 
anyLong());
+    
doReturn(1L).when(dispatcher).getSystemProperty(eq(RE_AUTHENTICATE_WAIT_TIME), 
anyLong());
 
     
doThrow(AuthenticationExpiredException.class).when(dispatcher).dispatchMessage(any());
     when(messageQueue.peek()).thenReturn(message);
@@ -173,6 +174,32 @@ public class MessageDispatcherTest {
     verify(dispatcher, never()).dispatchResidualMessages();
   }
 
+
+  @Test
+  public void oldClientWillContinueToDeliverMessageIfNotified() throws 
Exception {
+    doReturn(false, false, true).when(dispatcher).isStopped();
+    // use a long wait time so the wait does not time out before the notification arrives
+    
doReturn(10000L).when(dispatcher).getSystemProperty(eq(RE_AUTHENTICATE_WAIT_TIME),
 anyLong());
+    
doThrow(AuthenticationExpiredException.class).when(dispatcher).dispatchMessage(any());
+    when(messageQueue.peek()).thenReturn(message);
+    when(proxy.getVersion()).thenReturn(KnownVersion.GEODE_1_14_0);
+
+    Thread dispatcherThread = new Thread(() -> dispatcher.runDispatcher());
+    Thread notifyThread = new Thread(() -> 
dispatcher.notifyReAuthentication());
+
+    dispatcherThread.start();
+    await().until(() -> dispatcher.isWaitingForReAuthentication());
+    notifyThread.start();
+
+    dispatcherThread.join();
+    notifyThread.join();
+
+    verify(dispatcher, never()).sendMessageDirectly(any());
+    // dispatcher will dispatch message
+    verify(dispatcher, 
never()).pauseOrUnregisterProxy(any(AuthenticationExpiredException.class));
+    verify(dispatcher).dispatchResidualMessages();
+  }
+
   @Test
   public void 
ioExceptionHappenedForDurableClientWillContinueToPeekForNextMessage()
       throws Exception {
diff --git 
a/geode-junit/src/main/java/org/apache/geode/security/ExpirableSecurityManager.java
 
b/geode-junit/src/main/java/org/apache/geode/security/ExpirableSecurityManager.java
index 06b2845145..56c5157270 100644
--- 
a/geode-junit/src/main/java/org/apache/geode/security/ExpirableSecurityManager.java
+++ 
b/geode-junit/src/main/java/org/apache/geode/security/ExpirableSecurityManager.java
@@ -39,6 +39,7 @@ public class ExpirableSecurityManager extends 
SimpleSecurityManager implements S
       new ConcurrentHashMap<>();
   private final Map<String, List<String>> unauthorizedOps =
       new ConcurrentHashMap<>();
+  private boolean allowDuplicate = false;
 
   @Override
   public Object authenticate(final Properties credentials) throws 
AuthenticationFailedException {
@@ -65,6 +66,10 @@ public class ExpirableSecurityManager extends 
SimpleSecurityManager implements S
     expired_users.add(user);
   }
 
+  public void setAllowDuplicate(boolean allowDuplicate) {
+    this.allowDuplicate = allowDuplicate;
+  }
+
   public Set<String> getExpiredUsers() {
     return expired_users;
   }
@@ -83,7 +88,7 @@ public class ExpirableSecurityManager extends 
SimpleSecurityManager implements S
     if (list == null) {
       list = new ArrayList<>();
     }
-    if (!list.contains(permission.toString())) {
+    if (allowDuplicate || !list.contains(permission.toString())) {
       list.add(permission.toString());
     }
     maps.put(user.toString(), list);
@@ -93,6 +98,7 @@ public class ExpirableSecurityManager extends 
SimpleSecurityManager implements S
     expired_users.clear();
     authorizedOps.clear();
     unauthorizedOps.clear();
+    allowDuplicate = false;
   }
 
 }
diff --git 
a/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt
 
b/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt
index 6249fc65a3..97e1d24230 100644
--- 
a/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt
+++ 
b/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt
@@ -69,7 +69,7 @@ org/apache/geode/pdx/Day,false
 org/apache/geode/pdx/DomainObjectPdxAuto$Day,false
 org/apache/geode/pdx/DomainObjectPdxAutoNoDefaultConstructor$Day,false
 org/apache/geode/pdx/SimpleClass$SimpleEnum,false
-org/apache/geode/security/ExpirableSecurityManager,false,authorizedOps:java/util/Map,expired_users:java/util/Set,unauthorizedOps:java/util/Map
+org/apache/geode/security/ExpirableSecurityManager,false,allowDuplicate:boolean,authorizedOps:java/util/Map,expired_users:java/util/Set,unauthorizedOps:java/util/Map
 
org/apache/geode/security/query/data/PdxQueryTestObject,false,age:int,id:int,name:java/lang/String,shouldThrowException:boolean
 
org/apache/geode/security/query/data/PdxTrade,false,cusip:java/lang/String,id:java/lang/String,price:int,shares:int
 
org/apache/geode/security/query/data/QueryTestObject,false,dateField:java/util/Date,id:int,mapField:java/util/Map,name:java/lang/String

Reply via email to