This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new dba9e223b [KYUUBI #2301] Limit the maximum number of concurrent 
connections per user and ipaddress
dba9e223b is described below

commit dba9e223bd7554f1f1b189edb32c1f9c75367e85
Author: wforget <[email protected]>
AuthorDate: Tue Apr 19 12:08:08 2022 +0800

    [KYUUBI #2301] Limit the maximum number of concurrent connections per user 
and ipaddress
    
    ### _Why are the changes needed?_
    
    close #2301
    
    ### _How was this patch tested?_
    - [X] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2364 from wForget/KYUUBI-2301.
    
    Closes #2301
    
    7a176e0e [wforget] merge
    0b41fafa [wforget] comment
    7614dbda [wforget] Merge remote-tracking branch 'origin/master' into 
KYUUBI-2301
    9180cf63 [wforget] fix
    741139a4 [wforget] fix
    1194ccbf [wforget] move limiter from common to server
    a6b93b38 [wforget] regenerate settings.md
    123f9209 [wforget] [KYUUBI-2301] Limit the maximum number of concurrent 
connections per user and ipaddress
    
    Authored-by: wforget <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 docs/deployment/settings.md                        |   3 +
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  24 +++++
 .../kyuubi/session/KyuubiSessionManager.scala      |  19 ++++
 .../org/apache/kyuubi/session/SessionLimiter.scala | 110 +++++++++++++++++++++
 .../kyuubi/session/SessionLimiterSuite.scala       |  99 +++++++++++++++++++
 5 files changed, 255 insertions(+)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 10a6cc1c2..edf8b833f 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -331,6 +331,9 @@ Key | Default | Meaning | Type | Since
 
 Key | Default | Meaning | Type | Since
 --- | --- | --- | --- | ---
+<code>kyuubi.server.limit.connections.per.ipaddress</code>|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div 
style='width: 170pt;word-wrap: break-word;white-space: normal'>Maximum kyuubi 
server connections per ipaddress. Any user exceeding this limit will not be 
allowed to connect.</div>|<div style='width: 30pt'>int</div>|<div style='width: 
20pt'>1.6.0</div>
+<code>kyuubi.server.limit.connections.per.user</code>|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div 
style='width: 170pt;word-wrap: break-word;white-space: normal'>Maximum kyuubi 
server connections per user. Any user exceeding this limit will not be allowed 
to connect.</div>|<div style='width: 30pt'>int</div>|<div style='width: 
20pt'>1.6.0</div>
+<code>kyuubi.server.limit.connections.per.user.ipaddress</code>|<div 
style='width: 65pt;word-wrap: break-word;white-space: 
normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>Maximum kyuubi server connections per 
user:ipaddress combination. Any user-ipaddress exceeding this limit will not be 
allowed to connect.</div>|<div style='width: 30pt'>int</div>|<div style='width: 
20pt'>1.6.0</div>
 <code>kyuubi.server.name</code>|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>The name of Kyuubi 
Server.</div>|<div style='width: 30pt'>string</div>|<div style='width: 
20pt'>1.5.0</div>
 
 
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 4b36d1263..7cb48b0a9 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1411,4 +1411,28 @@ object KyuubiConf {
       .version("1.6.0")
       .stringConf
       .createOptional
+
+  val SERVER_LIMIT_CONNECTIONS_PER_USER: OptionalConfigEntry[Int] =
+    buildConf("kyuubi.server.limit.connections.per.user")
+      .doc("Maximum kyuubi server connections per user." +
+        " Any user exceeding this limit will not be allowed to connect.")
+      .version("1.6.0")
+      .intConf
+      .createOptional // no default; KyuubiSessionManager enforces it only when set to a value > 0
+
+  val SERVER_LIMIT_CONNECTIONS_PER_IPADDRESS: OptionalConfigEntry[Int] =
+    buildConf("kyuubi.server.limit.connections.per.ipaddress")
+      .doc("Maximum kyuubi server connections per ipaddress." +
+        " Any user exceeding this limit will not be allowed to connect.")
+      .version("1.6.0")
+      .intConf
+      .createOptional // no default; KyuubiSessionManager enforces it only when set to a value > 0
+
+  val SERVER_LIMIT_CONNECTIONS_PER_USER_IPADDRESS: OptionalConfigEntry[Int] =
+    buildConf("kyuubi.server.limit.connections.per.user.ipaddress")
+      .doc("Maximum kyuubi server connections per user:ipaddress combination." 
+
+        " Any user-ipaddress exceeding this limit will not be allowed to 
connect.")
+      .version("1.6.0")
+      .intConf
+      .createOptional // no default; KyuubiSessionManager enforces it only when set to a value > 0
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 982c14ca3..404a2d4e0 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -43,10 +43,13 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
   // this lazy is must be specified since the conf is null when the class 
initialization
   lazy val sessionConfAdvisor: SessionConfAdvisor = 
PluginLoader.loadSessionConfAdvisor(conf)
 
+  private var limiter: Option[SessionLimiter] = None // Some(...) only once a connection limit > 0 is configured
+
   override def initialize(conf: KyuubiConf): Unit = {
     addService(credentialsManager)
     val absPath = 
Utils.getAbsolutePathFromWork(conf.get(SERVER_OPERATION_LOG_DIR_ROOT))
     _operationLogRoot = Some(absPath.toAbsolutePath.toString)
+    initSessionLimiter(conf) // build the limiter from conf before delegating to the parent initialize
     super.initialize(conf)
   }
 
@@ -75,6 +78,7 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
       ipAddress: String,
       conf: Map[String, String]): SessionHandle = {
     val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
+    limiter.foreach(_.increment(UserIpAddress(username, ipAddress)))
     try {
       super.openSession(protocol, username, password, ipAddress, conf)
     } catch {
@@ -89,6 +93,12 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
     }
   }
 
+  /**
+   * Closes the session and gives back the connection slot it holds in the limiter.
+   *
+   * The session is looked up before the close because its user and ip address are
+   * needed afterwards to address the right counters.
+   */
+  override def closeSession(sessionHandle: SessionHandle): Unit = {
+    val session = getSession(sessionHandle)
+    try {
+      super.closeSession(sessionHandle)
+    } finally {
+      // Release the slot even when the close itself throws; a failed close would
+      // otherwise never give the slot back and keep counting against the limits.
+      limiter.foreach(_.decrement(UserIpAddress(session.user, session.ipAddress)))
+    }
+  }
+
   def openBatchSession(
       protocol: TProtocolVersion,
       user: String,
@@ -146,6 +156,15 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
   }
 
   override protected def isServer: Boolean = true
+
+  /**
+   * Installs a [[SessionLimiter]] when at least one of the per-user, per-ipaddress or
+   * per-user:ipaddress connection limits is configured with a positive value. A limit
+   * that is not set is read as 0; when no limit is positive the limiter is left untouched.
+   */
+  private def initSessionLimiter(conf: KyuubiConf): Unit = {
+    val perUser = conf.get(SERVER_LIMIT_CONNECTIONS_PER_USER).getOrElse(0)
+    val perIpAddress = conf.get(SERVER_LIMIT_CONNECTIONS_PER_IPADDRESS).getOrElse(0)
+    val perUserIpAddress = conf.get(SERVER_LIMIT_CONNECTIONS_PER_USER_IPADDRESS).getOrElse(0)
+    val anyLimitEnabled = Seq(perUser, perIpAddress, perUserIpAddress).exists(_ > 0)
+    if (anyLimitEnabled) {
+      limiter = Some(SessionLimiter(perUser, perIpAddress, perUserIpAddress))
+    }
+  }
 }
 
 object KyuubiSessionManager {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala
new file mode 100644
index 000000000..d104b23c8
--- /dev/null
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.session
+
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.kyuubi.KyuubiSQLException
+
+trait SessionLimiter { // contract for counting the connections held by a user / ip address pair
+  def increment(userIpAddress: UserIpAddress): Unit // register one more connection for the pair
+  def decrement(userIpAddress: UserIpAddress): Unit // give back a connection registered earlier
+}
+
+case class UserIpAddress(user: String, ipAddress: String) // limiter key; SessionLimiterImpl skips null/blank fields
+
+/**
+ * [[SessionLimiter]] backed by one concurrent map of counters.
+ *
+ * The map is keyed by the user name, by the ip address and by the `user:ipaddress`
+ * combination. A limit that is not positive is disabled, and a blank (or null) user or
+ * ip address is not counted against the limits that need it.
+ *
+ * @param userLimit maximum number of connections per user
+ * @param ipAddressLimit maximum number of connections per ip address
+ * @param userIpAddressLimit maximum number of connections per `user:ipaddress` combination
+ */
+class SessionLimiterImpl(userLimit: Int, ipAddressLimit: Int, userIpAddressLimit: Int)
+  extends SessionLimiter {
+
+  private val _counters: java.util.Map[String, AtomicInteger] =
+    new ConcurrentHashMap[String, AtomicInteger]()
+
+  /** Exposes the live counter map to code in this package, e.g. for inspection in tests. */
+  private[session] def counters(): java.util.Map[String, AtomicInteger] = _counters
+
+  /**
+   * Counts one more connection against every enabled limit, in the order
+   * `user:ipaddress`, user, ip address.
+   *
+   * The call is all-or-nothing: when one limit rejects the connection, the counters
+   * that this call already raised are lowered again before the exception propagates,
+   * so a rejected connection leaves no trace in the counters.
+   *
+   * @throws KyuubiSQLException if any enabled limit is already reached
+   */
+  override def increment(userIpAddress: UserIpAddress): Unit = {
+    val user = userIpAddress.user
+    val ipAddress = userIpAddress.ipAddress
+    // Keys counted so far by this call, so they can be released if a later limit rejects it.
+    var acquired: List[String] = Nil
+    try {
+      // increment userIpAddress count
+      if (userIpAddressLimit > 0 && StringUtils.isNotBlank(user) &&
+        StringUtils.isNotBlank(ipAddress)) {
+        val userIpAddressKey = s"$user:$ipAddress"
+        incrLimitCount(
+          userIpAddressKey,
+          userIpAddressLimit,
+          "Connection limit per user:ipaddress reached" +
+            s" (user:ipaddress: $user:$ipAddress limit: $userIpAddressLimit)")
+        acquired = userIpAddressKey :: acquired
+      }
+      // increment user count
+      if (userLimit > 0 && StringUtils.isNotBlank(user)) {
+        incrLimitCount(
+          user,
+          userLimit,
+          s"Connection limit per user reached (user: $user limit: $userLimit)")
+        acquired = user :: acquired
+      }
+      // increment ipAddress count; it is the last check, so nothing needs recording after it
+      if (ipAddressLimit > 0 && StringUtils.isNotBlank(ipAddress)) {
+        incrLimitCount(
+          ipAddress,
+          ipAddressLimit,
+          "Connection limit per ipaddress reached" +
+            s" (ipaddress: $ipAddress limit: $ipAddressLimit)")
+      }
+    } catch {
+      case e: KyuubiSQLException =>
+        // The connection is refused, so no matching decrement will follow for it.
+        acquired.foreach(decrLimitCount)
+        throw e
+    }
+  }
+
+  /**
+   * Lowers the counter of every enabled limit that applies to the given user and
+   * ip address. Keys that were never counted are ignored.
+   */
+  override def decrement(userIpAddress: UserIpAddress): Unit = {
+    val user = userIpAddress.user
+    val ipAddress = userIpAddress.ipAddress
+    // decrement user count
+    if (userLimit > 0 && StringUtils.isNotBlank(user)) {
+      decrLimitCount(user)
+    }
+    // decrement ipAddress count
+    if (ipAddressLimit > 0 && StringUtils.isNotBlank(ipAddress)) {
+      decrLimitCount(ipAddress)
+    }
+    // decrement userIpAddress count
+    if (userIpAddressLimit > 0 && StringUtils.isNotBlank(user) &&
+      StringUtils.isNotBlank(ipAddress)) {
+      decrLimitCount(s"$user:$ipAddress")
+    }
+  }
+
+  /**
+   * Raises the counter stored under `key`, creating it on first use. When the new value
+   * exceeds `limit` the increment is undone and the request is rejected with `errorMsg`.
+   */
+  private def incrLimitCount(key: String, limit: Int, errorMsg: String): Unit = {
+    val count = _counters.computeIfAbsent(key, _ => new AtomicInteger())
+    if (count.incrementAndGet() > limit) {
+      count.decrementAndGet()
+      throw KyuubiSQLException(errorMsg)
+    }
+  }
+
+  /**
+   * Lowers the counter stored under `key` if there is one. The value never drops below
+   * zero, so an unmatched decrement cannot grant extra connections later on.
+   */
+  private def decrLimitCount(key: String): Unit = {
+    Option(_counters.get(key)).foreach { count =>
+      count.updateAndGet(current => if (current > 0) current - 1 else current)
+    }
+  }
+}
+
+object SessionLimiter {
+
+  /**
+   * Builds the limiter implementation of this file for the given limits. The
+   * implementation treats a limit that is not positive as disabled.
+   */
+  def apply(userLimit: Int, ipAddressLimit: Int, userIpAddressLimit: Int): SessionLimiter =
+    new SessionLimiterImpl(userLimit, ipAddressLimit, userIpAddressLimit)
+
+}
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala
new file mode 100644
index 000000000..d2df573e1
--- /dev/null
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.session
+
+import java.util.concurrent.{CountDownLatch, Executors}
+import java.util.concurrent.atomic.LongAdder
+
+import scala.collection.JavaConverters._
+
+import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException}
+
+/** Tests for [[SessionLimiter]]: limits under concurrent increments and counter balance. */
+class SessionLimiterSuite extends KyuubiFunSuite {
+
+  test("test increment session limit") {
+    val user = "user001"
+    val ipAddress = "127.0.0.1"
+    val userLimit = 30
+    val ipAddressLimit = 20
+    val userIpAddressLimit = 10
+    val threadPool = Executors.newFixedThreadPool(10)
+
+    // Fires 50 concurrent increments at a fresh limiter and checks that exactly
+    // `expectedIndex` of them succeed while all the others fail with `expectedErrorMsg`.
+    def checkLimit(
+        userIpAddress: UserIpAddress,
+        expectedIndex: Int,
+        expectedErrorMsg: String): Unit = {
+      val limiter = SessionLimiter(userLimit, ipAddressLimit, userIpAddressLimit)
+      val successAdder = new LongAdder
+      val expectedErrorAdder = new LongAdder
+      val count = 50
+      val latch = new CountDownLatch(count)
+      (0 until count).foreach { _ =>
+        threadPool.execute(() => {
+          try {
+            limiter.increment(userIpAddress)
+            successAdder.increment()
+          } catch {
+            case e: KyuubiSQLException if e.getMessage === expectedErrorMsg =>
+              expectedErrorAdder.increment()
+            // Anything else is counted neither as success nor as expected error,
+            // which makes the assertions below fail.
+            case _: Throwable =>
+          } finally {
+            latch.countDown()
+          }
+        })
+      }
+      latch.await()
+      assert(successAdder.intValue() == expectedIndex)
+      assert(expectedErrorAdder.intValue() == count - expectedIndex)
+    }
+
+    try {
+      // user limit
+      checkLimit(
+        UserIpAddress(user, null),
+        userLimit,
+        s"Connection limit per user reached (user: $user limit: $userLimit)")
+
+      // ipAddress limit
+      checkLimit(
+        UserIpAddress(null, ipAddress),
+        ipAddressLimit,
+        s"Connection limit per ipaddress reached (ipaddress: $ipAddress limit: $ipAddressLimit)")
+
+      // userIpAddress limit
+      checkLimit(
+        UserIpAddress(user, ipAddress),
+        userIpAddressLimit,
+        s"Connection limit per user:ipaddress reached" +
+          s" (user:ipaddress: $user:$ipAddress limit: $userIpAddressLimit)")
+    } finally {
+      // Shut the pool down even when a check fails, so no worker threads are left behind.
+      threadPool.shutdown()
+    }
+  }
+
+  test("test increment and decrement session") {
+    val user = "user001"
+    val ipAddress = "127.0.0.1"
+    val userLimit = 30
+    val ipAddressLimit = 20
+    val userIpAddressLimit = 10
+    val limiter = SessionLimiter(userLimit, ipAddressLimit, userIpAddressLimit)
+    val userIpAddress = UserIpAddress(user, ipAddress)
+    // More rounds than any limit: this only passes if each decrement frees its slot.
+    (0 until 50).foreach { _ =>
+      limiter.increment(userIpAddress)
+      limiter.decrement(userIpAddress)
+    }
+    limiter.asInstanceOf[SessionLimiterImpl].counters().asScala.values
+      .foreach(c => assert(c.get() == 0))
+  }
+}

Reply via email to