This is an automated email from the ASF dual-hosted git repository.

aradzinski pushed a commit to branch NLPCRAFT-477
in repository https://gitbox.apache.org/repos/asf/incubator-nlpcraft.git


The following commit(s) were added to refs/heads/NLPCRAFT-477 by this push:
     new 9138305  Create NCConversationManager.scala
9138305 is described below

commit 91383058e3119f05554c76d7f0bc5909f37ec61d
Author: Aaron Radzinski <aradzin...@datalingvo.com>
AuthorDate: Wed Feb 2 09:31:23 2022 -0800

    Create NCConversationManager.scala
---
 .../conversation/NCConversationManager.scala       | 111 +++++++++++++++++++++
 1 file changed, 111 insertions(+)

diff --git 
a/nlpcraft/src/main/scala/org/apache/nlpcraft/internal/conversation/NCConversationManager.scala
 
b/nlpcraft/src/main/scala/org/apache/nlpcraft/internal/conversation/NCConversationManager.scala
new file mode 100644
index 0000000..d916e37
--- /dev/null
+++ 
b/nlpcraft/src/main/scala/org/apache/nlpcraft/internal/conversation/NCConversationManager.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nlpcraft.internal.conversation
+
+import io.opencensus.trace.Span
+import org.apache.nlpcraft.common._
+import org.apache.nlpcraft.probe.mgrs.model.NCModelManager
+
+import scala.collection._
+
+/**
+  * Conversation manager.
+  */
+object NCConversationManager:
+    case class Key(usrId: Long, mdlId: String)
+    case class Value(conv: NCConversation, var tstamp: Long = 0)
+
+    private final val convs: mutable.Map[Key, Value] = 
mutable.HashMap.empty[Key, Value]
+
+    @volatile private var gc: Thread = _
+
+    /**
+      *
+      * @return
+      */
+    def start(): NCService =
+        gc = NCUtils.mkThread("conversation-manager-gc") { t =>
+            while (!t.isInterrupted)
+                try
+                    convs.synchronized {
+                        val sleepTime = clearForTimeout() - U.now()
+                        if sleepTime > 0 then convs.wait(sleepTime)
+                    }
+                catch
+                    case _: InterruptedException => // No-op.
+                    case e: Throwable => U.prettyError(logger, s"Unexpected 
error for thread: ${t.getName}", e)
+        }
+        gc.start()
+
+    /**
+      *
+      */
+    def stop(): Unit =
+        NCUtils.stopThread(gc)
+        gc = null
+        convs.clear()
+
+    /**
+      * Gets next clearing time.
+      */
+    private def clearForTimeout(): Long =
+        require(Thread.holdsLock(convs))
+
+        val now = U.now()
+        val delKeys = mutable.HashSet.empty[Key]
+
+        for ((key, value) <- convs)
+            val del =
+                NCModelManager.getModelOpt(key.mdlId) match
+                    case Some(mdl) => value.tstamp < now - 
mdl.model.getConversationTimeout
+                    case None => true
+
+            if del then
+                value.conv.getUserData.clear()
+                delKeys += key
+
+        convs --= delKeys
+
+        if convs.nonEmpty then convs.values.map(v => v.tstamp + 
v.conv.timeoutMs).min
+        else Long.MaxValue
+
+    /**
+      * Gets conversation for given key.
+      *
+      * @param usrId User ID.
+      * @param mdlId Model ID.
+      * @return New or existing conversation.
+      */
+    def getConversation(usrId: Long, mdlId: String, parent: Span = null): 
NCConversation =
+        startScopedSpan("getConversation", parent, "usrId" -> usrId, "mdlId" 
-> mdlId) { _ =>
+            val mdl = NCModelManager.getModel(mdlId).model
+
+            convs.synchronized {
+                val v = convs.getOrElseUpdate(
+                    Key(usrId, mdlId),
+                    Value(NCConversation(usrId, mdlId, 
mdl.getConversationTimeout, mdl.getConversationDepth))
+                )
+
+                v.tstamp = U.nowUtcMs()
+
+                convs.notifyAll()
+
+                v.conv
+            }
+        }
+}

Reply via email to