[ 
https://issues.apache.org/jira/browse/PIO-182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16667271#comment-16667271
 ] 

ASF GitHub Bot commented on PIO-182:
------------------------------------

takezoe closed pull request #482: [PIO-182] Add async methods to LEventStore
URL: https://github.com/apache/predictionio/pull/482
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala
index 3a82e9856..a73ee80c1 100644
--- a/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala
@@ -20,15 +20,30 @@ package org.apache.predictionio.data.store
 
 import org.apache.predictionio.data.storage.Storage
 import org.apache.predictionio.data.storage.Event
-
 import org.joda.time.DateTime
 
-import scala.concurrent.Await
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 
 /** This object provides a set of operation to access Event Store
-  * without going through Spark's parallelization
+  * without going through Spark's parallelization.
+  *
+  * Note that blocking methods of this object uses
+  * `scala.concurrent.ExecutionContext.Implicits.global` internally.
+  * Since this is a thread pool which has a number of threads equal to 
available
+  * processors, parallelism is limited up to the number of processors.
+  *
+  * If this limitation become bottleneck of resource usage, you can increase 
the
+  * number of threads by declaring following VM options before calling "pio 
deploy":
+  *
+  * <pre>
+  * export JAVA_OPTS="$JAVA_OPTS \
+  *   -Dscala.concurrent.context.numThreads=1000 \
+  *   -Dscala.concurrent.context.maxThreads=1000"
+  * </pre>
+  *
+  * You can learn more about the global execution context in the Scala 
documentation:
+  * 
[[https://docs.scala-lang.org/overviews/core/futures.html#the-global-execution-context]]
   */
 object LEventStore {
 
@@ -72,9 +87,62 @@ object LEventStore {
     latest: Boolean = true,
     timeout: Duration = defaultTimeout): Iterator[Event] = {
 
+    // Import here to ensure ExecutionContext.Implicits.global is used only in 
this method
+    import scala.concurrent.ExecutionContext.Implicits.global
+
+    Await.result(findByEntityAsync(
+      appName = appName,
+      entityType = entityType,
+      entityId = entityId,
+      channelName = channelName,
+      eventNames = eventNames,
+      targetEntityType = targetEntityType,
+      targetEntityId = targetEntityId,
+      startTime = startTime,
+      untilTime = untilTime,
+      limit = limit,
+      latest = latest),
+      timeout)
+  }
+
+  /** Reads events of the specified entity. May use this in Algorithm's 
predict()
+    * or Serving logic to have fast event store access.
+    *
+    * @param appName return events of this app
+    * @param entityType return events of this entityType
+    * @param entityId return events of this entityId
+    * @param channelName return events of this channel (default channel if 
it's None)
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param limit Limit number of events. Get all events if None or Some(-1)
+    * @param latest Return latest event first (default true)
+    * @return Future[Iterator[Event]]
+    */
+  def findByEntityAsync(
+    appName: String,
+    entityType: String,
+    entityId: String,
+    channelName: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    limit: Option[Int] = None,
+    latest: Boolean = true)(implicit ec: ExecutionContext): 
Future[Iterator[Event]] = {
+
     val (appId, channelId) = Common.appNameToId(appName, channelName)
 
-    Await.result(eventsDb.futureFind(
+    eventsDb.futureFind(
       appId = appId,
       channelId = channelId,
       startTime = startTime,
@@ -85,8 +153,7 @@ object LEventStore {
       targetEntityType = targetEntityType,
       targetEntityId = targetEntityId,
       limit = limit,
-      reversed = Some(latest)),
-      timeout)
+      reversed = Some(latest))
   }
 
   /** Reads events generically. If entityType or entityId is not specified, it
@@ -127,9 +194,62 @@ object LEventStore {
     limit: Option[Int] = None,
     timeout: Duration = defaultTimeout): Iterator[Event] = {
 
+    // Import here to ensure ExecutionContext.Implicits.global is used only in 
this method
+    import scala.concurrent.ExecutionContext.Implicits.global
+
+    Await.result(findAsync(
+      appName = appName,
+      entityType = entityType,
+      entityId = entityId,
+      channelName = channelName,
+      eventNames = eventNames,
+      targetEntityType = targetEntityType,
+      targetEntityId = targetEntityId,
+      startTime = startTime,
+      untilTime = untilTime,
+      limit = limit), timeout)
+  }
+
+  /** Reads events generically. If entityType or entityId is not specified, it
+    * results in table scan.
+    *
+    * @param appName return events of this app
+    * @param entityType return events of this entityType
+    *   - None means no restriction on entityType
+    *   - Some(x) means entityType should match x.
+    * @param entityId return events of this entityId
+    *   - None means no restriction on entityId
+    *   - Some(x) means entityId should match x.
+    * @param channelName return events of this channel (default channel if 
it's None)
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param limit Limit number of events. Get all events if None or Some(-1)
+    * @return Future[Iterator[Event]]
+    */
+  def findAsync(
+    appName: String,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    channelName: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    limit: Option[Int] = None)(implicit ec: ExecutionContext): 
Future[Iterator[Event]] = {
+
     val (appId, channelId) = Common.appNameToId(appName, channelName)
 
-    Await.result(eventsDb.futureFind(
+    eventsDb.futureFind(
       appId = appId,
       channelId = channelId,
       startTime = startTime,
@@ -139,7 +259,7 @@ object LEventStore {
       eventNames = eventNames,
       targetEntityType = targetEntityType,
       targetEntityId = targetEntityId,
-      limit = limit), timeout)
+      limit = limit)
   }
 
 }
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala
index f4fd67627..6f39feb26 100644
--- a/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala
@@ -18,15 +18,35 @@
 
 package org.apache.predictionio.data.store.java
 
+import java.util.concurrent.{CompletableFuture, CompletionStage, 
ExecutorService}
+
 import org.apache.predictionio.data.storage.Event
 import org.apache.predictionio.data.store.LEventStore
 import org.joda.time.DateTime
 
 import scala.collection.JavaConversions
 import scala.concurrent.duration.Duration
+import scala.compat.java8.FutureConverters._
 
 /** This Java-friendly object provides a set of operation to access Event Store
-  * without going through Spark's parallelization
+  * without going through Spark's parallelization.
+  *
+  * Note that blocking methods of this object uses
+  * `scala.concurrent.ExecutionContext.Implicits.global` internally.
+  * Since this is a thread pool which has a number of threads equal to 
available
+  * processors, parallelism is limited up to the number of processors.
+  *
+  * If this limitation become bottleneck of resource usage, you can increase 
the
+  * number of threads by declaring following VM options before calling "pio 
deploy":
+  *
+  * <pre>
+  * export JAVA_OPTS="$JAVA_OPTS \
+  *   -Dscala.concurrent.context.numThreads=1000 \
+  *   -Dscala.concurrent.context.maxThreads=1000"
+  * </pre>
+  *
+  * You can learn more about the global execution context in the Scala 
documentation:
+  * 
[[https://docs.scala-lang.org/overviews/core/futures.html#the-global-execution-context]]
   */
 object LJavaEventStore {
 
@@ -86,6 +106,61 @@ object LJavaEventStore {
       ).toSeq)
   }
 
+  /** Reads events of the specified entity. May use this in Algorithm's 
predict()
+    * or Serving logic to have fast event store access.
+    *
+    * @param appName return events of this app
+    * @param entityType return events of this entityType
+    * @param entityId return events of this entityId
+    * @param channelName return events of this channel (default channel if 
it's None)
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param limit Limit number of events. Get all events if None or Some(-1)
+    * @param latest Return latest event first
+    * @return CompletableFuture[java.util.List[Event]]
+    */
+  def findByEntityAsync(
+    appName: String,
+    entityType: String,
+    entityId: String,
+    channelName: Option[String],
+    eventNames: Option[java.util.List[String]],
+    targetEntityType: Option[Option[String]],
+    targetEntityId: Option[Option[String]],
+    startTime: Option[DateTime],
+    untilTime: Option[DateTime],
+    limit: Option[Integer],
+    latest: Boolean,
+    executorService: ExecutorService): 
CompletableFuture[java.util.List[Event]] = {
+
+    val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq)
+    val limitInt = limit.map(_.intValue())
+    implicit val ec = fromExecutorService(executorService)
+
+    LEventStore.findByEntityAsync(
+      appName,
+      entityType,
+      entityId,
+      channelName,
+      eventNamesSeq,
+      targetEntityType,
+      targetEntityId,
+      startTime,
+      untilTime,
+      limitInt,
+      latest
+    ).map { x => JavaConversions.seqAsJavaList(x.toSeq) 
}.toJava.toCompletableFuture
+  }
+
   /** Reads events generically. If entityType or entityId is not specified, it
     * results in table scan.
     *
@@ -142,4 +217,61 @@ object LJavaEventStore {
         timeout
       ).toSeq)
   }
+
+  /** Reads events generically. If entityType or entityId is not specified, it
+    * results in table scan.
+    *
+    * @param appName return events of this app
+    * @param entityType return events of this entityType
+    *   - None means no restriction on entityType
+    *   - Some(x) means entityType should match x.
+    * @param entityId return events of this entityId
+    *   - None means no restriction on entityId
+    *   - Some(x) means entityId should match x.
+    * @param channelName return events of this channel (default channel if 
it's None)
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param limit Limit number of events. Get all events if None or Some(-1)
+    * @return CompletableFuture[java.util.List[Event]]
+    */
+  def findAsync(
+    appName: String,
+    entityType: Option[String],
+    entityId: Option[String],
+    channelName: Option[String],
+    eventNames: Option[java.util.List[String]],
+    targetEntityType: Option[Option[String]],
+    targetEntityId: Option[Option[String]],
+    startTime: Option[DateTime],
+    untilTime: Option[DateTime],
+    limit: Option[Integer],
+    executorService: ExecutorService): 
CompletableFuture[java.util.List[Event]] = {
+
+    val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq)
+    val limitInt = limit.map(_.intValue())
+    implicit val ec = fromExecutorService(executorService)
+
+    LEventStore.findAsync(
+      appName,
+      entityType,
+      entityId,
+      channelName,
+      eventNamesSeq,
+      targetEntityType,
+      targetEntityId,
+      startTime,
+      untilTime,
+      limitInt
+    ).map { x => JavaConversions.seqAsJavaList(x.toSeq) 
}.toJava.toCompletableFuture
+  }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add asynchronous (non-blocking) methods to LEventStore
> ------------------------------------------------------
>
>                 Key: PIO-182
>                 URL: https://issues.apache.org/jira/browse/PIO-182
>             Project: PredictionIO
>          Issue Type: Improvement
>          Components: Core
>    Affects Versions: 0.13.0
>            Reporter: Naoki Takezoe
>            Assignee: Naoki Takezoe
>            Priority: Major
>
> The current implementation of {{LEventStore}} has only synchronous (blocking)
> methods. Since they use {{ExecutionContext.Implicits.global}}, their parallelism
> is limited to the number of processors. This means the engine server's
> parallelism is also limited if these methods are used in prediction logic.
> To solve this problem, {{Future}}-returning versions of these methods should be
> added to {{LEventStore}}, and the current blocking methods should also be
> modified to take an {{ExecutionContext}} (as an implicit parameter).
> See also: 
> https://lists.apache.org/thread.html/f14e4f8f29410e4585b3d8e9f646b88293a605f4716d3c4d60771854@%3Cuser.predictionio.apache.org%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to