This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2371cc62f [KYUUBI #5216] Workaround for negative counter in 
SessionLimiter
2371cc62f is described below

commit 2371cc62f3757e643766865f95f5a6f2b46b7cf8
Author: Fantasy-Jay <[email protected]>
AuthorDate: Fri Sep 8 14:01:32 2023 +0800

    [KYUUBI #5216] Workaround for negative counter in SessionLimiter
    
    ### _Why are the changes needed?_
    
    Fix: https://github.com/apache/kyuubi/issues/5216
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
 locally before make a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    
    No
    
    Closes #5217 from zhuyaogai/issue-5216.
    
    Closes #5216
    
    b8d2e1796 [Fantasy-Jay] Limit counter resource leak in SessionLimiter.
    cda3702e5 [Fantasy-Jay] Limit counter resource leak in SessionLimiter.
    36272d1b1 [Fantasy-Jay] fix test bug.
    1e282d20f [Fantasy-Jay] Limit counter resource leak in SessionLimiter.
    7fc389ff1 [Fantasy-Jay] Limit counter resource leak in SessionLimiter.
    
    Authored-by: Fantasy-Jay <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../org/apache/kyuubi/session/SessionLimiter.scala |  9 +---
 .../kyuubi/session/SessionLimiterSuite.scala       | 50 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 7 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala
index c8112d532..8a1ebedf1 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala
@@ -95,7 +95,8 @@ class SessionLimiterImpl(userLimit: Int, ipAddressLimit: Int, 
userIpAddressLimit
 
   private def decrLimitCount(key: String): Unit = {
     _counters.get(key) match {
-      case count: AtomicInteger => count.decrementAndGet()
+      case count: AtomicInteger =>
+        count.accumulateAndGet(1, (l, r) => if (l > 0) l - r else l)
       case _ =>
     }
   }
@@ -121,12 +122,6 @@ class SessionLimiterWithAccessControlListImpl(
     }
   }
 
-  override def decrement(userIpAddress: UserIpAddress): Unit = {
-    if (!unlimitedUsers.contains(userIpAddress.user)) {
-      super.decrement(userIpAddress)
-    }
-  }
-
   private[kyuubi] def setUnlimitedUsers(unlimitedUsers: Set[String]): Unit = {
     this.unlimitedUsers = unlimitedUsers
   }
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala
index df75d15f2..775239f9b 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala
@@ -20,8 +20,10 @@ import java.util.concurrent.{CountDownLatch, Executors}
 import java.util.concurrent.atomic.LongAdder
 
 import scala.collection.JavaConverters._
+import scala.util.Random
 
 import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException}
+import org.apache.kyuubi.util.ThreadUtils
 
 class SessionLimiterSuite extends KyuubiFunSuite {
 
@@ -149,4 +151,52 @@ class SessionLimiterSuite extends KyuubiFunSuite {
     assert(caught.getMessage.equals(
       "Connection denied because the user is in the deny user list. (user: 
user002)"))
   }
+
+  test("test refresh unlimited users and deny users") {
+    val random: Random = new Random()
+    val latch = new CountDownLatch(600)
+    val userLimit = 100
+    val ipAddressLimit = 101
+    val userIpAddressLimit = 102
+    val limiter =
+      SessionLimiter(userLimit, ipAddressLimit, userIpAddressLimit, Set.empty, 
Set.empty)
+    val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("test-refresh-config")
+
+    def checkUserLimit(userIpAddress: UserIpAddress): Unit = {
+      for (i <- 0 until 200) {
+        threadPool.execute(() => {
+          try {
+            Thread.sleep(random.nextInt(200))
+            limiter.increment(userIpAddress)
+          } catch {
+            case _: Throwable =>
+          } finally {
+            Thread.sleep(random.nextInt(500))
+            // finally call limiter#decrement method.
+            limiter.decrement(userIpAddress)
+            latch.countDown()
+          }
+        })
+      }
+    }
+
+    checkUserLimit(UserIpAddress("user001", "127.0.0.1"))
+    checkUserLimit(UserIpAddress("user002", "127.0.0.2"))
+    checkUserLimit(UserIpAddress("user003", "127.0.0.3"))
+
+    Thread.sleep(100)
+    // set unlimited users and deny users
+    SessionLimiter.resetUnlimitedUsers(limiter, Set("user001"))
+    SessionLimiter.resetDenyUsers(limiter, Set("user002"))
+
+    Thread.sleep(300)
+    // unset unlimited users and deny users
+    SessionLimiter.resetUnlimitedUsers(limiter, Set.empty)
+    SessionLimiter.resetDenyUsers(limiter, Set.empty)
+
+    latch.await()
+    threadPool.shutdown()
+    limiter.asInstanceOf[SessionLimiterImpl].counters().asScala.values
+      .foreach(c => assert(c.get() == 0))
+  }
 }

Reply via email to