This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 2371cc62f [KYUUBI #5216] Workaround for negative counter in
SessionLimiter
2371cc62f is described below
commit 2371cc62f3757e643766865f95f5a6f2b46b7cf8
Author: Fantasy-Jay <[email protected]>
AuthorDate: Fri Sep 8 14:01:32 2023 +0800
[KYUUBI #5216] Workaround for negative counter in SessionLimiter
### _Why are the changes needed?_
Fix: https://github.com/apache/kyuubi/issues/5216
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before make a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
No
Closes #5217 from zhuyaogai/issue-5216.
Closes #5216
b8d2e1796 [Fantasy-Jay] Limit counter resource leak in SessionLimiter.
cda3702e5 [Fantasy-Jay] Limit counter resource leak in SessionLimiter.
36272d1b1 [Fantasy-Jay] fix test bug.
1e282d20f [Fantasy-Jay] Limit counter resource leak in SessionLimiter.
7fc389ff1 [Fantasy-Jay] Limit counter resource leak in SessionLimiter.
Authored-by: Fantasy-Jay <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../org/apache/kyuubi/session/SessionLimiter.scala | 9 +---
.../kyuubi/session/SessionLimiterSuite.scala | 50 ++++++++++++++++++++++
2 files changed, 52 insertions(+), 7 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala
index c8112d532..8a1ebedf1 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala
@@ -95,7 +95,8 @@ class SessionLimiterImpl(userLimit: Int, ipAddressLimit: Int,
userIpAddressLimit
private def decrLimitCount(key: String): Unit = {
_counters.get(key) match {
- case count: AtomicInteger => count.decrementAndGet()
+ case count: AtomicInteger =>
+ count.accumulateAndGet(1, (l, r) => if (l > 0) l - r else l)
case _ =>
}
}
@@ -121,12 +122,6 @@ class SessionLimiterWithAccessControlListImpl(
}
}
- override def decrement(userIpAddress: UserIpAddress): Unit = {
- if (!unlimitedUsers.contains(userIpAddress.user)) {
- super.decrement(userIpAddress)
- }
- }
-
private[kyuubi] def setUnlimitedUsers(unlimitedUsers: Set[String]): Unit = {
this.unlimitedUsers = unlimitedUsers
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala
index df75d15f2..775239f9b 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala
@@ -20,8 +20,10 @@ import java.util.concurrent.{CountDownLatch, Executors}
import java.util.concurrent.atomic.LongAdder
import scala.collection.JavaConverters._
+import scala.util.Random
import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException}
+import org.apache.kyuubi.util.ThreadUtils
class SessionLimiterSuite extends KyuubiFunSuite {
@@ -149,4 +151,52 @@ class SessionLimiterSuite extends KyuubiFunSuite {
assert(caught.getMessage.equals(
"Connection denied because the user is in the deny user list. (user:
user002)"))
}
+
+ test("test refresh unlimited users and deny users") {
+ val random: Random = new Random()
+ val latch = new CountDownLatch(600)
+ val userLimit = 100
+ val ipAddressLimit = 101
+ val userIpAddressLimit = 102
+ val limiter =
+ SessionLimiter(userLimit, ipAddressLimit, userIpAddressLimit, Set.empty,
Set.empty)
+ val threadPool =
ThreadUtils.newDaemonCachedThreadPool("test-refresh-config")
+
+ def checkUserLimit(userIpAddress: UserIpAddress): Unit = {
+ for (i <- 0 until 200) {
+ threadPool.execute(() => {
+ try {
+ Thread.sleep(random.nextInt(200))
+ limiter.increment(userIpAddress)
+ } catch {
+ case _: Throwable =>
+ } finally {
+ Thread.sleep(random.nextInt(500))
+ // finally call limiter#decrement method.
+ limiter.decrement(userIpAddress)
+ latch.countDown()
+ }
+ })
+ }
+ }
+
+ checkUserLimit(UserIpAddress("user001", "127.0.0.1"))
+ checkUserLimit(UserIpAddress("user002", "127.0.0.2"))
+ checkUserLimit(UserIpAddress("user003", "127.0.0.3"))
+
+ Thread.sleep(100)
+ // set unlimited users and deny users
+ SessionLimiter.resetUnlimitedUsers(limiter, Set("user001"))
+ SessionLimiter.resetDenyUsers(limiter, Set("user002"))
+
+ Thread.sleep(300)
+ // unset unlimited users and deny users
+ SessionLimiter.resetUnlimitedUsers(limiter, Set.empty)
+ SessionLimiter.resetDenyUsers(limiter, Set.empty)
+
+ latch.await()
+ threadPool.shutdown()
+ limiter.asInstanceOf[SessionLimiterImpl].counters().asScala.values
+ .foreach(c => assert(c.get() == 0))
+ }
}